ClickHouse Examples

Examples for the ClickHouse column-oriented database, covering high-performance analytics, time-series processing, and real-time dashboards

💻 ClickHouse Basic Operations sql

🟢 simple

Basic ClickHouse operations: creating tables, inserting data, simple queries, and aggregations

-- ClickHouse Basic Operations

-- 1. Database Operations
CREATE DATABASE IF NOT EXISTS analytics_db;
USE analytics_db;

-- 2. Table Creation with MergeTree Engine
CREATE TABLE IF NOT EXISTS events (
    event_id UInt64,
    event_time DateTime,
    event_type LowCardinality(String),
    user_id UInt64,
    page_url String,
    ip_address IPv4,
    country LowCardinality(FixedString(2)),
    device_type LowCardinality(String),
    revenue Decimal(10, 2)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time, event_type)
SETTINGS index_granularity = 8192;

-- 3. Insert Sample Data
INSERT INTO events VALUES
(1, '2024-01-01 10:00:00', 'page_view', 12345, '/home', '192.168.1.1', 'US', 'desktop', 0.00),
(2, '2024-01-01 10:01:00', 'click', 12345, '/products/123', '192.168.1.1', 'US', 'desktop', 0.00),
(3, '2024-01-01 10:02:00', 'purchase', 12345, '/checkout', '192.168.1.1', 'US', 'desktop', 29.99),
(4, '2024-01-01 10:05:00', 'page_view', 67890, '/home', '10.0.0.1', 'UK', 'mobile', 0.00),
(5, '2024-01-01 10:06:00', 'page_view', 67890, '/products', '10.0.0.1', 'UK', 'mobile', 0.00);

-- 4. Basic Queries
SELECT * FROM events LIMIT 10;

-- Filter by event type
SELECT * FROM events WHERE event_type = 'purchase';

-- 5. Aggregation Functions
SELECT
    event_type,
    count() as event_count,
    sum(revenue) as total_revenue,
    avg(revenue) as avg_revenue
FROM events
GROUP BY event_type;

-- 6. Time-based aggregations
SELECT
    toYYYYMM(event_time) as month,
    count() as total_events,
    countDistinct(user_id) as unique_users,
    sum(revenue) as monthly_revenue
FROM events
GROUP BY month
ORDER BY month;
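
-- 7. Primary-key-aware lookups
-- Because the table's ORDER BY starts with user_id, per-user time-range
-- queries only touch a few index granules. A minimal sketch (user 12345
-- comes from the sample data above):
SELECT
    event_type,
    count() as event_count
FROM events
WHERE user_id = 12345
  AND event_time >= toDateTime('2024-01-01 00:00:00')
GROUP BY event_type;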

💻 ClickHouse Integration Patterns sql

🟡 intermediate

Integration with data pipelines, external systems, monitoring, and backup strategies

-- ClickHouse Integration Patterns

-- 1. External Data Sources
-- MySQL Engine for real-time integration
CREATE TABLE IF NOT EXISTS mysql_users (
    id UInt32,
    name String,
    email String,
    created_at DateTime
) ENGINE = MySQL('mysql-host:3306', 'database', 'users', 'user', 'password');

-- PostgreSQL Engine
CREATE TABLE IF NOT EXISTS pg_products (
    id UInt32,
    name String,
    price Decimal(10, 2),
    category String
) ENGINE = PostgreSQL('postgres-host:5432', 'database', 'products', 'user', 'password');

-- 2. Data Pipeline Integration
-- Buffer table for streaming data. Buffer takes nine engine arguments
-- (num_layers plus min/max thresholds for time, rows, and bytes), and its
-- destination table must share the buffer's schema, so raw JSON is staged
-- in its own table before being parsed into events.
CREATE TABLE IF NOT EXISTS raw_events (
    timestamp DateTime,
    json_data String
) ENGINE = MergeTree()
ORDER BY timestamp;

CREATE TABLE IF NOT EXISTS raw_events_buffer (
    timestamp DateTime,
    json_data String
) ENGINE = Buffer(analytics_db, raw_events, 16, 10, 100, 10000, 1000000, 10000000, 100000000);

-- Materialized View for automatic data processing; it fires on inserts
-- into raw_events (i.e., whenever the buffer flushes)
CREATE MATERIALIZED VIEW IF NOT EXISTS events_processing TO events
AS
SELECT
    JSONExtractUInt(json_data, 'event_id') as event_id,
    parseDateTimeBestEffort(JSONExtractString(json_data, 'timestamp')) as event_time,
    JSONExtractString(json_data, 'event_type') as event_type,
    JSONExtractUInt(json_data, 'user_id') as user_id,
    JSONExtractString(json_data, 'page_url') as page_url,
    toIPv4(JSONExtractString(json_data, 'ip_address')) as ip_address,
    JSONExtractString(json_data, 'country') as country,
    JSONExtractString(json_data, 'device_type') as device_type,
    JSONExtract(json_data, 'revenue', 'Decimal(10, 2)') as revenue
FROM raw_events;
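
-- To exercise the pipeline end to end, insert a raw JSON row into the
-- buffer; once the buffer flushes to raw_events, the materialized view
-- writes the parsed row into events. The payload below is made-up sample data:
INSERT INTO raw_events_buffer VALUES
(now(), '{"event_id": 6, "timestamp": "2024-01-01 10:10:00", "event_type": "page_view", "user_id": 99999, "page_url": "/pricing", "ip_address": "172.16.0.1", "country": "DE", "device_type": "tablet", "revenue": 0}');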

-- 3. Kafka Integration
-- Kafka Engine for real-time data ingestion
CREATE TABLE IF NOT EXISTS kafka_events (
    timestamp DateTime,
    user_id UInt64,
    event_type String,
    metadata String
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka-broker:9092',
    kafka_topic_list = 'events_topic',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 3;

-- Materialized View to consume from Kafka
CREATE MATERIALIZED VIEW IF NOT EXISTS kafka_consumer TO events
AS
SELECT
    -- events.event_id is UInt64, so a UUID would not fit; derive a numeric id
    sipHash64(user_id, timestamp) as event_id,
    timestamp as event_time,
    event_type,
    user_id,
    '' as page_url,
    toIPv4('127.0.0.1') as ip_address,
    '' as country,
    '' as device_type,
    toDecimal64(0, 2) as revenue
FROM kafka_events;
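
-- Consumption can be paused and resumed by detaching and re-attaching the
-- materialized view (handy during schema changes); the committed offsets
-- stay with the Kafka consumer group:
DETACH TABLE kafka_consumer;
ATTACH TABLE kafka_consumer;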

-- 4. Replication and Cluster Setup
-- ReplicatedMergeTree for high availability
CREATE TABLE IF NOT EXISTS replicated_events ON CLUSTER '{cluster}'
(
    event_id UInt64,
    event_time DateTime,
    event_type LowCardinality(String),
    user_id UInt64,
    page_url String,
    ip_address IPv4,
    country LowCardinality(FixedString(2)),
    device_type LowCardinality(String),
    revenue Decimal(10, 2)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time, event_type)
SETTINGS index_granularity = 8192;

-- Distributed table across cluster
CREATE TABLE IF NOT EXISTS distributed_events ON CLUSTER '{cluster}'
(
    event_id UInt64,
    event_time DateTime,
    event_type LowCardinality(String),
    user_id UInt64,
    page_url String,
    ip_address IPv4,
    country LowCardinality(FixedString(2)),
    device_type LowCardinality(String),
    revenue Decimal(10, 2)
)
ENGINE = Distributed('{cluster}', analytics_db, replicated_events, cityHash64(user_id));
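
-- Usage sketch: writes through the Distributed table are routed to shards
-- by cityHash64(user_id); reads fan out to every shard and are merged on
-- the initiating node. This assumes the local events table holds the sample data.
INSERT INTO distributed_events SELECT * FROM events;

SELECT event_type, count() as event_count
FROM distributed_events
GROUP BY event_type;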

-- 5. Backup and Export
-- Export to Parquet
INSERT INTO TABLE FUNCTION
file('events_backup.parquet', 'Parquet')
SELECT * FROM events
WHERE event_time >= '2024-01-01';

-- Export to CSV
INSERT INTO TABLE FUNCTION
file('events_export.csv', 'CSV')
SELECT
    event_id,
    toString(event_time),
    event_type,
    user_id,
    page_url,
    toString(ip_address),
    country,
    device_type,
    toString(revenue)
FROM events
WHERE event_time >= '2024-01-01';
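
-- The file() table function writes to (and reads from) the server's
-- user_files directory, so an export can be verified or re-imported
-- directly; Parquet schema inference fills in the column types:
SELECT count() FROM file('events_backup.parquet', 'Parquet');

-- Re-import if needed
INSERT INTO events SELECT * FROM file('events_backup.parquet', 'Parquet');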

-- 6. Monitoring and Health Checks
-- System monitoring queries
SELECT
    database,
    table,
    sum(bytes) as table_size,
    sum(rows) as total_rows,
    round(sum(bytes) / 1024 / 1024 / 1024, 2) as size_gb
FROM system.parts
WHERE active = 1
GROUP BY database, table
ORDER BY table_size DESC;

-- Query performance analysis
SELECT
    query,
    count() as execution_count,
    avg(query_duration_ms) as avg_duration_ms,
    max(query_duration_ms) as max_duration_ms,
    sum(read_rows) as total_rows_read,
    sum(read_bytes) as total_bytes_read
FROM system.query_log
WHERE event_time >= now() - INTERVAL 1 HOUR
  AND type = 'QueryFinish'
GROUP BY query
ORDER BY avg_duration_ms DESC
LIMIT 10;
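
-- Grouping by raw query text treats statements that differ only in literal
-- values as distinct queries. system.query_log also stores
-- normalized_query_hash, which collapses such variants into one bucket:
SELECT
    any(query) as example_query,
    count() as execution_count,
    avg(query_duration_ms) as avg_duration_ms
FROM system.query_log
WHERE event_time >= now() - INTERVAL 1 HOUR
  AND type = 'QueryFinish'
GROUP BY normalized_query_hash
ORDER BY avg_duration_ms DESC
LIMIT 10;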

-- 7. Data Quality Checks
-- Data validation queries
SELECT
    'Missing user_id' as issue,
    count() as count
FROM events
WHERE user_id = 0

UNION ALL

SELECT
    'Invalid revenue' as issue,
    count() as count
FROM events
WHERE revenue < 0

UNION ALL

SELECT
    'Future dates' as issue,
    count() as count
FROM events
WHERE event_time > now()

UNION ALL

SELECT
    'Empty page URLs' as issue,
    count() as count
FROM events
WHERE event_type IN ('page_view', 'click') AND empty(page_url);

-- 8. Automation Scripts
-- Partition maintenance (toYYYYMM partitions have numeric IDs such as 202401)
ALTER TABLE events DROP PARTITION 202401;

-- Merge optimization
OPTIMIZE TABLE events PARTITION 202401 FINAL;

-- ClickHouse has no ANALYZE TABLE statement; up-to-date table statistics
-- are exposed through system tables instead
SELECT table, sum(rows) as total_rows
FROM system.parts
WHERE active AND table = 'events'
GROUP BY table;
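
-- Instead of dropping partitions by hand, a TTL clause expires rows
-- automatically as parts merge; the 90-day window is an arbitrary example:
ALTER TABLE events MODIFY TTL event_time + INTERVAL 90 DAY;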

💻 ClickHouse Performance Optimization sql

🔴 complex

ClickHouse performance tuning with partitioning strategies, compression, indexing, and query optimization

-- ClickHouse Performance Optimization

-- 1. Partitioning Strategies
CREATE TABLE IF NOT EXISTS events_optimized (
    event_id UInt64,
    event_time DateTime,
    event_type LowCardinality(String),
    user_id UInt64,
    revenue Decimal(10, 2)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(event_time) -- daily partitions suit high-volume, short-retention data; keep the total partition count moderate
ORDER BY (user_id, event_time, event_type)
SETTINGS index_granularity = 8192;

-- 2. Indexing Optimization
CREATE TABLE IF NOT EXISTS events_with_indexes (
    event_id UInt64,
    event_time DateTime,
    event_type LowCardinality(String),
    user_id UInt64,
    page_url String,
    country LowCardinality(FixedString(2)),
    revenue Decimal(10, 2),
    -- Skip indexes pay off on columns that are not already in the primary key
    INDEX idx_event_type (event_type) TYPE set(100) GRANULARITY 4,
    INDEX idx_page_url (page_url) TYPE bloom_filter GRANULARITY 4
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time, event_type)
SETTINGS index_granularity = 8192;

-- 3. Compression Optimization
CREATE TABLE IF NOT EXISTS events_compressed (
    event_id UInt64 CODEC(DoubleDelta, ZSTD),
    event_time DateTime CODEC(Delta, ZSTD),
    event_type LowCardinality(String) CODEC(ZSTD(3)),
    user_id UInt64 CODEC(T64, ZSTD),
    page_url String CODEC(ZSTD(5)),
    country LowCardinality(FixedString(2)) CODEC(ZSTD(1)),
    -- Gorilla only applies to floating-point columns, so Decimal gets a general codec
    revenue Decimal(10, 2) CODEC(ZSTD(3))
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time, event_type)
SETTINGS index_granularity = 8192;

-- 4. Query Optimization with PREWHERE
SELECT
    user_id,
    count() as event_count,
    sum(revenue) as total_revenue
FROM events
PREWHERE event_time >= '2024-01-01'
WHERE event_type = 'purchase'
GROUP BY user_id
HAVING total_revenue > 100
ORDER BY total_revenue DESC;
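
-- Whether PREWHERE and the primary key actually prune data can be checked
-- with EXPLAIN; indexes = 1 prints which parts and granules each index selects:
EXPLAIN indexes = 1
SELECT count()
FROM events
PREWHERE event_time >= '2024-01-01'
WHERE event_type = 'purchase';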

-- 5. Performance Monitoring
SELECT
    query,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    memory_usage
FROM system.query_log
WHERE event_time >= now() - INTERVAL 1 HOUR
  AND type = 'QueryFinish'
ORDER BY query_duration_ms DESC
LIMIT 10;

💻 Advanced Analytics Functions sql

🔴 complex

Complex analytics including window functions, time-series analysis, funnel analysis, and statistical operations

-- ClickHouse Advanced Analytics

-- 1. Window Functions and Time Series Analysis
CREATE TABLE IF NOT EXISTS user_sessions (
    session_id UUID,
    user_id UInt64,
    start_time DateTime,
    end_time DateTime,
    page_views UInt32,
    revenue Decimal(10, 2),
    device_type LowCardinality(String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(start_time)
ORDER BY (user_id, start_time);

-- Session analysis with window functions
SELECT
    user_id,
    session_id,
    start_time,
    page_views,
    revenue,
    lag(revenue) OVER (PARTITION BY user_id ORDER BY start_time) as prev_session_revenue,
    revenue - lag(revenue) OVER (PARTITION BY user_id ORDER BY start_time) as revenue_change,
    row_number() OVER (PARTITION BY user_id ORDER BY start_time) as session_number
FROM user_sessions
WHERE start_time >= '2024-01-01'
ORDER BY user_id, start_time;

-- 2. Funnel Analysis
-- E-commerce funnel events
CREATE TABLE IF NOT EXISTS funnel_events (
    user_id UInt64,
    event_time DateTime,
    event_type LowCardinality(String),
    product_id UInt64,
    step_number UInt8
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time, step_number);

-- Conversion funnel analysis
SELECT
    step_number,
    event_type,
    countDistinct(user_id) as unique_users,
    countDistinct(user_id) * 100.0 / (SELECT countDistinct(user_id) FROM funnel_events WHERE step_number = 1) as conversion_rate
FROM funnel_events
WHERE event_time >= '2024-01-01'
GROUP BY step_number, event_type
ORDER BY step_number;
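
-- ClickHouse also ships a dedicated windowFunnel aggregate that searches for
-- the steps in order within a sliding time window, avoiding manual step
-- bookkeeping; the step event names below are placeholders:
SELECT
    user_id,
    windowFunnel(3600)(
        event_time,
        event_type = 'view_product',
        event_type = 'add_to_cart',
        event_type = 'purchase'
    ) as funnel_level  -- deepest step reached within 1 hour (3600 s)
FROM funnel_events
WHERE event_time >= '2024-01-01'
GROUP BY user_id;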

-- 3. Cohort Analysis
-- User registration and activity tracking
CREATE TABLE IF NOT EXISTS user_activity (
    user_id UInt64,
    registration_date Date,
    activity_date Date,
    activity_type LowCardinality(String),
    session_duration UInt32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(registration_date)
ORDER BY (user_id, activity_date);

-- Monthly cohort retention analysis
WITH
cohort_sizes AS (
    SELECT
        registration_date,
        countDistinct(user_id) as cohort_size
    FROM user_activity
    WHERE activity_type = 'registration'
    GROUP BY registration_date
),

retention_data AS (
    -- group by cohort and month offset (grouping by raw activity_date would
    -- produce one row per day and inflate the monthly retention output)
    SELECT
        ua.registration_date,
        dateDiff('month', ua.registration_date, ua.activity_date) as month_number,
        countDistinct(ua.user_id) as active_users
    FROM user_activity ua
    WHERE ua.activity_type = 'session'
    GROUP BY ua.registration_date, month_number
)

SELECT
    r.registration_date,
    r.month_number,
    r.active_users,
    c.cohort_size,
    round(r.active_users * 100.0 / c.cohort_size, 2) as retention_rate
FROM retention_data r
JOIN cohort_sizes c ON r.registration_date = c.registration_date
WHERE r.month_number <= 12
ORDER BY r.registration_date, r.month_number;
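
-- For fixed checkpoints, the retention() aggregate expresses the same idea
-- more compactly: per user it returns an array of 0/1 flags, where each later
-- condition only counts if the first one matched. The dates are example values:
SELECT
    sum(r[1]) as day_0_users,
    sum(r[2]) as day_7_users,
    sum(r[3]) as day_14_users
FROM
(
    SELECT
        user_id,
        retention(
            activity_date = toDate('2024-01-01'),
            activity_date = toDate('2024-01-08'),
            activity_date = toDate('2024-01-15')
        ) as r
    FROM user_activity
    GROUP BY user_id
);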

-- 4. Statistical Analysis
-- Revenue statistics per user
SELECT
    user_id,
    sum(revenue) as total_revenue,
    avg(revenue) as avg_revenue,
    quantile(0.5)(revenue) as median_revenue,
    quantile(0.95)(revenue) as p95_revenue,
    quantile(0.99)(revenue) as p99_revenue,
    stddevSamp(revenue) as revenue_stddev,
    -- user_sessions stores start/end times rather than a duration column
    corr(toFloat64(revenue), toFloat64(dateDiff('second', start_time, end_time))) as revenue_duration_corr
FROM user_sessions
WHERE start_time >= '2024-01-01'
GROUP BY user_id
HAVING total_revenue > 0
ORDER BY total_revenue DESC;

-- 5. Time Series Anomaly Detection
-- Detect unusual traffic patterns
WITH
daily_metrics AS (
    SELECT
        toDate(event_time) as date,
        count() as total_events,
        countDistinct(user_id) as unique_users,
        sum(revenue) as total_revenue
    FROM events
    WHERE event_time >= '2024-01-01'
    GROUP BY date
),

baseline_stats AS (
    SELECT
        date,
        total_events,
        unique_users,
        total_revenue,
        avg(total_events) OVER (ORDER BY date ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING) as avg_events_7d,
        stddevSamp(total_events) OVER (ORDER BY date ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING) as std_events_7d
    FROM daily_metrics
)

SELECT
    date,
    total_events,
    avg_events_7d,
    std_events_7d,
    abs(total_events - avg_events_7d) / nullIf(std_events_7d, 0) as z_score,
    CASE
        WHEN abs(total_events - avg_events_7d) / nullIf(std_events_7d, 0) > 2 THEN 'Anomaly'
        ELSE 'Normal'
    END as anomaly_flag
FROM baseline_stats
WHERE date >= '2024-01-01'
ORDER BY date DESC;

-- 6. Real-time Dashboard Queries
-- KPI Dashboard
SELECT
    'Today' as period,
    count() as total_events,
    countDistinct(user_id) as unique_users,
    sum(revenue) as total_revenue,
    avg(revenue) as avg_revenue_per_event,
    countDistinct(ip_address) as unique_ips
FROM events
WHERE event_time >= today()

UNION ALL

SELECT
    'Yesterday' as period,
    count() as total_events,
    countDistinct(user_id) as unique_users,
    sum(revenue) as total_revenue,
    avg(revenue) as avg_revenue_per_event,
    countDistinct(ip_address) as unique_ips
FROM events
WHERE event_time >= yesterday() AND event_time < today()

UNION ALL

SELECT
    'Last 7 Days' as period,
    count() as total_events,
    countDistinct(user_id) as unique_users,
    sum(revenue) as total_revenue,
    avg(revenue) as avg_revenue_per_event,
    countDistinct(ip_address) as unique_ips
FROM events
WHERE event_time >= today() - INTERVAL 7 DAY;
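
-- The three UNION ALL branches scan events three times; conditional
-- aggregates (-If combinators) compute the same KPIs in a single pass:
SELECT
    countIf(event_time >= today()) as events_today,
    countIf(event_time >= yesterday() AND event_time < today()) as events_yesterday,
    uniqExactIf(user_id, event_time >= today()) as users_today,
    sumIf(revenue, event_time >= today()) as revenue_today,
    count() as events_7d
FROM events
WHERE event_time >= today() - INTERVAL 7 DAY;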