ClickHouse 示例
ClickHouse列式数据库,包含高性能分析、时序处理和实时仪表板示例
💻 ClickHouse 基础操作 sql
🟢 simple
ClickHouse基础操作,包括表创建、数据插入、基本查询和聚合
-- ClickHouse Basic Operations
-- 1. Database Operations
CREATE DATABASE IF NOT EXISTS analytics_db;
USE analytics_db;
-- 2. Table Creation with MergeTree Engine
-- MergeTree stores rows sorted by the ORDER BY key in immutable parts that are
-- merged in the background. PARTITION BY toYYYYMM() keeps one partition per
-- month so old data can be dropped cheaply with ALTER ... DROP PARTITION.
CREATE TABLE IF NOT EXISTS events (
event_id UInt64,
event_time DateTime,
-- LowCardinality dictionary-encodes the column; effective when the number of
-- distinct values is small (event types, device types, country codes).
event_type LowCardinality(String),
user_id UInt64,
page_url String,
ip_address IPv4,
-- Two-character code; presumably ISO 3166-1 alpha-2 -- confirm with producer.
country LowCardinality(FixedString(2)),
device_type LowCardinality(String),
-- Monetary amount with 2 decimal places; currency/units not specified here.
revenue Decimal(10, 2)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
-- Sorting key doubles as the primary index: queries filtering by user_id
-- (then time) prune granules most effectively.
ORDER BY (user_id, event_time, event_type)
SETTINGS index_granularity = 8192;
-- 3. Insert Sample Data
-- Explicit column list keeps the INSERT valid (and self-documenting) if
-- columns are later added to or reordered in the table.
INSERT INTO events (event_id, event_time, event_type, user_id, page_url, ip_address, country, device_type, revenue) VALUES
(1, '2024-01-01 10:00:00', 'page_view', 12345, '/home', '192.168.1.1', 'US', 'desktop', 0.00),
(2, '2024-01-01 10:01:00', 'click', 12345, '/products/123', '192.168.1.1', 'US', 'desktop', 0.00),
(3, '2024-01-01 10:02:00', 'purchase', 12345, '/checkout', '192.168.1.1', 'US', 'desktop', 29.99),
(4, '2024-01-01 10:05:00', 'page_view', 67890, '/home', '10.0.0.1', 'UK', 'mobile', 0.00),
(5, '2024-01-01 10:06:00', 'page_view', 67890, '/products', '10.0.0.1', 'UK', 'mobile', 0.00);
-- 4. Basic Queries
-- SELECT * is acceptable for ad-hoc inspection; list columns in production code.
SELECT * FROM events LIMIT 10;
-- Filter by event type
SELECT * FROM events WHERE event_type = 'purchase';
-- 5. Aggregation Functions
-- Event counts and revenue totals per event type.
SELECT
    event_type,
    count() AS event_count,
    sum(revenue) AS total_revenue,
    avg(revenue) AS avg_revenue
FROM events
GROUP BY event_type;
-- 6. Time-based aggregations
-- uniqExact() is the native ClickHouse exact distinct count (what
-- COUNT(DISTINCT ...) maps to by default); use uniq() where an
-- approximation is acceptable for better performance.
SELECT
    toYYYYMM(event_time) AS month,
    count() AS total_events,
    uniqExact(user_id) AS unique_users,
    sum(revenue) AS monthly_revenue
FROM events
GROUP BY month
ORDER BY month;
💻 ClickHouse 集成模式 sql
🟡 intermediate
与数据管道、外部系统、监控和备份策略的集成
-- ClickHouse Integration Patterns
-- 1. External Data Sources
-- MySQL Engine for real-time integration: every SELECT against this table is
-- proxied to the remote MySQL server; no data is stored in ClickHouse.
CREATE TABLE IF NOT EXISTS mysql_users (
id UInt32,
name String,
email String,
created_at DateTime
) ENGINE = MySQL('mysql-host:3306', 'database', 'users', 'user', 'password');
-- PostgreSQL Engine -- same pass-through model as the MySQL engine above.
-- NOTE(review): credentials are hard-coded in the DDL (and readable via
-- SHOW CREATE TABLE); prefer named collections for production deployments.
CREATE TABLE IF NOT EXISTS pg_products (
id UInt32,
name String,
price Decimal(10, 2),
category String
) ENGINE = PostgreSQL('postgres-host:5432', 'database', 'products', 'user', 'password');
-- 2. Data Pipeline Integration
-- Ingest table for raw JSON events. ENGINE = Null stores nothing; it exists
-- only so the materialized view below fires on every INSERT and lands the
-- parsed rows in `events`.
-- (The original Buffer() engine was unsuitable here: Buffer takes 7 numeric
-- parameters -- num_layers, min/max_time, min/max_rows, min/max_bytes -- and,
-- more importantly, it flushes its OWN columns straight into the destination
-- table, whose schema does not match this raw (timestamp, json_data) layout.)
CREATE TABLE IF NOT EXISTS raw_events_buffer (
timestamp DateTime,
json_data String
) ENGINE = Null;
-- Materialized View for automatic data processing: parses each JSON payload
-- at insert time and routes typed columns into `events`.
-- Function fixes vs. original: ClickHouse has JSONExtractUInt (returns
-- UInt64), not JSONExtractUInt64; Decimal extraction goes through the typed
-- JSONExtract(json, key, 'Decimal(10, 2)') form; toIPv4() returns the IPv4
-- type expected by the target column (IPv4StringToNum yields a bare UInt32).
CREATE MATERIALIZED VIEW IF NOT EXISTS events_processing TO events
AS
SELECT
    JSONExtractUInt(json_data, 'event_id') AS event_id,
    parseDateTimeBestEffort(JSONExtractString(json_data, 'timestamp')) AS event_time,
    JSONExtractString(json_data, 'event_type') AS event_type,
    JSONExtractUInt(json_data, 'user_id') AS user_id,
    JSONExtractString(json_data, 'page_url') AS page_url,
    toIPv4(JSONExtractString(json_data, 'ip_address')) AS ip_address,
    JSONExtractString(json_data, 'country') AS country,
    JSONExtractString(json_data, 'device_type') AS device_type,
    JSONExtract(json_data, 'revenue', 'Decimal(10, 2)') AS revenue
FROM raw_events_buffer;
-- 3. Kafka Integration
-- Kafka Engine table: a streaming consumer. Do not SELECT from it directly --
-- reads consume messages; let the materialized view below drain it.
CREATE TABLE IF NOT EXISTS kafka_events (
timestamp DateTime,
user_id UInt64,
event_type String,
metadata String
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka-broker:9092',
kafka_topic_list = 'events_topic',
kafka_group_name = 'clickhouse_consumer',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 3;
-- Materialized View to consume from Kafka and land rows in `events`.
CREATE MATERIALIZED VIEW IF NOT EXISTS kafka_consumer TO events
AS
SELECT
    -- events.event_id is UInt64; the original generateUUIDv4() returns a UUID
    -- and cannot be stored there. Derive a deterministic 64-bit id instead.
    sipHash64(timestamp, user_id, event_type) AS event_id,
    timestamp AS event_time,
    event_type,
    user_id,
    -- Kafka payload carries no page/geo data; fill neutral defaults.
    '' AS page_url,
    toIPv4('127.0.0.1') AS ip_address,
    '' AS country,
    '' AS device_type,
    0.00 AS revenue
FROM kafka_events;
-- 4. Replication and Cluster Setup
-- ReplicatedMergeTree for high availability.
-- '{cluster}', '{shard}' and '{replica}' are macros substituted per server
-- from each node's configuration, so the same DDL runs cluster-wide.
CREATE TABLE IF NOT EXISTS replicated_events ON CLUSTER '{cluster}'
(
event_id UInt64,
event_time DateTime,
event_type LowCardinality(String),
user_id UInt64,
page_url String,
ip_address IPv4,
country LowCardinality(FixedString(2)),
device_type LowCardinality(String),
revenue Decimal(10, 2)
)
-- Arg 1: ZooKeeper/Keeper coordination path -- must be unique per table+shard.
-- Arg 2: this server's replica name within that shard.
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time, event_type)
SETTINGS index_granularity = 8192;
-- Distributed table across cluster: stores nothing itself, only routes reads
-- and writes to the per-shard replicated tables.
CREATE TABLE IF NOT EXISTS distributed_events ON CLUSTER '{cluster}'
(
event_id UInt64,
event_time DateTime,
event_type LowCardinality(String),
user_id UInt64,
page_url String,
ip_address IPv4,
country LowCardinality(FixedString(2)),
device_type LowCardinality(String),
revenue Decimal(10, 2)
)
-- Sharding key cityHash64(user_id) keeps all rows of one user on one shard,
-- which keeps per-user aggregations local.
ENGINE = Distributed('{cluster}', analytics_db, replicated_events, cityHash64(user_id));
-- 5. Backup and Export
-- Export to Parquet
-- NOTE(review): the file() table function writes under the server's
-- user_files_path -- verify the path and write permissions on the target host.
INSERT INTO TABLE FUNCTION
file('events_backup.parquet', 'Parquet')
SELECT * FROM events
WHERE event_time >= '2024-01-01';
-- Export to CSV
-- Non-string columns are cast explicitly so the CSV text layout is stable.
INSERT INTO TABLE FUNCTION
file('events_export.csv', 'CSV')
SELECT
event_id,
toString(event_time),
event_type,
user_id,
page_url,
toString(ip_address),
country,
device_type,
toString(revenue)
FROM events
WHERE event_time >= '2024-01-01';
-- 6. Monitoring and Health Checks
-- Per-table on-disk footprint, computed from the currently active data parts.
SELECT
    database,
    table,
    sum(bytes) AS table_size,
    sum(rows) AS total_rows,
    round(sum(bytes) / 1024 / 1024 / 1024, 2) AS size_gb
FROM system.parts
WHERE active
GROUP BY
    database,
    table
ORDER BY table_size DESC;
-- Query performance analysis
-- normalizeQuery() replaces literal values with placeholders, so executions
-- of the same statement that differ only in constants aggregate into one row
-- instead of one row per distinct literal.
SELECT
    normalizeQuery(query) AS query,
    count() AS execution_count,
    avg(query_duration_ms) AS avg_duration_ms,
    max(query_duration_ms) AS max_duration_ms,
    sum(read_rows) AS total_rows_read,
    sum(read_bytes) AS total_bytes_read
FROM system.query_log
WHERE event_time >= now() - INTERVAL 1 HOUR
  AND type = 'QueryFinish'
GROUP BY query
ORDER BY avg_duration_ms DESC
LIMIT 10;
-- 7. Data Quality Checks
-- Data validation: each UNION ALL branch counts rows violating one invariant.
SELECT
    'Missing user_id' AS issue,
    count() AS count
FROM events
WHERE user_id = 0
UNION ALL
SELECT
    'Invalid revenue' AS issue,
    count() AS count
FROM events
WHERE revenue < 0
UNION ALL
SELECT
    'Future dates' AS issue,
    count() AS count
FROM events
WHERE event_time > now()
UNION ALL
SELECT
    'Empty page URLs' AS issue,
    count() AS count
FROM events
WHERE event_type IN ('page_view', 'click') AND empty(page_url);
-- 8. Automation Scripts
-- Partition maintenance: permanently deletes all January 2024 data --
-- run only as part of an explicit retention policy.
ALTER TABLE events DROP PARTITION 202401;
-- Merge optimization: force-merge the partition into a single part (FINAL).
OPTIMIZE TABLE events PARTITION 202401 FINAL;
-- ClickHouse has no ANALYZE TABLE statement (that is MySQL syntax); it
-- maintains part-level metadata automatically. Inspect table statistics via
-- the system tables instead:
SELECT
    table,
    sum(rows) AS total_rows,
    count() AS active_parts
FROM system.parts
WHERE active AND database = 'analytics_db' AND table = 'events'
GROUP BY table;
💻 ClickHouse 性能优化 sql
🔴 complex
ClickHouse性能调优,包括分区策略、压缩、索引和查询优化
-- ClickHouse Performance Optimization
-- 1. Partitioning Strategies
-- Daily partitions (toYYYYMMDD) allow fine-grained drops/TTL but multiply the
-- number of parts; NOTE(review): keep total partition count moderate --
-- monthly (toYYYYMM) is the usual default unless daily retention is required.
CREATE TABLE IF NOT EXISTS events_optimized (
event_id UInt64,
event_time DateTime,
event_type LowCardinality(String),
user_id UInt64,
revenue Decimal(10, 2)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(event_time)
ORDER BY (user_id, event_time, event_type)
SETTINGS index_granularity = 8192;
-- 2. Indexing Optimization
-- Data-skipping indexes let ClickHouse skip whole granules during scans.
CREATE TABLE IF NOT EXISTS events_with_indexes (
event_id UInt64,
event_time DateTime,
event_type LowCardinality(String),
user_id UInt64,
page_url String,
country LowCardinality(FixedString(2)),
revenue Decimal(10, 2),
-- NOTE(review): minmax tracks min/max per granule; for a low-cardinality
-- string scattered across the table it will rarely skip anything -- a set()
-- or bloom_filter index usually fits better here. Confirm against real data.
INDEX idx_event_type (event_type) TYPE minmax GRANULARITY 1,
-- NOTE(review): user_id is the leading ORDER BY column, so the primary index
-- already prunes on it; this skipping index is likely redundant.
INDEX idx_user_id (user_id) TYPE set(1000) GRANULARITY 1
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time, event_type)
SETTINGS index_granularity = 8192;
-- 3. Compression Optimization
-- Delta/DoubleDelta are transform codecs, not compressors: they must be
-- chained with a general-purpose codec (ZSTD) to actually shrink the data.
-- Gorilla is restricted to floating-point columns in current ClickHouse and
-- is rejected on Decimal, so revenue uses plain ZSTD.
CREATE TABLE IF NOT EXISTS events_compressed (
event_id UInt64 CODEC(DoubleDelta, ZSTD),
-- Timestamps arrive nearly monotonic, so delta-encoding compresses well.
event_time DateTime CODEC(Delta, ZSTD),
event_type LowCardinality(String) CODEC(ZSTD(3)),
-- Rows are sorted by user_id (ORDER BY), so deltas within a part are small.
user_id UInt64 CODEC(Delta, ZSTD),
page_url String CODEC(ZSTD(5)),
country LowCardinality(FixedString(2)) CODEC(ZSTD(1)),
revenue Decimal(10, 2) CODEC(ZSTD)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time, event_type)
SETTINGS index_granularity = 8192;
-- 4. Query Optimization with PREWHERE
-- PREWHERE reads only its filter column first and fetches the remaining
-- columns for matching granules only; WHERE then filters the fetched rows.
SELECT
    user_id,
    count() AS event_count,
    sum(revenue) AS total_revenue
FROM events
PREWHERE event_time >= '2024-01-01'
WHERE event_type = 'purchase'
GROUP BY user_id
HAVING total_revenue > 100
ORDER BY total_revenue DESC;
-- 5. Performance Monitoring
-- Ten slowest queries that finished within the last hour, per the query log.
SELECT
    query,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    memory_usage
FROM system.query_log
WHERE event_time >= now() - INTERVAL 1 HOUR
  AND type = 'QueryFinish'
ORDER BY query_duration_ms DESC
LIMIT 10;
💻 高级分析功能 sql
🔴 complex
复杂分析功能,包括窗口函数、时序分析、漏斗分析和统计操作
-- ClickHouse Advanced Analytics
-- 1. Window Functions and Time Series Analysis
-- One row per user session; sessionization happens upstream (not shown here).
CREATE TABLE IF NOT EXISTS user_sessions (
session_id UUID,
user_id UInt64,
start_time DateTime,
end_time DateTime,
page_views UInt32,
revenue Decimal(10, 2),
device_type LowCardinality(String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(start_time)
ORDER BY (user_id, start_time);
-- Session analysis with window functions
-- For each user's sessions in chronological order: the previous session's
-- revenue, the delta against it, and the session's ordinal position.
SELECT
    user_id,
    session_id,
    start_time,
    page_views,
    revenue,
    lag(revenue) OVER (PARTITION BY user_id ORDER BY start_time) AS prev_session_revenue,
    revenue - lag(revenue) OVER (PARTITION BY user_id ORDER BY start_time) AS revenue_change,
    row_number() OVER (PARTITION BY user_id ORDER BY start_time) AS session_rank
FROM user_sessions
WHERE start_time >= '2024-01-01'
ORDER BY user_id, start_time;
-- 2. Funnel Analysis
-- E-commerce funnel events
CREATE TABLE IF NOT EXISTS funnel_events (
user_id UInt64,
event_time DateTime,
event_type LowCardinality(String),
product_id UInt64,
step_number UInt8
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time, step_number);
-- Conversion funnel analysis: share of step-1 users reaching each step.
-- uniqExact() is the native exact distinct count (vs. non-standard countDistinct).
-- NOTE: for strictly ordered per-user funnels, ClickHouse's windowFunnel()
-- aggregate is the dedicated tool.
SELECT
    step_number,
    event_type,
    uniqExact(user_id) AS unique_users,
    uniqExact(user_id) * 100.0 / (SELECT uniqExact(user_id) FROM funnel_events WHERE step_number = 1) AS conversion_rate
FROM funnel_events
WHERE event_time >= '2024-01-01'
GROUP BY step_number, event_type
ORDER BY step_number;
-- 3. Cohort Analysis
-- User registration and activity tracking
CREATE TABLE IF NOT EXISTS user_activity (
user_id UInt64,
registration_date Date,
activity_date Date,
activity_type LowCardinality(String),
session_duration UInt32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(registration_date)
ORDER BY (user_id, activity_date);
-- Monthly cohort retention analysis
WITH
-- Cohort size = distinct users who registered on each date.
cohort_sizes AS (
    SELECT
        registration_date,
        uniqExact(user_id) AS cohort_size
    FROM user_activity
    WHERE activity_type = 'registration'
    GROUP BY registration_date
),
-- Distinct active users per cohort per month offset. Grouping by the month
-- offset (rather than by activity_date, as the original did) is what makes
-- this MONTHLY retention -- the original emitted one row per active day and
-- undercounted users active on several days of the same month.
retention_data AS (
    SELECT
        ua.registration_date,
        dateDiff('month', ua.registration_date, ua.activity_date) AS month_number,
        uniqExact(ua.user_id) AS active_users
    FROM user_activity AS ua
    WHERE ua.activity_type = 'session'
    GROUP BY ua.registration_date, month_number
)
SELECT
    r.registration_date,
    r.month_number,
    r.active_users,
    c.cohort_size,
    round(r.active_users * 100.0 / c.cohort_size, 2) AS retention_rate
FROM retention_data AS r
INNER JOIN cohort_sizes AS c ON r.registration_date = c.registration_date
WHERE r.month_number <= 12
ORDER BY r.registration_date, r.month_number;
-- 4. Statistical Analysis
-- Revenue statistics per user.
-- Fixes vs. original: `correlation` is not a ClickHouse function (it is
-- corr()), and user_sessions has no session_duration column -- the duration
-- is derived from start_time/end_time instead.
SELECT
    user_id,
    sum(revenue) AS total_revenue,
    avg(revenue) AS avg_revenue,
    quantile(0.5)(revenue) AS median_revenue,
    quantile(0.95)(revenue) AS p95_revenue,
    quantile(0.99)(revenue) AS p99_revenue,
    stddevSamp(revenue) AS revenue_stddev,
    -- Correlation between session revenue and session length in seconds.
    corr(toFloat64(revenue), toFloat64(dateDiff('second', start_time, end_time))) AS revenue_session_corr
FROM user_sessions
WHERE start_time >= '2024-01-01'
GROUP BY user_id
HAVING total_revenue > 0
ORDER BY total_revenue DESC;
-- 5. Time Series Anomaly Detection
-- Flag days whose event count deviates more than 2 standard deviations from
-- the trailing 7-day baseline.
WITH
-- One row per day with traffic and revenue totals.
daily_metrics AS (
    SELECT
        toDate(event_time) AS date,
        count() AS total_events,
        uniqExact(user_id) AS unique_users,
        sum(revenue) AS total_revenue
    FROM events
    WHERE event_time >= '2024-01-01'
    GROUP BY date
),
-- Rolling baseline per day. The frame (7 PRECEDING .. 1 PRECEDING) excludes
-- the current day so a spike does not inflate its own baseline.
baseline_stats AS (
    SELECT
        date,
        total_events,
        unique_users,
        total_revenue,
        avg(total_events) OVER (ORDER BY date ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING) AS avg_events_7d,
        stddevSamp(total_events) OVER (ORDER BY date ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING) AS std_events_7d
    FROM daily_metrics
)
SELECT
    date,
    total_events,
    avg_events_7d,
    std_events_7d,
    -- nullIf() avoids division by zero when the baseline has no variance.
    abs(total_events - avg_events_7d) / nullIf(std_events_7d, 0) AS z_score,
    CASE
        WHEN abs(total_events - avg_events_7d) / nullIf(std_events_7d, 0) > 2 THEN 'Anomaly'
        ELSE 'Normal'
    END AS anomaly_flag
FROM baseline_stats
WHERE date >= '2024-01-01'
ORDER BY date DESC;
-- 6. Real-time Dashboard Queries
-- KPI Dashboard: the same KPI block computed for three periods and stitched
-- together with UNION ALL. uniqExact() replaces the non-standard
-- countDistinct (exact distinct count, ClickHouse-native name).
SELECT
    'Today' AS period,
    count() AS total_events,
    uniqExact(user_id) AS unique_users,
    sum(revenue) AS total_revenue,
    avg(revenue) AS avg_revenue_per_event,
    uniqExact(ip_address) AS unique_ips
FROM events
WHERE event_time >= today()
UNION ALL
SELECT
    'Yesterday' AS period,
    count() AS total_events,
    uniqExact(user_id) AS unique_users,
    sum(revenue) AS total_revenue,
    avg(revenue) AS avg_revenue_per_event,
    uniqExact(ip_address) AS unique_ips
FROM events
WHERE event_time >= yesterday() AND event_time < today()
UNION ALL
-- Note: the 7-day window intentionally includes today and yesterday.
SELECT
    'Last 7 Days' AS period,
    count() AS total_events,
    uniqExact(user_id) AS unique_users,
    sum(revenue) AS total_revenue,
    avg(revenue) AS avg_revenue_per_event,
    uniqExact(ip_address) AS unique_ips
FROM events
WHERE event_time >= today() - INTERVAL 7 DAY;