Apache Kafka Examples

Apache Kafka distributed streaming platform examples covering producers, consumers, streams, and cluster configuration

Key Facts

Category
Data Processing
Items
3
Format Families
javascript, yaml

Sample Overview

Apache Kafka distributed streaming platform examples covering producers, consumers, streams, and cluster configuration. This sample set belongs to Data Processing and can be used to test related workflows inside Elysia Tools.

💻 Kafka Basic Concepts and Configuration javascript

🟢 simple ⭐⭐

Essential Kafka terminology, architecture, and basic producer/consumer examples

⏱️ 25 min 🏷️ kafka, producer, consumer, basics
Prerequisites: Node.js, Basic messaging concepts, Docker
// Apache Kafka Basic Concepts and Examples

// 1. Installation
// Download Apache Kafka: https://kafka.apache.org/downloads
// or use Docker: docker compose -f kafka-docker-compose.yml up

// 2. Kafka Architecture Overview
/*
Topic: Category or feed name to which records are published
Producer: Publishes messages to one or more Kafka topics
Consumer: Subscribes to topics and processes the stream of records
Broker: Kafka server in a cluster
Cluster: Collection of brokers working together
Partition: Fundamental unit of parallelism in Kafka
Offset: Unique identifier for each record within a partition
Consumer Group: Group of consumers that share a topic for load balancing
*/
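
// For example, the relationship between topics, partitions, and offsets can be
// inspected with the CLI tools that ship with Kafka (paths assume a local
// installation and a broker reachable on localhost:9092):
//
//   bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic user-events
//   bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
//     --topic user-events --from-beginning --property print.offset=true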

// 3. Basic Producer Example (Node.js)
// producer.js
const { Kafka } = require('kafkajs');

class BasicProducer {
  constructor() {
    this.kafka = new Kafka({
      clientId: 'basic-producer',
      brokers: ['localhost:9092']
    });
    this.producer = this.kafka.producer();
  }

  async sendMessage(topic, message) {
    try {
      await this.producer.connect();

      const result = await this.producer.send({
        topic,
        messages: [
          {
            key: message.key,
            value: JSON.stringify(message.value),
            headers: message.headers || {},
            timestamp: Date.now()
          }
        ]
      });

      console.log('Message sent successfully:', result);
      return result;
    } catch (error) {
      console.error('Error sending message:', error);
      throw error;
    } finally {
      await this.producer.disconnect();
    }
  }

  async sendBatchMessages(topic, messages) {
    try {
      await this.producer.connect();

      const kafkaMessages = messages.map(msg => ({
        key: msg.key,
        value: JSON.stringify(msg.value),
        headers: msg.headers || {},
        timestamp: Date.now()
      }));

      const result = await this.producer.send({
        topic,
        messages: kafkaMessages
      });

      console.log(`Batch of ${messages.length} messages sent successfully`);
      return result;
    } catch (error) {
      console.error('Error sending batch messages:', error);
      throw error;
    } finally {
      await this.producer.disconnect();
    }
  }
}

// Usage example
async function runProducerExample() {
  const producer = new BasicProducer();

  // Send single message
  await producer.sendMessage('user-events', {
    key: 'user-123',
    value: {
      userId: '123',
      action: 'login',
      timestamp: new Date().toISOString(),
      metadata: { ip: '192.168.1.1' }
    }
  });

  // Send batch messages
  const batchMessages = [
    {
      key: 'order-1',
      value: { orderId: '1', status: 'created', amount: 99.99 }
    },
    {
      key: 'order-2',
      value: { orderId: '2', status: 'shipped', amount: 149.99 }
    }
  ];

  await producer.sendBatchMessages('order-events', batchMessages);
}

// 4. Basic Consumer Example (Node.js)
// consumer.js
class BasicConsumer {
  constructor(groupId) {
    this.kafka = new Kafka({
      clientId: 'basic-consumer',
      brokers: ['localhost:9092']
    });
    this.consumer = this.kafka.consumer({ groupId });
    this.isRunning = false;
  }

  async consume(topic, messageHandler) {
    try {
      await this.consumer.connect();
      await this.consumer.subscribe({ topic, fromBeginning: false });

      this.isRunning = true;

      await this.consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          if (!this.isRunning) return;

          try {
            const parsedMessage = JSON.parse(message.value.toString());
            await messageHandler({
              topic,
              partition,
              offset: message.offset,
              key: message.key?.toString(),
              value: parsedMessage,
              timestamp: message.timestamp,
              headers: message.headers
            });
          } catch (parseError) {
            console.error('Error parsing message:', parseError);
            // Move to next message even if parsing fails
          }
        }
        // Note: kafkajs runs either eachMessage or eachBatch, not both;
        // register eachBatch instead only when batch-level control is needed.
      });
    } catch (error) {
      console.error('Consumer error:', error);
      throw error;
    }
  }

  async stop() {
    console.log('Stopping consumer...');
    this.isRunning = false;
    await this.consumer.disconnect();
  }
}

// Usage example
async function runConsumerExample() {
  const consumer = new BasicConsumer('user-processor-group');

  await consumer.consume('user-events', async (message) => {
    console.log(`Received message from ${message.topic}:`);
    console.log(`User ID: ${message.value.userId}`);
    console.log(`Action: ${message.value.action}`);

    // Process the message (e.g., save to database, trigger analytics, etc.)
    if (message.value.action === 'login') {
      console.log('Processing login event...');
      // Add your business logic here
    }
  });
}

// Demo entry point (CommonJS-friendly, no top-level await)
async function runExamples() {
  await runProducerExample();
  await runConsumerExample();
}

runExamples().catch(console.error);
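
// A minimal graceful-shutdown sketch (assumes the BasicConsumer instance is kept
// in scope): stopping the consumer on SIGINT/SIGTERM lets kafkajs leave the
// consumer group cleanly instead of waiting for a session timeout.
function registerShutdown(consumer) {
  const shutdown = async (signal) => {
    console.log(`Received ${signal}, shutting down...`);
    try {
      await consumer.stop(); // BasicConsumer.stop() disconnects the kafkajs consumer
    } finally {
      process.exit(0);
    }
  };
  process.once('SIGINT', () => shutdown('SIGINT'));
  process.once('SIGTERM', () => shutdown('SIGTERM'));
}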

// 5. Docker Compose for Kafka Development
// kafka-docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

  # Optional: Schema Registry for Avro/Protobuf schemas
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    container_name: schema-registry
    depends_on:
      - kafka
      - zookeeper
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

volumes:
  zookeeper-data:
  kafka-data:
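
# Quick smoke test once the stack is up (service and container names as defined above):
#   docker compose -f kafka-docker-compose.yml up -d
#   docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --list
#   Kafka UI is then available at http://localhost:8080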

// 6. Topic Management Scripts
// topics.js
const { Kafka } = require('kafkajs');

class TopicManager {
  constructor() {
    this.kafka = new Kafka({
      clientId: 'topic-manager',
      brokers: ['localhost:9092']
    });
    this.admin = this.kafka.admin();
  }

  async createTopic(topicName, partitions = 1, replicationFactor = 1) {
    try {
      await this.admin.connect();

      const topicConfig = {
        topic: topicName,
        numPartitions: partitions,
        replicationFactor: replicationFactor,
        configEntries: [
          { name: 'retention.ms', value: '604800000' }, // 7 days
          { name: 'cleanup.policy', value: 'delete' },
          { name: 'segment.ms', value: '86400000' } // 1 day
        ]
      };

      await this.admin.createTopics({
        topics: [topicConfig]
      });

      console.log(`Topic '${topicName}' created successfully with ${partitions} partitions`);
    } catch (error) {
      if (error.type === 'TOPIC_ALREADY_EXISTS') {
        console.log(`Topic '${topicName}' already exists`);
      } else {
        console.error('Error creating topic:', error);
        throw error;
      }
    } finally {
      await this.admin.disconnect();
    }
  }

  async listTopics() {
    try {
      await this.admin.connect();
      const topics = await this.admin.listTopics();

      console.log('Available topics:');
      topics.forEach(topicName => {
        console.log(`- ${topicName}`);
      });

      return topics;
    } catch (error) {
      console.error('Error listing topics:', error);
      throw error;
    } finally {
      await this.admin.disconnect();
    }
  }

  async deleteTopic(topicName) {
    try {
      await this.admin.connect();
      await this.admin.deleteTopics({
        topics: [topicName]
      });
      console.log(`Topic '${topicName}' deleted successfully`);
    } catch (error) {
      console.error('Error deleting topic:', error);
      throw error;
    } finally {
      await this.admin.disconnect();
    }
  }

  async getTopicMetadata(topicName) {
    try {
      await this.admin.connect();
      const metadata = await this.admin.fetchTopicMetadata({ topics: [topicName] });

      const topicMetadata = metadata.topics[0];
      console.log(`Topic metadata for '${topicName}':`);
      console.log(`Partitions: ${topicMetadata.partitions.length}`);

      topicMetadata.partitions.forEach((partition) => {
        console.log(`  Partition ${partition.partitionId}: Leader: ${partition.leader}, Replicas: ${partition.replicas.length}`);
      });

      return topicMetadata;
    } catch (error) {
      console.error('Error fetching topic metadata:', error);
      throw error;
    } finally {
      await this.admin.disconnect();
    }
  }
}

// Usage example (wrapped in a function to avoid top-level await in CommonJS)
async function runTopicManagerExample() {
  const topicManager = new TopicManager();

  // Create topics
  await topicManager.createTopic('user-events', 3, 1);
  await topicManager.createTopic('order-events', 5, 1);
  await topicManager.createTopic('product-updates', 2, 1);

  // List all topics
  await topicManager.listTopics();

  // Get topic metadata
  await topicManager.getTopicMetadata('user-events');
}

runTopicManagerExample().catch(console.error);

// 7. Configuration Examples

// producer.config.js
const producerConfig = {
  clientId: 'my-producer-app',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],

  // Connection configuration
  connectionTimeout: 10000,
  requestTimeout: 30000,

  // Retry configuration
  retry: {
    initialRetryTime: 100,
    factor: 2,
    maxRetryTime: 60000,
    multiplier: 1.5,
    retries: 5
  },

  // Authentication (if using SASL/SSL)
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: 'kafka-user',
    password: 'kafka-password'
  },

  // Producer-specific settings
  allowAutoTopicCreation: false,
  maxInFlightRequests: 5,
  idempotent: true,
  transactionTimeout: 60000,

  // Compression
  compression: 'gzip',

  // Batch settings
  batchSize: 16384,
  linger: 5,

  // ACK settings
  acks: 'all',
  ackTimeoutMs: 5000,
  maxMessagesPerRequest: 100
};

// consumer.config.js
const fs = require('fs'); // required for reading the SSL certificate files below

const consumerConfig = {
  clientId: 'my-consumer-app',
  groupId: 'my-consumer-group',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],

  // Connection configuration
  connectionTimeout: 10000,
  requestTimeout: 30000,

  // Session timeout (ms)
  sessionTimeout: 30000,

  // Heartbeat interval (ms)
  heartbeatInterval: 3000,

  // Consumer offset management
  autoCommit: true,
  autoCommitInterval: 5000,
  autoCommitThreshold: 100,

  // Offset reset policy
  autoOffsetReset: 'latest', // 'earliest' or 'latest'

  // Assignment strategy
  partitionAssigners: [],

  // Range assignment strategy settings
  rangeAssignorInstance: 'my-instance',

  // Round-robin assignment strategy settings
  roundRobinAssignor: true,

  // Metadata refresh settings
  metadataMaxAge: 300000,
  metadataRequestTimeoutMs: 5000,

  // Additional consumer settings
  maxWaitTimeInMs: 5000,
  minBytes: 1,
  maxBytes: 1048576, // 1MB

  // Rate limiting
  maxMessagesPerPoll: 100,

  // SSL/TLS configuration (if required)
  ssl: {
    rejectUnauthorized: true,
    ca: [fs.readFileSync('/path/to/ca-cert.pem')],
    key: fs.readFileSync('/path/to/client-key.pem'),
    cert: fs.readFileSync('/path/to/client-cert.pem'),
    passphrase: 'your-passphrase'
  }
};
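
// Note (kafkajs-specific): the flat objects above mirror Java client property names.
// In kafkajs the same intent is split across three places: connection settings go to
// new Kafka(...), producer-level settings to kafka.producer(...), and acks/compression
// are per-send options. A minimal sketch, reusing producerConfig from above:
const { Kafka: KafkaJs, CompressionTypes } = require('kafkajs');

const kafkaClient = new KafkaJs({
  clientId: producerConfig.clientId,
  brokers: producerConfig.brokers,
  connectionTimeout: producerConfig.connectionTimeout,
  requestTimeout: producerConfig.requestTimeout,
  retry: producerConfig.retry,
  ssl: producerConfig.ssl,
  sasl: producerConfig.sasl
});

const tunedProducer = kafkaClient.producer({
  allowAutoTopicCreation: producerConfig.allowAutoTopicCreation,
  transactionTimeout: producerConfig.transactionTimeout
});

async function sendWithTuning(topic, messages) {
  await tunedProducer.connect();
  try {
    return await tunedProducer.send({
      topic,
      messages,
      acks: -1,                           // equivalent of 'all'
      compression: CompressionTypes.GZIP  // per-send option in kafkajs
    });
  } finally {
    await tunedProducer.disconnect();
  }
}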

💻 Kafka Streams and Complex Processing javascript

🟡 intermediate ⭐⭐⭐⭐

Advanced Kafka Streams API examples, stream processing patterns, and real-time analytics

⏱️ 45 min 🏷️ kafka, streams, processing, advanced
Prerequisites: Kafka basics, Stream processing concepts, Node.js streams
// Kafka Streams Processing Examples

// 1. Stream Processing Architecture
// stream-processor.js
const { Kafka } = require('kafkajs');
const { Transform } = require('stream');

class StreamProcessor {
  constructor(config) {
    this.kafka = new Kafka(config);
    this.consumer = this.kafka.consumer({
      groupId: 'stream-processor',
      sessionTimeout: 30000,
      heartbeatInterval: 3000
    });
    this.producer = this.kafka.producer();
    this.processors = new Map();
  }

  // Register custom stream processors
  registerProcessor(topic, processor) {
    this.processors.set(topic, processor);
  }

  async start() {
    try {
      await this.consumer.connect();
      await this.producer.connect();

      // Subscribe to all topics with registered processors
      const topics = Array.from(this.processors.keys());
      await this.consumer.subscribe({ topics });

      console.log(`Started stream processor for topics: ${topics.join(', ')}`);

      await this.consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          const processor = this.processors.get(topic);
          if (processor) {
            await this.processMessage(topic, message, processor);
          }
        }
      });
    } catch (error) {
      console.error('Stream processor error:', error);
      throw error;
    }
  }

  async processMessage(topic, message, processor) {
    try {
      const startTime = Date.now();

      // Parse incoming message
      const input = JSON.parse(message.value.toString());

      // Process the message (a processor may return one output or an array of outputs)
      const output = await processor(input);
      const outputs = Array.isArray(output) ? output : [output];

      // Send each processed message to its output topic
      for (const out of outputs) {
        if (!out || !out.topic) continue;

        await this.producer.send({
          topic: out.topic,
          messages: [{
            key: message.key?.toString(),
            value: JSON.stringify(out.data),
            headers: {
              ...message.headers,
              processedAt: Date.now().toString(),
              // header values must be strings or Buffers
              processingTime: String(Date.now() - startTime)
            }
          }]
        });

        console.log(`Processed message from ${topic} to ${out.topic} in ${Date.now() - startTime}ms`);
      }
    } catch (error) {
      console.error(`Error processing message from ${topic}:`, error);
      // Could send to dead letter queue here
    }
  }

  async stop() {
    console.log('Stopping stream processor...');
    await this.consumer.disconnect();
    await this.producer.disconnect();
  }
}
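
// The "dead letter queue" mentioned in processMessage can be sketched as a small
// helper that republishes a failed message to a side topic; the topic suffix and
// header names below are illustrative, not part of the processor above.
async function sendToDeadLetterQueue(producer, sourceTopic, message, error) {
  await producer.send({
    topic: `${sourceTopic}.dlq`,
    messages: [{
      key: message.key?.toString(),
      value: message.value, // forward the original payload untouched
      headers: {
        'x-original-topic': sourceTopic,
        'x-error-message': String(error.message || error),
        'x-failed-at': new Date().toISOString()
      }
    }]
  });
}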

// 2. Real-time Analytics Processor
// analytics-processor.js
class AnalyticsProcessor {
  constructor() {
    this.counters = new Map();
    this.aggregates = new Map();
    this.sessionWindow = 5 * 60 * 1000; // 5 minutes
  }

  // Process user activity events
  async processUserActivity(input) {
    const { userId, action, timestamp } = input;
    const now = Date.now();

    // Update counters
    const actionCount = this.counters.get(action) || 0;
    this.counters.set(action, actionCount + 1);

    // Update session-based aggregates
    if (!this.aggregates.has(userId)) {
      this.aggregates.set(userId, {
        userId, // stored on the aggregate so downstream analytics events can reference it
        sessionId: this.generateSessionId(),
        startTime: now,
        actions: [],
        lastActivity: now
      });
    }

    const userAggregate = this.aggregates.get(userId);
    userAggregate.actions.push({
      action,
      timestamp: now,
      metadata: input.metadata || {}
    });
    userAggregate.lastActivity = now;

    // Remove inactive sessions
    this.cleanupInactiveSessions(now);

    // Generate analytics events
    return this.generateAnalyticsEvents(userAggregate, action);
  }

  generateSessionId() {
    return `session_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }

  cleanupInactiveSessions(now) {
    for (const [userId, aggregate] of this.aggregates.entries()) {
      if (now - aggregate.lastActivity > this.sessionWindow) {
        this.aggregates.delete(userId);

        // Build the session-completed event (a fuller implementation would forward
        // the returned event to the output producer instead of discarding it)
        this.emitSessionCompleted(userId, aggregate);
      }
    }
  }

  emitSessionCompleted(userId, aggregate) {
    const sessionDuration = aggregate.lastActivity - aggregate.startTime;
    const actionCount = aggregate.actions.length;

    return {
      topic: 'session-completed',
      data: {
        userId,
        sessionId: aggregate.sessionId,
        duration: sessionDuration,
        actionCount,
        actions: aggregate.actions,
        completedAt: Date.now()
      }
    };
  }

  generateAnalyticsEvents(aggregate, currentAction) {
    const events = [];
    const actionCount = aggregate.actions.length;

    // Milestone events
    if (actionCount === 1) {
      events.push({
        topic: 'user-first-action',
        data: {
          userId: aggregate.userId,
          sessionId: aggregate.sessionId,
          firstAction: currentAction
        }
      });
    }

    if (actionCount === 10) {
      events.push({
        topic: 'user-milestone',
        data: {
          userId: aggregate.userId,
          sessionId: aggregate.sessionId,
          milestone: '10-actions'
        }
      });
    }

    // Real-time aggregation events
    if (actionCount % 5 === 0) {
      const actionCounts = this.countActionCounts(aggregate.actions);
      events.push({
        topic: 'user-activity-summary',
        data: {
          userId: aggregate.userId,
          sessionId: aggregate.sessionId,
          totalActions: actionCount,
          actionBreakdown: actionCounts,
          lastActivity: aggregate.lastActivity
        }
      });
    }

    return events;
  }

  countActionCounts(actions) {
    const counts = {};
    actions.forEach(action => {
      counts[action.action] = (counts[action.action] || 0) + 1;
    });
    return counts;
  }

  getOverallStats() {
    return {
      totalActions: Array.from(this.counters.values()).reduce((sum, count) => sum + count, 0),
      activeSessions: this.aggregates.size,
      actionBreakdown: Object.fromEntries(this.counters)
    };
  }
}
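
// A small standalone usage sketch for the analytics processor (event fields are
// illustrative):
async function runAnalyticsExample() {
  const analytics = new AnalyticsProcessor();

  const events = await analytics.processUserActivity({
    userId: 'user-123',
    action: 'login',
    timestamp: new Date().toISOString(),
    metadata: { ip: '192.168.1.1' }
  });

  console.log('Derived analytics events:', events); // first action -> 'user-first-action'
  console.log('Overall stats:', analytics.getOverallStats());
}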

// 3. Data Transformation Pipeline
// transformation-pipeline.js
class DataTransformationPipeline {
  constructor() {
    this.transformations = [];
  }

  // Add transformation step
  addTransformation(name, transformFn) {
    this.transformations.push({ name, transformFn });
  }

  // Process data through the transformation pipeline
  async transform(input) {
    let currentData = input;
    const transformationLog = [];

    for (const { name, transformFn } of this.transformations) {
      try {
        const startTime = Date.now();
        currentData = await transformFn(currentData);
        const duration = Date.now() - startTime;

        transformationLog.push({
          step: name,
          duration,
          success: true
        });
      } catch (error) {
        transformationLog.push({
          step: name,
          error: error.message,
          success: false
        });

        // Stop pipeline on error
        break;
      }
    }

    return {
      data: currentData,
      transformations: transformationLog
    };
  }

  // Common transformation functions
  static transformations = {
    // Normalize data structure
    normalize: (data) => {
      const normalized = {
        id: data.id || data.user_id || data.orderId,
        timestamp: data.timestamp || data.created_at || Date.now(),
        type: data.type || data.event || 'unknown',
        source: data.source || 'unknown',
        payload: { ...data }
      };

      // Remove top-level fields that are now in the main structure
      delete normalized.payload.id;
      delete normalized.payload.timestamp;
      delete normalized.payload.type;
      delete normalized.payload.source;

      return normalized;
    },

    // Enrich data with additional information
    enrich: (data) => {
      return {
        ...data,
        enrichedAt: Date.now(),
        enrichedBy: 'transformation-pipeline',
        version: '1.0',
        environment: process.env.NODE_ENV || 'development',
        metadata: {
          ...data.metadata,
          country: this.extractCountry(data),
          device: this.extractDevice(data),
          platform: this.extractPlatform(data)
        }
      };
    },

    // Filter sensitive data
    filterSensitive: (data) => {
      const sensitiveFields = ['password', 'ssn', 'creditCard', 'token'];
      const filtered = { ...data };

      const filterObject = (obj) => {
        if (typeof obj !== 'object' || obj === null) return obj;

        if (Array.isArray(obj)) {
          return obj.map(filterObject);
        }

        const result = {};
        for (const [key, value] of Object.entries(obj)) {
          if (sensitiveFields.some(field => key.toLowerCase().includes(field.toLowerCase()))) {
            result[key] = '[FILTERED]';
          } else {
            result[key] = filterObject(value);
          }
        }
        return result;
      };

      return filterObject(filtered);
    },

    // Validate data
    validate: (data) => {
      const requiredFields = ['id', 'timestamp', 'type'];
      const missing = requiredFields.filter(field => !data[field]);

      if (missing.length > 0) {
        throw new Error(`Missing required fields: ${missing.join(', ')}`);
      }

      return {
        ...data,
        validated: true,
        validationTimestamp: Date.now()
      };
    },

    // Aggregate data
    aggregate: (data) => {
      const now = Date.now();
      const hourBucket = Math.floor(now / (60 * 60 * 1000));
      const dayBucket = Math.floor(now / (24 * 60 * 60 * 1000));

      return {
        ...data,
        aggregations: {
          hourly: {
            bucket: hourBucket,
            timestamp: hourBucket * (60 * 60 * 1000),
            count: 1
          },
          daily: {
            bucket: dayBucket,
            timestamp: dayBucket * (24 * 60 * 60 * 1000),
            count: 1
          }
        }
      };
    },

    // Convert to Avro schema
    toAvro: (data) => {
      // This would convert the data to Avro format
      // For demonstration, we'll just add Avro metadata
      return {
        schema: 'user_events_v1',
        avroData: data,
        convertedAt: Date.now()
      };
    }
  }

  // Static so the static `transformations` helpers above can call them via `this`
  static extractCountry(data) {
    // Extract country from IP address, metadata, or other fields
    return data.country || data.ip_country || data.location?.country || 'unknown';
  }

  static extractDevice(data) {
    // Extract device information from user agent or other fields
    const userAgent = data.userAgent || data.user_agent;
    if (userAgent) {
      if (userAgent.includes('Mobile')) return 'mobile';
      if (userAgent.includes('Tablet')) return 'tablet';
      if (userAgent.includes('Desktop')) return 'desktop';
    }
    return 'unknown';
  }

  static extractPlatform(data) {
    // Extract platform from various fields
    const userAgent = data.userAgent || data.user_agent;
    if (userAgent) {
      if (userAgent.includes('Android')) return 'android';
      if (userAgent.includes('iOS')) return 'ios';
      if (userAgent.includes('Windows')) return 'windows';
      if (userAgent.includes('Mac')) return 'macos';
      if (userAgent.includes('Linux')) return 'linux';
    }
    return 'web';
  }
}
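
// A short usage sketch for the transformation pipeline (input fields are illustrative):
async function runPipelineExample() {
  const examplePipeline = new DataTransformationPipeline();
  examplePipeline.addTransformation('normalize', DataTransformationPipeline.transformations.normalize);
  examplePipeline.addTransformation('validate', DataTransformationPipeline.transformations.validate);
  examplePipeline.addTransformation('filterSensitive', DataTransformationPipeline.transformations.filterSensitive);

  const result = await examplePipeline.transform({
    user_id: 'user-123',
    event: 'checkout',
    created_at: Date.now(),
    creditCard: '4111-1111-1111-1111'
  });

  console.log('Transformed data:', JSON.stringify(result.data, null, 2));
  console.log('Transformation log:', result.transformations);
}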

// 4. Complex Event Processing
// event-processor.js
class ComplexEventProcessor {
  constructor() {
    this.eventPatterns = new Map();
    this.eventRules = new Map();
    this.eventHandlers = new Map();
  }

  // Register event pattern
  registerPattern(patternName, pattern) {
    this.eventPatterns.set(patternName, pattern);
  }

  // Register event rule
  registerRule(ruleName, rule) {
    this.eventRules.set(ruleName, rule);
  }

  // Register event handler
  registerHandler(handlerName, handler) {
    this.eventHandlers.set(handlerName, handler);
  }

  // Process complex event
  async processEvent(event) {
    const results = [];
    const context = {
      event,
      timestamp: Date.now(),
      processedPatterns: [],
      matchedRules: [],
      triggeredHandlers: []
    };

    // Apply pattern matching
    for (const [patternName, pattern] of this.eventPatterns) {
      if (this.matchPattern(event, pattern)) {
        context.processedPatterns.push(patternName);

        // Apply rules for this pattern
        for (const [ruleName, rule] of this.eventRules) {
          if (rule.patterns.includes(patternName)) {
            const ruleResult = await this.applyRule(event, rule, context);
            if (ruleResult.matched) {
              context.matchedRules.push(ruleName);

              // Trigger handlers
              for (const handlerName of rule.handlers || []) {
                const handler = this.eventHandlers.get(handlerName);
                if (handler) {
                  const handlerResult = await handler(event, context);
                  results.push(handlerResult);
                  context.triggeredHandlers.push(handlerName);
                }
              }
            }
          }
        }
      }
    }

    return {
      results,
      context
    };
  }

  matchPattern(event, pattern) {
    // Simple pattern matching - in real implementation, use more sophisticated pattern matching
    if (pattern.type && event.type !== pattern.type) return false;
    if (pattern.source && event.source !== pattern.source) return false;

    if (pattern.conditions) {
      return pattern.conditions.every(condition => {
        const value = this.getNestedValue(event, condition.field);
        return this.compareValues(value, condition.operator, condition.value);
      });
    }

    return true;
  }

  applyRule(event, rule, context) {
    // Apply rule conditions
    if (rule.conditions) {
      const matches = rule.conditions.every(condition => {
        const value = this.getNestedValue(event, condition.field);
        return this.compareValues(value, condition.operator, condition.value);
      });

      return { matched: matches, rule };
    }

    return { matched: true, rule };
  }

  getNestedValue(obj, path) {
    return path.split('.').reduce((current, key) => {
      return current && current[key] !== undefined ? current[key] : undefined;
    }, obj);
  }

  compareValues(value1, operator, value2) {
    switch (operator) {
      case 'equals': return value1 === value2;
      case 'not_equals': return value1 !== value2;
      case 'greater_than': return Number(value1) > Number(value2);
      case 'less_than': return Number(value1) < Number(value2);
      case 'contains': return String(value1).includes(String(value2));
      case 'in': return Array.isArray(value2) ? value2.includes(value1) : false;
      case 'exists': return value1 !== undefined && value1 !== null;
      default: return true;
    }
  }
}
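
// A short wiring sketch for the complex event processor; the pattern, rule, and
// handler names below are illustrative.
async function runCepExample() {
  const cep = new ComplexEventProcessor();

  cep.registerPattern('large-order', {
    type: 'order-created',
    conditions: [{ field: 'payload.amount', operator: 'greater_than', value: 1000 }]
  });

  cep.registerRule('flag-large-order', {
    patterns: ['large-order'],
    conditions: [{ field: 'payload.currency', operator: 'equals', value: 'USD' }],
    handlers: ['notify-review-queue']
  });

  cep.registerHandler('notify-review-queue', async (event) => {
    console.log('Flagging order for review:', event.payload.orderId);
    return { topic: 'order-review', data: event.payload };
  });

  const { results, context } = await cep.processEvent({
    type: 'order-created',
    source: 'checkout-service',
    payload: { orderId: 'order-42', amount: 2500, currency: 'USD' }
  });

  console.log('Matched rules:', context.matchedRules);
  console.log('Handler results:', results);
}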

// 5. Usage Examples
// main.js
const streamProcessor = new StreamProcessor({
  clientId: 'stream-processor-app',
  brokers: ['localhost:9092']
});

// Register processors (a processor must be a function; wrap class instances accordingly)
const analyticsProcessor = new AnalyticsProcessor();
streamProcessor.registerProcessor('user-events', (input) => analyticsProcessor.processUserActivity(input));

// Create transformation pipeline
const pipeline = new DataTransformationPipeline();
pipeline.addTransformation('normalize', DataTransformationPipeline.transformations.normalize);
pipeline.addTransformation('enrich', DataTransformationPipeline.transformations.enrich);
pipeline.addTransformation('filter', DataTransformationPipeline.transformations.filterSensitive);
pipeline.addTransformation('validate', DataTransformationPipeline.transformations.validate);
pipeline.addTransformation('aggregate', DataTransformationPipeline.transformations.aggregate);

// Register with stream processor
streamProcessor.registerProcessor('raw-events', async (input) => {
  const result = await pipeline.transform(input);

  return {
    topic: 'processed-events',
    data: result.data
  };
});

// Start processing (avoid top-level await in CommonJS)
streamProcessor.start().catch(console.error);

💻 Kafka Cluster Management and Operations yaml

🔴 complex ⭐⭐⭐⭐⭐

Production Kafka cluster configuration, monitoring, security, and operational best practices

⏱️ 60 min 🏷️ kafka, cluster, operations, production
Prerequisites: Docker, Kafka basics, System administration
# Kafka Cluster Management and Operations

# 1. Production Kafka Cluster Docker Compose
# kafka-cluster.yml
version: '3.8'

services:
  # Zookeeper Ensemble
  zookeeper-1:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper-1
    container_name: zookeeper-1
    ports:
      - "2181:2181"
      - "2888:2888"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_MAX_CLIENT_CNXNS: 60
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
    volumes:
      - zookeeper-1-data:/var/lib/zookeeper/data
      - zookeeper-1-logs:/var/lib/zookeeper/log

  zookeeper-2:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper-2
    container_name: zookeeper-2
    ports:
      - "2182:2181"
      - "2889:2888"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_MAX_CLIENT_CNXNS: 60
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
    volumes:
      - zookeeper-2-data:/var/lib/zookeeper/data
      - zookeeper-2-logs:/var/lib/zookeeper/log

  zookeeper-3:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper-3
    container_name: zookeeper-3
    ports:
      - "2183:2181"
      - "2890:2888"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_MAX_CLIENT_CNXNS: 60
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
      ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
    volumes:
      - zookeeper-3-data:/var/lib/zookeeper/data
      - zookeeper-3-logs:/var/lib/zookeeper/log

  # Kafka Brokers
  kafka-1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-1
    container_name: kafka-1
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_MESSAGE_MAX_BYTES: 1000000
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_LOG_CLEANUP_POLICY: 'delete'
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
      KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
      KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-1'
      KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
      KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
      KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
    volumes:
      - kafka-1-data:/var/lib/kafka/data

  kafka-2:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-2
    container_name: kafka-2
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - "9093:9092"
      - "29093:29092"
      - "9102:9101"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:29093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_MESSAGE_MAX_BYTES: 1000000
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_LOG_CLEANUP_POLICY: 'delete'
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
      KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
      KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-2'
      KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
      KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
      KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
    volumes:
      - kafka-2-data:/var/lib/kafka/data

  kafka-3:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-3
    container_name: kafka-3
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - "9094:9092"
      - "29094:29092"
      - "9103:9101"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092,PLAINTEXT_HOST://localhost:29094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_MESSAGE_MAX_BYTES: 1000000
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_LOG_CLEANUP_POLICY: 'delete'
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
      KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
      KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-3'
      KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
      KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
      KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
    volumes:
      - kafka-3-data:/var/lib/kafka/data

  # Load Balancer
  kafka-lb:
    image: nginx:alpine
    container_name: kafka-lb
    ports:
      - "8091:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3

  # Schema Registry
  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
      SCHEMA_REGISTRY_DEBUG: 'true'

  # Kafka UI
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: production
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_CLUSTERS_0_SCHEMA_REGISTRY: http://schema-registry:8081
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083

  # Kafka Connect
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.4.0
    hostname: kafka-connect
    container_name: kafka-connect
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-statuses
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_REST_ADVERTISED_LISTENER: 'PLAINTEXT://kafka-connect:8083'
      CONNECT_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      CONNECT_SECURITY_PROTOCOL: PLAINTEXT
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper,org.apache.kafka.connect.rest,org.apache.kafka.connect
      CONNECT_AUTO_CREATE_TOPICS_ENABLE: 'true'

  # Monitoring
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana-data:/var/lib/grafana
    depends_on:
      - prometheus

  # Elasticsearch
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
      - "xpack.security.enabled=false"
    ports:
      - "9200:9200"
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data

  # Kibana
  kibana:
    image: docker.elastic.co/kibana/kibana:8.8.0
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    depends_on:
      - elasticsearch

volumes:
  zookeeper-1-data:
  zookeeper-1-logs:
  zookeeper-2-data:
  zookeeper-2-logs:
  zookeeper-3-data:
  zookeeper-3-logs:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:
  prometheus-data:
  grafana-data:
  elasticsearch-data:

networks:
  default:
    driver: bridge

# 2. Nginx Configuration for Load Balancer
# nginx.conf
events {}

http {
    upstream kafka_backend {
        least_conn;
        server kafka-1:8091 max_fails=3 fail_timeout=30s;
        server kafka-2:8091 max_fails=3 fail_timeout=30s;
        server kafka-3:8091 max_fails=3 fail_timeout=30s;
    }

    server {
        listen 80;
        server_name localhost;

        location / {
            proxy_pass http://kafka_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        location /kafka-1 {
            proxy_pass http://kafka-1:8091;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }

        location /kafka-2 {
            proxy_pass http://kafka-2:8091;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }

        location /kafka-3 {
            proxy_pass http://kafka-3:8091;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }

        # Health check endpoints
        location /health {
            access_log off;
            return 200 "healthy\n";
            add_header Content-Type text/plain;
        }

        # Kafka Admin API proxy (if needed)
        location /admin/ {
            proxy_pass http://kafka-1:8090;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }
    }
}

# 3. Prometheus Configuration
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "kafka_rules.yml"

scrape_configs:
  # Assumes each broker exposes Prometheus-format metrics on port 9101 (e.g. via a
  # JMX exporter agent); plain JMX endpoints are not directly scrapable by Prometheus.
  - job_name: 'kafka'
    static_configs:
      - targets:
        - 'kafka-1:9101'
        - 'kafka-2:9101'
        - 'kafka-3:9101'
    metrics_path: /metrics
    scrape_interval: 10s

  - job_name: 'schema-registry'
    static_configs:
      - targets: ['schema-registry:8081']
    metrics_path: /metrics

  - job_name: 'kafka-connect'
    static_configs:
      - targets: ['kafka-connect:8083']
    metrics_path: /metrics

# 4. Kafka Alerting Rules
# kafka_rules.yml
groups:
  - name: kafka
    rules:
      - alert: UnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Under-replicated partitions detected"
          description: "Kafka cluster has under-replicated partitions which may lead to data loss"

      - alert: OfflinePartitionsCount
        expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Offline partitions detected"
          description: "Controller reports offline partitions"

      - alert: ActiveControllerCount
        expr: kafka_controller_kafkacontroller_activecontrollercount != 1
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Controller count is not 1"
          description: "Active controller count should be exactly 1"

      - alert: BrokerDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka broker is down"
          description: "One or more Kafka brokers are not responding"

# 5. Kafka Cluster Management Scripts
# cluster-manager.sh
#!/bin/bash

# Kafka Cluster Management Script
# Usage: ./cluster-manager.sh [start|stop|restart|status|scale|backup|monitor]

KAFKA_HOME="/opt/kafka"
ZOOKEEPER_HOME="/opt/zookeeper"
KAFKA_BROKERS="kafka1:9092,kafka2:9092,kafka3:9092"
TOPICS=("user-events" "order-events" "product-updates" "error-logs")

start_cluster() {
    echo "Starting Kafka cluster..."

    # Start Zookeeper
    for i in {1..3}; do
        echo "Starting Zookeeper $i..."
        $ZOOKEEPER_HOME/bin/zookeeper-server-start.sh -daemon $ZOOKEEPER_HOME/config/zookeeper$i.properties
        sleep 5
    done

    # Start Kafka brokers
    for i in {1..3}; do
        echo "Starting Kafka broker $i..."
        $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server$i.properties
        sleep 10
    done

    echo "Kafka cluster started successfully!"
}

stop_cluster() {
    echo "Stopping Kafka cluster..."

    # Stop Kafka brokers
    for i in {3..1}; do
        echo "Stopping Kafka broker $i..."
        $KAFKA_HOME/bin/kafka-server-stop.sh  # stops broker processes; the script takes no config file argument
    done

    # Stop Zookeeper
    for i in {3..1}; do
        echo "Stopping Zookeeper $i..."
        $ZOOKEEPER_HOME/bin/zookeeper-server-stop.sh
        sleep 2
    done

    echo "Kafka cluster stopped!"
}

check_status() {
    echo "Checking Kafka cluster status..."

    # Check Zookeeper
    echo "Checking Zookeeper status..."
    for i in {1..3}; do
        if pgrep -f "zookeeper.*server$i.properties" > /dev/null; then
            echo "✓ Zookeeper $i is running"
        else
            echo "✗ Zookeeper $i is not running"
        fi
    done

    # Check Kafka brokers
    echo "Checking Kafka broker status..."
    for i in {1..3}; do
        if pgrep -f "kafka.*server$i.properties" > /dev/null; then
            echo "✓ Kafka broker $i is running"
        else
            echo "✗ Kafka broker $i is not running"
        fi
    done

    # Check topic status
    echo "Checking topic status..."
    for topic in "${TOPICS[@]}"; do
        if $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --describe --topic $topic >/dev/null 2>&1; then
            echo "✓ Topic '$topic' exists"
        else
            echo "✗ Topic '$topic' does not exist"
        fi
    done
}

create_topics() {
    echo "Creating Kafka topics..."

    for topic in "${TOPICS[@]}"; do
        echo "Creating topic: $topic"
        $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS \
            --create --topic $topic \
            --partitions 3 \
            --replication-factor 2

        echo "✓ Topic '$topic' created with 3 partitions and replication factor 2"
    done
}

backup_cluster() {
    echo "Starting Kafka cluster backup..."

    BACKUP_DIR="/backup/kafka/$(date +%Y%m%d_%H%M%S)"
    mkdir -p "$BACKUP_DIR"

    # Backup configuration files
    echo "Backing up configuration files..."
    cp -r $KAFKA_HOME/config/ "$BACKUP_DIR/kafka-config/"
    cp -r $ZOOKEEPER_HOME/config/ "$BACKUP_DIR/zookeeper-config/"

    # Backup topics data (metadata)
    echo "Backing up topic metadata..."
    for topic in "${TOPICS[@]}"; do
        echo "Backing up topic metadata for: $topic"
        $KAFKA_HOME/bin/kafka-configs.sh --bootstrap-server $KAFKA_BROKERS \
            --entity-type topics --entity-name $topic --describe \
            > "$BACKUP_DIR/topic-${topic}.txt"
    done

    # Backup consumer group offsets
    echo "Backing up consumer group offsets..."
    $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA_BROKERS \
        --describe --all-groups \
        > "$BACKUP_DIR/consumer-groups.txt"

    echo "Backup completed: $BACKUP_DIR"
}

monitor_cluster() {
    echo "Starting cluster monitoring..."

    # JMX monitoring setup would go here
    echo "Enabling JMX monitoring on all brokers..."

    # Check whether a Kafka exporter is already running (Kafka itself ships no
    # exporter script; an external exporter such as danielqsj/kafka-exporter is assumed)
    if pgrep -f "kafka_exporter" > /dev/null; then
        echo "✓ Kafka exporter is running"
    else
        echo "Starting Kafka exporter (Docker)..."
        docker run -d --name kafka-exporter -p 9308:9308 \
            danielqsj/kafka-exporter --kafka.server=kafka1:9092
    fi

    echo "Cluster monitoring enabled"
    echo "Access Grafana at: http://localhost:3000"
    echo "Access Prometheus at: http://localhost:9090"
}

scale_brokers() {
    local new_brokers=$1

    echo "Scaling Kafka cluster to $new_brokers brokers..."

    # Add new broker configuration
    for ((i=4; i<=new_brokers; i++)); do
        echo "Configuring broker $i..."
        cp $KAFKA_HOME/config/server.properties \
           $KAFKA_HOME/config/server$i.properties

        # Update broker ID
        sed -i "s/broker.id=1/broker.id=$i/" $KAFKA_HOME/config/server$i.properties

        # Start new broker
        echo "Starting Kafka broker $i..."
        $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server$i.properties
        sleep 10
    done

    # Rebalance topics for new brokers
    echo "Rebalancing topics for new brokers..."
    for topic in "${TOPICS[@]}"; do
        echo "Increasing partition count for topic: $topic"
        $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS \
            --alter --topic $topic \
            --partitions 6
    done

    echo "Cluster scaled to $new_broker_count brokers successfully!"
}

# Main execution
case "${1:-}" in
    "start")
        start_cluster
        ;;
    "stop")
        stop_cluster
        ;;
    "restart")
        stop_cluster
        sleep 5
        start_cluster
        ;;
    "status")
        check_status
        ;;
    "create-topics")
        create_topics
        ;;
    "backup")
        backup_cluster
        ;;
    "monitor")
        monitor_cluster
        ;;
    "scale")
        if [ -z "${2:-}" ]; then
            echo "Usage: $0 scale <number_of_brokers>"
            exit 1
        fi
        scale_brokers "$2"
        ;;
    *)
        echo "Usage: $0 {start|stop|restart|status|create-topics|backup|monitor|scale}"
        echo "  start    - Start the Kafka cluster"
        echo "  stop     - Stop the Kafka cluster"
        echo "  restart  - Restart the Kafka cluster"
        echo "  status   - Check cluster status"
        echo "  create-topics - Create predefined topics"
        echo "  backup   - Backup cluster configuration and data"
        echo "  monitor  - Enable monitoring"
        echo "  scale    <number> - Scale cluster to specified number of brokers"
        exit 1
        ;;
esac

# 6. Topic Configuration Templates
# topic-configs.json
{
  "topic_templates": {
    "high-throughput": {
      "partitions": 12,
      "replication_factor": 3,
      "config": {
        "retention.ms": "604800000",
        "cleanup.policy": "delete",
        "segment.ms": "86400000",
        "max.message.bytes": "10485760",
        "compression.type": "lz4",
        "unclean.leader.election.enable": "true",
        "min.insync.replicas": "2"
      }
    },
    "low-latency": {
      "partitions": 6,
      "replication_factor": 3,
      "config": {
        "retention.ms": "86400000",
        "cleanup.policy": "delete",
        "segment.ms": "3600000",
        "max.message.bytes": "1000000",
        "compression.type": "none",
        "unclean.leader.election.enable": "true",
        "min.insync.replicas": "2",
        "linger.ms": "0"
      }
    },
    "event-sourcing": {
      "partitions": 3,
      "replication_factor": 3,
      "config": {
        "retention.ms": "-1",
        "cleanup.policy": "compact",
        "segment.ms": "86400000",
        "max.message.bytes": "10485760",
        "compression.type": "gzip",
        "unclean.leader.election.enable": "true",
        "min.insync.replicas": "2",
        "min.cleanable.dirty.ratio": "0.01"
      }
    },
    "logs": {
      "partitions": 3,
      "replication_factor": 2,
      "config": {
        "retention.ms": "604800000",
        "cleanup.policy": "delete",
        "segment.ms": "86400000",
        "max.message.bytes": "10485760",
        "compression.type": "gzip",
        "unclean.leader.election.enable": "true",
        "min.insync.replicas": "1"
      }
    }
  }
}
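
# 7. Applying a Topic Template (Node.js sketch)
# apply-template.js
// A minimal sketch showing how one of the templates above could be applied with the
// kafkajs admin client; the require path, broker list, and topic name are assumptions.
const { Kafka } = require('kafkajs');
const templates = require('./topic-configs.json').topic_templates;

async function createTopicFromTemplate(topicName, templateName) {
  const template = templates[templateName];
  const kafka = new Kafka({ clientId: 'template-admin', brokers: ['localhost:9092'] });
  const admin = kafka.admin();

  await admin.connect();
  try {
    await admin.createTopics({
      topics: [{
        topic: topicName,
        numPartitions: template.partitions,
        replicationFactor: template.replication_factor,
        configEntries: Object.entries(template.config).map(([name, value]) => ({ name, value }))
      }]
    });
    console.log(`Topic '${topicName}' created from template '${templateName}'`);
  } finally {
    await admin.disconnect();
  }
}

createTopicFromTemplate('payment-events', 'event-sourcing').catch(console.error);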