Apache Kafka Examples

Apache Kafka distributed streaming platform examples covering producers, consumers, streams, and cluster configuration

💻 Kafka Basic Concepts and Configuration javascript

🟢 simple ⭐⭐

Essential Kafka terminology, architecture, and basic producer/consumer examples

⏱️ 25 min 🏷️ kafka, producer, consumer, basics
Prerequisites: Node.js, Basic messaging concepts, Docker
// Apache Kafka Basic Concepts and Examples

// 1. Installation
// Download Apache Kafka: https://kafka.apache.org/downloads
// or use Docker: docker compose -f kafka-docker-compose.yml up

// 2. Kafka Architecture Overview
/*
Topic: Category or feed name to which records are published
Producer: Publishes messages to one or more Kafka topics
Consumer: Subscribes to topics and processes the stream of records
Broker: Kafka server in a cluster
Cluster: Collection of brokers working together
Partition: Fundamental unit of parallelism in Kafka
Offset: Unique identifier for each record within a partition
Consumer Group: Group of consumers that share a topic for load balancing
*/
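
// Quick illustration (sketch, not part of the original examples): two consumers that share
// a groupId split the partitions of a topic between them, which is how consumer groups
// provide load balancing. Assumes a broker on localhost:9092 and a 'user-events' topic
// with at least two partitions; the file name below is illustrative.
// consumer-group-demo.js
const { Kafka } = require('kafkajs');

async function startGroupMember(memberId) {
  const kafka = new Kafka({ clientId: memberId, brokers: ['localhost:9092'] });
  const consumer = kafka.consumer({ groupId: 'demo-group' });

  await consumer.connect();
  await consumer.subscribe({ topic: 'user-events', fromBeginning: true });

  // Each member only receives messages from the partitions assigned to it
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log(`${memberId} read offset ${message.offset} from partition ${partition}`);
    }
  });
}

// Starting both members triggers a group rebalance that divides the partitions
startGroupMember('member-1');
startGroupMember('member-2');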

// 3. Basic Producer Example (Node.js)
// producer.js
const { Kafka } = require('kafkajs');

class BasicProducer {
  constructor() {
    this.kafka = new Kafka({
      clientId: 'basic-producer',
      brokers: ['localhost:9092']
    });
    this.producer = this.kafka.producer();
  }

  async sendMessage(topic, message) {
    try {
      await this.producer.connect();

      const result = await this.producer.send({
        topic,
        messages: [
          {
            key: message.key,
            value: JSON.stringify(message.value),
            headers: message.headers || {},
            timestamp: Date.now()
          }
        ]
      });

      console.log('Message sent successfully:', result);
      return result;
    } catch (error) {
      console.error('Error sending message:', error);
      throw error;
    } finally {
      await this.producer.disconnect();
    }
  }

  async sendBatchMessages(topic, messages) {
    try {
      await this.producer.connect();

      const kafkaMessages = messages.map(msg => ({
        key: msg.key,
        value: JSON.stringify(msg.value),
        headers: msg.headers || {},
        timestamp: Date.now()
      }));

      const result = await this.producer.send({
        topic,
        messages: kafkaMessages
      });

      console.log(`Batch of ${messages.length} messages sent successfully`);
      return result;
    } catch (error) {
      console.error('Error sending batch messages:', error);
      throw error;
    } finally {
      await this.producer.disconnect();
    }
  }
}

// Usage example
const producer = new BasicProducer();

// Send single message
await producer.sendMessage('user-events', {
  key: 'user-123',
  value: {
    userId: '123',
    action: 'login',
    timestamp: new Date().toISOString(),
    metadata: { ip: '192.168.1.1' }
  }
});

// Send batch messages
const batchMessages = [
  {
    key: 'order-1',
    value: { orderId: '1', status: 'created', amount: 99.99 }
  },
  {
    key: 'order-2',
    value: { orderId: '2', status: 'shipped', amount: 149.99 }
  }
];

await producer.sendBatchMessages('order-events', batchMessages);

// 4. Basic Consumer Example (Node.js)
// consumer.js
const { Kafka } = require('kafkajs');

class BasicConsumer {
  constructor(groupId) {
    this.kafka = new Kafka({
      clientId: 'basic-consumer',
      brokers: ['localhost:9092']
    });
    this.consumer = this.kafka.consumer({ groupId });
    this.isRunning = false;
  }

  async consume(topic, messageHandler) {
    try {
      await this.consumer.connect();
      await this.consumer.subscribe({ topic, fromBeginning: false });

      this.isRunning = true;

      await this.consumer.run({
        // Note: use one of eachMessage or eachBatch with consumer.run(), not both;
        // the simpler per-record handler is used here
        eachMessage: async ({ topic, partition, message }) => {
          if (!this.isRunning) return;

          try {
            const parsedMessage = JSON.parse(message.value.toString());
            await messageHandler({
              topic,
              partition,
              offset: message.offset,
              key: message.key?.toString(),
              value: parsedMessage,
              timestamp: message.timestamp,
              headers: message.headers
            });
          } catch (parseError) {
            console.error('Error parsing message:', parseError);
            // Move to next message even if parsing fails
          }
        }
      });
    } catch (error) {
      console.error('Consumer error:', error);
      throw error;
    }
  }

  async stop() {
    console.log('Stopping consumer...');
    this.isRunning = false;
    await this.consumer.disconnect();
  }
}

// Usage example
const consumer = new BasicConsumer('user-processor-group');

await consumer.consume('user-events', async (message) => {
  console.log(`Received message from ${message.topic}:`);
  console.log(`User ID: ${message.value.userId}`);
  console.log(`Action: ${message.value.action}`);

  // Process the message (e.g., save to database, trigger analytics, etc.)
  if (message.value.action === 'login') {
    console.log('Processing login event...');
    // Add your business logic here
  }
});
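
// Graceful shutdown (sketch): wire process signals to the stop() method above so the
// consumer leaves its group cleanly before the process exits. Assumes the `consumer`
// instance created in the usage example is in scope.
const shutdown = async (signal) => {
  console.log(`Received ${signal}, shutting down consumer...`);
  try {
    await consumer.stop();
  } finally {
    process.exit(0);
  }
};

process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));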

// 5. Docker Compose for Kafka Development
// kafka-docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

  # Optional: Schema Registry for Avro/Protobuf schemas
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    container_name: schema-registry
    depends_on:
      - kafka
      - zookeeper
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

volumes:
  zookeeper-data:
  kafka-data:
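
// Connectivity check (sketch): once the Compose stack above is running, this small
// kafkajs admin script verifies that the broker advertised on localhost:9092 is
// reachable. The file name is illustrative.
// check-cluster.js
const { Kafka } = require('kafkajs');

async function checkCluster() {
  const kafka = new Kafka({ clientId: 'health-check', brokers: ['localhost:9092'] });
  const admin = kafka.admin();

  await admin.connect();
  try {
    const { brokers, controller, clusterId } = await admin.describeCluster();
    console.log(`Cluster ${clusterId}: ${brokers.length} broker(s), controller is node ${controller}`);
  } finally {
    await admin.disconnect();
  }
}

checkCluster().catch(console.error);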

// 6. Topic Management Scripts
// topics.js
const { Kafka } = require('kafkajs');

class TopicManager {
  constructor() {
    this.kafka = new Kafka({
      clientId: 'topic-manager',
      brokers: ['localhost:9092']
    });
    this.admin = this.kafka.admin();
  }

  async createTopic(topicName, partitions = 1, replicationFactor = 1) {
    try {
      await this.admin.connect();

      const topicConfig = {
        topic: topicName,
        numPartitions: partitions,
        replicationFactor: replicationFactor,
        configEntries: [
          { name: 'retention.ms', value: '604800000' }, // 7 days
          { name: 'cleanup.policy', value: 'delete' },
          { name: 'segment.ms', value: '86400000' } // 1 day
        ]
      };

      // createTopics resolves to true if the topic was created, false if it already exists
      const created = await this.admin.createTopics({
        topics: [topicConfig]
      });

      if (created) {
        console.log(`Topic '${topicName}' created successfully with ${partitions} partitions`);
      } else {
        console.log(`Topic '${topicName}' already exists`);
      }
    } catch (error) {
      console.error('Error creating topic:', error);
      throw error;
    } finally {
      await this.admin.disconnect();
    }
  }

  async listTopics() {
    try {
      await this.admin.connect();
      // listTopics resolves to an array of topic names
      const topics = await this.admin.listTopics();

      console.log('Available topics:');
      topics.forEach(topic => {
        console.log(`- ${topic}`);
      });

      return topics;
    } catch (error) {
      console.error('Error listing topics:', error);
      throw error;
    } finally {
      await this.admin.disconnect();
    }
  }

  async deleteTopic(topicName) {
    try {
      await this.admin.connect();
      await this.admin.deleteTopics({
        topics: [topicName]
      });
      console.log(`Topic '${topicName}' deleted successfully`);
    } catch (error) {
      console.error('Error deleting topic:', error);
      throw error;
    } finally {
      await this.admin.disconnect();
    }
  }

  async getTopicMetadata(topicName) {
    try {
      await this.admin.connect();
      const metadata = await this.admin.fetchTopicMetadata({ topics: [topicName] });

      const topicMetadata = metadata.topics[0];
      console.log(`Topic metadata for '${topicName}':`);
      console.log(`Partitions: ${topicMetadata.partitions.length}`);

      topicMetadata.partitions.forEach((partition) => {
        console.log(`  Partition ${partition.partitionId}: Leader: ${partition.leader}, Replicas: ${partition.replicas.length}`);
      });

      return topicMetadata;
    } catch (error) {
      console.error('Error fetching topic metadata:', error);
      throw error;
    } finally {
      await this.admin.disconnect();
    }
  }
}

// Usage example
const topicManager = new TopicManager();

// Create topics
await topicManager.createTopic('user-events', 3, 1);
await topicManager.createTopic('order-events', 5, 1);
await topicManager.createTopic('product-updates', 2, 1);

// List all topics
await topicManager.listTopics();

// Get topic metadata
await topicManager.getTopicMetadata('user-events');

// 7. Configuration Examples

// producer.config.js
// Note: kafkajs splits configuration between the Kafka() client, kafka.producer(),
// and the per-call producer.send() options; batch.size / linger.ms are Java-client
// settings with no direct kafkajs equivalent.
const { CompressionTypes } = require('kafkajs');

// Passed to new Kafka(...)
const producerClientConfig = {
  clientId: 'my-producer-app',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],

  // Connection configuration
  connectionTimeout: 10000,
  requestTimeout: 30000,

  // Retry configuration
  retry: {
    initialRetryTime: 100,
    factor: 2,
    maxRetryTime: 60000,
    multiplier: 1.5,
    retries: 5
  },

  // Authentication (if using SASL/SSL)
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: 'kafka-user',
    password: 'kafka-password'
  }
};

// Passed to kafka.producer(...)
const producerOptions = {
  allowAutoTopicCreation: false,
  maxInFlightRequests: 5,
  idempotent: true,
  transactionTimeout: 60000
};

// Passed per producer.send(...) call
const sendOptions = {
  acks: -1,                            // -1 = wait for all in-sync replicas
  timeout: 30000,                      // ack timeout in ms
  compression: CompressionTypes.GZIP
};

// consumer.config.js
// Note: in kafkajs, auto-commit settings belong to consumer.run(), and the offset
// reset policy is controlled with `fromBeginning` on consumer.subscribe().
const fs = require('fs');

// Passed to new Kafka(...)
const consumerClientConfig = {
  clientId: 'my-consumer-app',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],

  // Connection configuration
  connectionTimeout: 10000,
  requestTimeout: 30000,

  // SSL/TLS configuration (if required)
  ssl: {
    rejectUnauthorized: true,
    ca: [fs.readFileSync('/path/to/ca-cert.pem')],
    key: fs.readFileSync('/path/to/client-key.pem'),
    cert: fs.readFileSync('/path/to/client-cert.pem'),
    passphrase: 'your-passphrase'
  }
};

// Passed to kafka.consumer(...)
const consumerOptions = {
  groupId: 'my-consumer-group',

  // Session timeout (ms)
  sessionTimeout: 30000,

  // Heartbeat interval (ms)
  heartbeatInterval: 3000,

  // Assignment strategy (defaults to the built-in round-robin assigner)
  partitionAssigners: [],

  // Metadata refresh settings
  metadataMaxAge: 300000,

  // Fetch settings
  maxWaitTimeInMs: 5000,
  minBytes: 1,
  maxBytes: 1048576 // 1MB
};

// Passed to consumer.run(...)
const runOptions = {
  autoCommit: true,
  autoCommitInterval: 5000,
  autoCommitThreshold: 100,
  partitionsConsumedConcurrently: 1 // throughput/concurrency control
};

// Offset reset: pass { fromBeginning: true } to consumer.subscribe() to start from
// the earliest offset (the equivalent of auto.offset.reset=earliest).
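
// Wiring it together (sketch): the client-level config goes to the Kafka() constructor,
// producer/consumer options go to the factory methods, and send/run options go to the
// individual calls. The object names above (producerClientConfig, sendOptions, runOptions,
// consumerOptions, ...) are illustrative, not part of the kafkajs API.
const { Kafka } = require('kafkajs');

const producerKafka = new Kafka(producerClientConfig);
const configuredProducer = producerKafka.producer(producerOptions);

await configuredProducer.connect();
await configuredProducer.send({
  topic: 'user-events',
  acks: sendOptions.acks,
  timeout: sendOptions.timeout,
  compression: sendOptions.compression,
  messages: [{ key: 'user-123', value: JSON.stringify({ action: 'login' }) }]
});
await configuredProducer.disconnect();

const consumerKafka = new Kafka(consumerClientConfig);
const configuredConsumer = consumerKafka.consumer(consumerOptions);

await configuredConsumer.connect();
await configuredConsumer.subscribe({ topic: 'user-events', fromBeginning: true });
await configuredConsumer.run({
  ...runOptions,
  eachMessage: async ({ message }) => console.log(message.value.toString())
});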

💻 Kafka Streams and Complex Processing javascript

🟡 intermediate ⭐⭐⭐⭐

Advanced Kafka Streams API examples, stream processing patterns, and real-time analytics

⏱️ 45 min 🏷️ kafka, streams, processing, advanced
Prerequisites: Kafka basics, Stream processing concepts, Node.js streams
// Kafka Streams Processing Examples

// 1. Stream Processing Architecture
// stream-processor.js
const { Kafka } = require('kafkajs');
const { Transform } = require('stream');

class StreamProcessor {
  constructor(config) {
    this.kafka = new Kafka(config);
    this.consumer = this.kafka.consumer({
      groupId: 'stream-processor',
      sessionTimeout: 30000,
      heartbeatInterval: 3000
    });
    this.producer = this.kafka.producer();
    this.processors = new Map();
  }

  // Register custom stream processors
  registerProcessor(topic, processor) {
    this.processors.set(topic, processor);
  }

  async start() {
    try {
      await this.consumer.connect();
      await this.producer.connect();

      // Subscribe to all topics with registered processors
      const topics = Array.from(this.processors.keys());
      await this.consumer.subscribe({ topics });

      console.log(`Started stream processor for topics: ${topics.join(', ')}`);

      await this.consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          const processor = this.processors.get(topic);
          if (processor) {
            await this.processMessage(topic, message, processor);
          }
        }
      });
    } catch (error) {
      console.error('Stream processor error:', error);
      throw error;
    }
  }

  async processMessage(topic, message, processor) {
    try {
      const startTime = Date.now();

      // Parse incoming message
      const input = JSON.parse(message.value.toString());

      // Process the message
      const output = await processor(input);

      // Send processed message to output topic
      if (output && output.topic) {
        await this.producer.send({
          topic: output.topic,
          messages: [{
            key: message.key?.toString(),
            value: JSON.stringify(output.data),
            headers: {
              ...message.headers,
              processedAt: Date.now().toString(),
              processingTime: (Date.now() - startTime).toString()
            }
          }]
        });

        console.log(`Processed message from ${topic} to ${output.topic} in ${Date.now() - startTime}ms`);
      }
    } catch (error) {
      console.error(`Error processing message from ${topic}:`, error);
      // Could send to dead letter queue here
    }
  }

  async stop() {
    console.log('Stopping stream processor...');
    await this.consumer.disconnect();
    await this.producer.disconnect();
  }
}

// 2. Real-time Analytics Processor
// analytics-processor.js
class AnalyticsProcessor {
  constructor() {
    this.counters = new Map();
    this.aggregates = new Map();
    this.sessionWindow = 5 * 60 * 1000; // 5 minutes
  }

  // Process user activity events
  async processUserActivity(input) {
    const { userId, action, timestamp } = input;
    const now = Date.now();

    // Update counters
    const actionCount = this.counters.get(action) || 0;
    this.counters.set(action, actionCount + 1);

    // Update session-based aggregates
    if (!this.aggregates.has(userId)) {
      this.aggregates.set(userId, {
        userId, // kept on the aggregate so generated analytics events can reference it
        sessionId: this.generateSessionId(),
        startTime: now,
        actions: [],
        lastActivity: now
      });
    }

    const userAggregate = this.aggregates.get(userId);
    userAggregate.actions.push({
      action,
      timestamp: now,
      metadata: input.metadata || {}
    });
    userAggregate.lastActivity = now;

    // Remove inactive sessions
    this.cleanupInactiveSessions(now);

    // Generate analytics events
    return this.generateAnalyticsEvents(userAggregate, action);
  }

  generateSessionId() {
    return `session_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }

  cleanupInactiveSessions(now) {
    for (const [userId, aggregate] of this.aggregates.entries()) {
      if (now - aggregate.lastActivity > this.sessionWindow) {
        this.aggregates.delete(userId);

        // Emit session completed event
        this.emitSessionCompleted(userId, aggregate);
      }
    }
  }

  emitSessionCompleted(userId, aggregate) {
    const sessionDuration = aggregate.lastActivity - aggregate.startTime;
    const actionCount = aggregate.actions.length;

    return {
      topic: 'session-completed',
      data: {
        userId,
        sessionId: aggregate.sessionId,
        duration: sessionDuration,
        actionCount,
        actions: aggregate.actions,
        completedAt: Date.now()
      }
    };
  }

  generateAnalyticsEvents(aggregate, currentAction) {
    const events = [];
    const actionCount = aggregate.actions.length;

    // Milestone events
    if (actionCount === 1) {
      events.push({
        topic: 'user-first-action',
        data: {
          userId: aggregate.userId,
          sessionId: aggregate.sessionId,
          firstAction: currentAction
        }
      });
    }

    if (actionCount === 10) {
      events.push({
        topic: 'user-milestone',
        data: {
          userId: aggregate.userId,
          sessionId: aggregate.sessionId,
          milestone: '10-actions'
        }
      });
    }

    // Real-time aggregation events
    if (actionCount % 5 === 0) {
      const actionCounts = this.countActionCounts(aggregate.actions);
      events.push({
        topic: 'user-activity-summary',
        data: {
          userId: aggregate.userId,
          sessionId: aggregate.sessionId,
          totalActions: actionCount,
          actionBreakdown: actionCounts,
          lastActivity: aggregate.lastActivity
        }
      });
    }

    return events;
  }

  countActionCounts(actions) {
    const counts = {};
    actions.forEach(action => {
      counts[action.action] = (counts[action.action] || 0) + 1;
    });
    return counts;
  }

  getOverallStats() {
    return {
      totalActions: Array.from(this.counters.values()).reduce((sum, count) => sum + count, 0),
      activeSessions: this.aggregates.size,
      actionBreakdown: Object.fromEntries(this.counters)
    };
  }
}
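
// Standalone usage (sketch): feed a few synthetic user-activity events through the
// processor and inspect the running counters. The user ids and actions are made up.
const analytics = new AnalyticsProcessor();

await analytics.processUserActivity({ userId: 'u-1', action: 'login', timestamp: Date.now() });
await analytics.processUserActivity({ userId: 'u-1', action: 'view-product', timestamp: Date.now() });
await analytics.processUserActivity({ userId: 'u-2', action: 'login', timestamp: Date.now() });

console.log(analytics.getOverallStats());
// => { totalActions: 3, activeSessions: 2, actionBreakdown: { login: 2, 'view-product': 1 } }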

// 3. Data Transformation Pipeline
// transformation-pipeline.js
class DataTransformationPipeline {
  constructor() {
    this.transformations = [];
  }

  // Add transformation step
  addTransformation(name, transformFn) {
    this.transformations.push({ name, transformFn });
  }

  // Process data through the transformation pipeline
  async transform(input) {
    let currentData = input;
    const transformationLog = [];

    for (const { name, transformFn } of this.transformations) {
      try {
        const startTime = Date.now();
        currentData = await transformFn(currentData);
        const duration = Date.now() - startTime;

        transformationLog.push({
          step: name,
          duration,
          success: true
        });
      } catch (error) {
        transformationLog.push({
          step: name,
          error: error.message,
          success: false
        });

        // Stop pipeline on error
        break;
      }
    }

    return {
      data: currentData,
      transformations: transformationLog
    };
  }

  // Common transformation functions
  static transformations = {
    // Normalize data structure
    normalize: (data) => {
      const normalized = {
        id: data.id || data.user_id || data.orderId,
        timestamp: data.timestamp || data.created_at || Date.now(),
        type: data.type || data.event || 'unknown',
        source: data.source || 'unknown',
        payload: { ...data }
      };

      // Remove top-level fields that are now in the main structure
      delete normalized.payload.id;
      delete normalized.payload.timestamp;
      delete normalized.payload.type;
      delete normalized.payload.source;

      return normalized;
    },

    // Enrich data with additional information
    enrich: (data) => {
      return {
        ...data,
        enrichedAt: Date.now(),
        enrichedBy: 'transformation-pipeline',
        version: '1.0',
        environment: process.env.NODE_ENV || 'development',
        metadata: {
          ...data.metadata,
          country: this.extractCountry(data),
          device: this.extractDevice(data),
          platform: this.extractPlatform(data)
        }
      };
    },

    // Filter sensitive data
    filterSensitive: (data) => {
      const sensitiveFields = ['password', 'ssn', 'creditCard', 'token'];
      const filtered = { ...data };

      const filterObject = (obj) => {
        if (typeof obj !== 'object' || obj === null) return obj;

        if (Array.isArray(obj)) {
          return obj.map(filterObject);
        }

        const result = {};
        for (const [key, value] of Object.entries(obj)) {
          if (sensitiveFields.some(field => key.toLowerCase().includes(field.toLowerCase()))) {
            result[key] = '[FILTERED]';
          } else {
            result[key] = filterObject(value);
          }
        }
        return result;
      };

      return filterObject(filtered);
    },

    // Validate data
    validate: (data) => {
      const requiredFields = ['id', 'timestamp', 'type'];
      const missing = requiredFields.filter(field => !data[field]);

      if (missing.length > 0) {
        throw new Error(`Missing required fields: ${missing.join(', ')}`);
      }

      return {
        ...data,
        validated: true,
        validationTimestamp: Date.now()
      };
    },

    // Aggregate data
    aggregate: (data) => {
      const now = Date.now();
      const hourBucket = Math.floor(now / (60 * 60 * 1000));
      const dayBucket = Math.floor(now / (24 * 60 * 60 * 1000));

      return {
        ...data,
        aggregations: {
          hourly: {
            bucket: hourBucket,
            timestamp: hourBucket * (60 * 60 * 1000),
            count: 1
          },
          daily: {
            bucket: dayBucket,
            timestamp: dayBucket * (24 * 60 * 60 * 1000),
            count: 1
          }
        }
      };
    },

    // Convert to Avro schema
    toAvro: (data) => {
      // This would convert the data to Avro format
      // For demonstration, we'll just add Avro metadata
      return {
        schema: 'user_events_v1',
        avroData: data,
        convertedAt: Date.now()
      };
    }
  }

  // Static so the arrow functions in the `transformations` field above (whose lexical
  // `this` is the class itself) can call these helpers via `this.extract...`
  static extractCountry(data) {
    // Extract country from IP address, metadata, or other fields
    return data.country || data.ip_country || data.location?.country || 'unknown';
  }

  static extractDevice(data) {
    // Extract device information from user agent or other fields
    const userAgent = data.userAgent || data.user_agent;
    if (userAgent) {
      if (userAgent.includes('Mobile')) return 'mobile';
      if (userAgent.includes('Tablet')) return 'tablet';
      if (userAgent.includes('Desktop')) return 'desktop';
    }
    return 'unknown';
  }

  static extractPlatform(data) {
    // Extract platform from various fields
    const userAgent = data.userAgent || data.user_agent;
    if (userAgent) {
      if (userAgent.includes('Android')) return 'android';
      if (userAgent.includes('iOS')) return 'ios';
      if (userAgent.includes('Windows')) return 'windows';
      if (userAgent.includes('Mac')) return 'macos';
      if (userAgent.includes('Linux')) return 'linux';
    }
    return 'web';
  }
}
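
// Standalone usage (sketch): run one raw record through a two-step pipeline and inspect
// both the transformed data and the per-step log returned by transform(). The sample
// record is made up.
const demoPipeline = new DataTransformationPipeline();
demoPipeline.addTransformation('normalize', DataTransformationPipeline.transformations.normalize);
demoPipeline.addTransformation('filterSensitive', DataTransformationPipeline.transformations.filterSensitive);

const demoResult = await demoPipeline.transform({
  user_id: 'u-42',
  event: 'signup',
  password: 'hunter2', // will be replaced with '[FILTERED]'
  source: 'web'
});

console.log(demoResult.data.payload);      // { user_id: 'u-42', event: 'signup', password: '[FILTERED]' }
console.log(demoResult.transformations);   // [{ step: 'normalize', ... }, { step: 'filterSensitive', ... }]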

// 4. Complex Event Processing
// event-processor.js
class ComplexEventProcessor {
  constructor() {
    this.eventPatterns = new Map();
    this.eventRules = new Map();
    this.eventHandlers = new Map();
  }

  // Register event pattern
  registerPattern(patternName, pattern) {
    this.eventPatterns.set(patternName, pattern);
  }

  // Register event rule
  registerRule(ruleName, rule) {
    this.eventRules.set(ruleName, rule);
  }

  // Register event handler
  registerHandler(handlerName, handler) {
    this.eventHandlers.set(handlerName, handler);
  }

  // Process complex event
  async processEvent(event) {
    const results = [];
    const context = {
      event,
      timestamp: Date.now(),
      processedPatterns: [],
      matchedRules: [],
      triggeredHandlers: []
    };

    // Apply pattern matching
    for (const [patternName, pattern] of this.eventPatterns) {
      if (this.matchPattern(event, pattern)) {
        context.processedPatterns.push(patternName);

        // Apply rules for this pattern
        for (const [ruleName, rule] of this.eventRules) {
          if (rule.patterns.includes(patternName)) {
            const ruleResult = await this.applyRule(event, rule, context);
            if (ruleResult.matched) {
              context.matchedRules.push(ruleName);

              // Trigger handlers
              for (const handlerName of rule.handlers || []) {
                const handler = this.eventHandlers.get(handlerName);
                if (handler) {
                  const handlerResult = await handler(event, context);
                  results.push(handlerResult);
                  context.triggeredHandlers.push(handlerName);
                }
              }
            }
          }
        }
      }
    }

    return {
      results,
      context
    };
  }

  matchPattern(event, pattern) {
    // Simple pattern matching - in real implementation, use more sophisticated pattern matching
    if (pattern.type && event.type !== pattern.type) return false;
    if (pattern.source && event.source !== pattern.source) return false;

    if (pattern.conditions) {
      return pattern.conditions.every(condition => {
        const value = this.getNestedValue(event, condition.field);
        return this.compareValues(value, condition.operator, condition.value);
      });
    }

    return true;
  }

  applyRule(event, rule, context) {
    // Apply rule conditions
    if (rule.conditions) {
      const matches = rule.conditions.every(condition => {
        const value = this.getNestedValue(event, condition.field);
        return this.compareValues(value, condition.operator, condition.value);
      });

      return { matched: matches, rule };
    }

    return { matched: true, rule };
  }

  getNestedValue(obj, path) {
    return path.split('.').reduce((current, key) => {
      return current && current[key] !== undefined ? current[key] : undefined;
    }, obj);
  }

  compareValues(value1, operator, value2) {
    switch (operator) {
      case 'equals': return value1 === value2;
      case 'not_equals': return value1 !== value2;
      case 'greater_than': return Number(value1) > Number(value2);
      case 'less_than': return Number(value1) < Number(value2);
      case 'contains': return String(value1).includes(String(value2));
      case 'in': return Array.isArray(value2) ? value2.includes(value1) : false;
      case 'exists': return value1 !== undefined && value1 !== null;
      default: return true;
    }
  }
}
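
// Usage (sketch): register one pattern, one rule, and one handler, then process a single
// event that matches all three. The names ('order-event', 'high-value-order', 'notify-sales')
// and the event payload are illustrative.
const cep = new ComplexEventProcessor();

cep.registerPattern('order-event', {
  type: 'order',
  conditions: [{ field: 'payload.amount', operator: 'greater_than', value: 0 }]
});

cep.registerRule('high-value-order', {
  patterns: ['order-event'],
  conditions: [{ field: 'payload.amount', operator: 'greater_than', value: 1000 }],
  handlers: ['notify-sales']
});

cep.registerHandler('notify-sales', async (event) => {
  console.log(`High-value order detected: ${event.payload.orderId}`);
  return { notified: true, orderId: event.payload.orderId };
});

const { results, context } = await cep.processEvent({
  type: 'order',
  source: 'checkout',
  payload: { orderId: 'o-981', amount: 2499.0 }
});

console.log(context.matchedRules); // ['high-value-order']
console.log(results);              // [{ notified: true, orderId: 'o-981' }]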

// 5. Usage Examples
// main.js
const streamProcessor = new StreamProcessor({
  clientId: 'stream-processor-app',
  brokers: ['localhost:9092']
});

// Register processors (processors must be functions: wrap the AnalyticsProcessor
// instance so each incoming event is fed to processUserActivity)
const analyticsProcessor = new AnalyticsProcessor();
streamProcessor.registerProcessor('user-events', async (input) => {
  const events = await analyticsProcessor.processUserActivity(input);
  // processMessage expects a single { topic, data } result; forward the first event, if any
  return events.length > 0 ? events[0] : null;
});

// Create transformation pipeline
const pipeline = new DataTransformationPipeline();
pipeline.addTransformation('normalize', DataTransformationPipeline.transformations.normalize);
pipeline.addTransformation('enrich', DataTransformationPipeline.transformations.enrich);
pipeline.addTransformation('filter', DataTransformationPipeline.transformations.filterSensitive);
pipeline.addTransformation('validate', DataTransformationPipeline.transformations.validate);
pipeline.addTransformation('aggregate', DataTransformationPipeline.transformations.aggregate);

// Register with stream processor
streamProcessor.registerProcessor('raw-events', async (input) => {
  const result = await pipeline.transform(input);

  return {
    topic: 'processed-events',
    data: result.data
  };
});

// Start processing
await streamProcessor.start();

💻 Kafka Cluster Management and Operations yaml

🔴 complex ⭐⭐⭐⭐⭐

Production Kafka cluster configuration, monitoring, security, and operational best practices

⏱️ 60 min 🏷️ kafka, cluster, operations, production
Prerequisites: Docker, Kafka basics, System administration
# Kafka Cluster Management and Operations

# 1. Production Kafka Cluster Docker Compose
# kafka-cluster.yml
version: '3.8'

services:
  # Zookeeper Ensemble
  zookeeper-1:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper-1
    container_name: zookeeper-1
    ports:
      - "2181:2181"
      - "2888:2888"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_MAX_CLIENT_CNXNS: 60
      ZOOKEEPER_SERVERS: "zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888"
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
    volumes:
      - zookeeper-1-data:/var/lib/zookeeper/data
      - zookeeper-1-logs:/var/lib/zookeeper/log

  zookeeper-2:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper-2
    container_name: zookeeper-2
    ports:
      - "2182:2181"
      - "2889:2888"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_MAX_CLIENT_CNXNS: 60
      ZOOKEEPER_SERVERS: "zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888"
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
    volumes:
      - zookeeper-2-data:/var/lib/zookeeper/data
      - zookeeper-2-logs:/var/lib/zookeeper/log

  zookeeper-3:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper-3
    container_name: zookeeper-3
    ports:
      - "2183:2181"
      - "2890:2888"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_MAX_CLIENT_CNXNS: 60
      ZOOKEEPER_SERVERS: "zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888"
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
    volumes:
      - zookeeper-3-data:/var/lib/zookeeper/data
      - zookeeper-3-logs:/var/lib/zookeeper/log

  # Kafka Brokers
  kafka-1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-1
    container_name: kafka-1
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://host.docker.internal:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_MESSAGE_MAX_BYTES: 1000000
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_LOG_CLEANUP_POLICY: 'delete'
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
      KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
      KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-1'
      KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
      KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
      KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
    volumes:
      - kafka-1-data:/var/lib/kafka/data

  kafka-2:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-2
    container_name: kafka-2
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - "9093:9092"
      - "29093:29092"
      - "9102:9101"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://host.docker.internal:29093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_MESSAGE_MAX_BYTES: 1000000
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_LOG_CLEANUP_POLICY: 'delete'
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
      KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
      KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-2'
      KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
      KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
      KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
    volumes:
      - kafka-2-data:/var/lib/kafka/data

  kafka-3:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-3
    container_name: kafka-3
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - "9094:9092"
      - "29094:29092"
      - "9103:9101"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092,PLAINTEXT_HOST://host.docker.internal:29094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_MESSAGE_MAX_BYTES: 1000000
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
      KAFKA_LOG_CLEANUP_POLICY: 'delete'
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
      KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
      KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-3'
      KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
      KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
      KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
    volumes:
      - kafka-3-data:/var/lib/kafka/data

  # Load Balancer
  kafka-lb:
    image: nginx:alpine
    container_name: kafka-lb
    ports:
      - "8091:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3

  # Schema Registry
  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
      SCHEMA_REGISTRY_DEBUG: 'true'

  # Kafka UI
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: production
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_CLUSTERS_0_SCHEMA_REGISTRY: http://schema-registry:8081
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083

  # Kafka Connect
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.4.0
    hostname: kafka-connect
    container_name: kafka-connect
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-statuses
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_SECURITY_PROTOCOL: PLAINTEXT
      CONNECT_LOG4J_LOGGERS: "org.apache.zookeeper=ERROR,org.apache.kafka.connect=INFO"

  # Monitoring
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana-data:/var/lib/grafana
    depends_on:
      - prometheus

  # Elasticsearch
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
      - "xpack.security.enabled=false"
    ports:
      - "9200:9200"
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data

  # Kibana
  kibana:
    image: docker.elastic.co/kibana/kibana:8.8.0
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    depends_on:
      - elasticsearch

volumes:
  zookeeper-1-data:
  zookeeper-1-logs:
  zookeeper-2-data:
  zookeeper-2-logs:
  zookeeper-3-data:
  zookeeper-3-logs:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:
  prometheus-data:
  grafana-data:
  elasticsearch-data:

networks:
  default:
    driver: bridge

# 2. Nginx Configuration for Load Balancer
# nginx.conf
events {}

http {
    upstream kafka_backend {
        least_conn;
        server kafka-1:8091 max_fails=3 fail_timeout=30s;
        server kafka-2:8091 max_fails=3 fail_timeout=30s;
        server kafka-3:8091 max_fails=3 fail_timeout=30s;
    }

    server {
        listen 80;
        server_name localhost;

        location / {
            proxy_pass http://kafka_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        location /kafka-1 {
            proxy_pass http://kafka-1:8091;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }

        location /kafka-2 {
            proxy_pass http://kafka-2:8091;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }

        location /kafka-3 {
            proxy_pass http://kafka-3:8091;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }

        # Health check endpoints
        location /health {
            access_log off;
            return 200 "healthy\n";
            add_header Content-Type text/plain;
        }

        # Kafka Admin API proxy (if needed)
        location /admin/ {
            proxy_pass http://kafka-1:8090;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }
    }
}

# 3. Prometheus Configuration
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "kafka_rules.yml"

scrape_configs:
  # Assumes each broker exposes Prometheus-format metrics (e.g. via a JMX exporter
  # javaagent) on port 9101 inside the Docker network
  - job_name: 'kafka'
    static_configs:
      - targets:
        - 'kafka-1:9101'
        - 'kafka-2:9101'
        - 'kafka-3:9101'
    metrics_path: /metrics
    scrape_interval: 10s

  - job_name: 'schema-registry'
    static_configs:
      - targets: ['schema-registry:8081']
    metrics_path: /metrics

  - job_name: 'kafka-connect'
    static_configs:
      - targets: ['kafka-connect:8083']
    metrics_path: /metrics

# 4. Kafka Alerting Rules
# kafka_rules.yml
groups:
  - name: kafka
    rules:
      - alert: UnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Under-replicated partitions detected"
          description: "Kafka cluster has under-replicated partitions which may lead to data loss"

      - alert: OfflinePartitionsCount
        expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Offline partitions detected"
          description: "Controller reports offline partitions"

      - alert: ActiveControllerCount
        expr: kafka_controller_kafkacontroller_activecontrollercount != 1
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Controller count is not 1"
          description: "Active controller count should be exactly 1"

      - alert: BrokerDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka broker is down"
          description: "One or more Kafka brokers are not responding"

# 5. Kafka Cluster Management Scripts
# cluster-manager.sh
#!/bin/bash

# Kafka Cluster Management Script
# Usage: ./cluster-manager.sh [start|stop|restart|status|create-topics|backup|monitor|scale]

KAFKA_HOME="/opt/kafka"
ZOOKEEPER_HOME="/opt/zookeeper"
KAFKA_BROKERS="kafka1:9092,kafka2:9092,kafka3:9092"
TOPICS=("user-events" "order-events" "product-updates" "error-logs")

start_cluster() {
    echo "Starting Kafka cluster..."

    # Start Zookeeper
    for i in {1..3}; do
        echo "Starting Zookeeper $i..."
        $ZOOKEEPER_HOME/bin/zookeeper-server-start.sh -daemon $ZOOKEEPER_HOME/config/zookeeper$i.properties
        sleep 5
    done

    # Start Kafka brokers
    for i in {1..3}; do
        echo "Starting Kafka broker $i..."
        $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server$i.properties
        sleep 10
    done

    echo "Kafka cluster started successfully!"
}

stop_cluster() {
    echo "Stopping Kafka cluster..."

    # Stop Kafka brokers
    for i in {3..1}; do
        echo "Stopping Kafka broker $i..."
        $KAFKA_HOME/bin/kafka-server-stop.sh $KAFKA_HOME/config/server$i.properties
    done

    # Stop Zookeeper
    for i in {3..1}; do
        echo "Stopping Zookeeper $i..."
        $ZOOKEEPER_HOME/bin/zookeeper-server-stop.sh
        sleep 2
    done

    echo "Kafka cluster stopped!"
}

check_status() {
    echo "Checking Kafka cluster status..."

    # Check Zookeeper
    echo "Checking Zookeeper status..."
    for i in {1..3}; do
        if pgrep -f "zookeeper.*server$i.properties" > /dev/null; then
            echo "✓ Zookeeper $i is running"
        else
            echo "✗ Zookeeper $i is not running"
        fi
    done

    # Check Kafka brokers
    echo "Checking Kafka broker status..."
    for i in {1..3}; do
        if pgrep -f "kafka.*server$i.properties" > /dev/null; then
            echo "✓ Kafka broker $i is running"
        else
            echo "✗ Kafka broker $i is not running"
        fi
    done

    # Check topic status
    echo "Checking topic status..."
    for topic in "${TOPICS[@]}"; do
        if $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --describe --topic $topic >/dev/null 2>&1; then
            echo "✓ Topic '$topic' exists"
        else
            echo "✗ Topic '$topic' does not exist"
        fi
    done
}

create_topics() {
    echo "Creating Kafka topics..."

    for topic in "${TOPICS[@]}"; do
        echo "Creating topic: $topic"
        $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS \
            --create --topic $topic \
            --partitions 3 \
            --replication-factor 2

        echo "✓ Topic '$topic' created with 3 partitions and replication factor 2"
    done
}

backup_cluster() {
    echo "Starting Kafka cluster backup..."

    BACKUP_DIR="/backup/kafka/$(date +%Y%m%d_%H%M%S)"
    mkdir -p "$BACKUP_DIR"

    # Backup configuration files
    echo "Backing up configuration files..."
    cp -r $KAFKA_HOME/config/ "$BACKUP_DIR/kafka-config/"
    cp -r $ZOOKEEPER_HOME/config/ "$BACKUP_DIR/zookeeper-config/"

    # Backup topics data (metadata)
    echo "Backing up topic metadata..."
    for topic in "${TOPICS[@]}"; do
        echo "Backing up topic metadata for: $topic"
        $KAFKA_HOME/bin/kafka-configs.sh --bootstrap-server $KAFKA_BROKERS \
            --entity-type topics --entity-name $topic --describe \
            > "$BACKUP_DIR/topic-${topic}.json"
    done

    # Backup consumer group offsets
    echo "Backing up consumer group offsets..."
    $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA_BROKERS \
        --describe --all-groups \
        > "$BACKUP_DIR/consumer-groups.json"

    echo "Backup completed: $BACKUP_DIR"
}

monitor_cluster() {
    echo "Starting cluster monitoring..."

    # JMX monitoring setup would go here
    echo "Enabling JMX monitoring on all brokers..."

    # Check if Kafka Exporter is running
    if pgrep -f "kafka-exporter" > /dev/null; then
        echo "✓ Kafka Exporter is running"
    else
        echo "Starting Kafka Exporter..."
        # Start Kafka Exporter for Prometheus
        # Assumes the standalone kafka_exporter binary (github.com/danielqsj/kafka_exporter)
        # is installed and serves Prometheus metrics on its default port 9308
        kafka_exporter \
            --kafka.server=kafka1:9092 \
            --kafka.server=kafka2:9092 \
            --kafka.server=kafka3:9092 &
    fi

    echo "Cluster monitoring enabled"
    echo "Access Grafana at: http://localhost:3000"
    echo "Access Prometheus at: http://localhost:9090"
}

scale_brokers() {
    local new_brokers=$1

    echo "Scaling Kafka cluster to $new_brokers brokers..."

    # Add new broker configuration
    for ((i=4; i<=new_brokers; i++)); do
        echo "Configuring broker $i..."
        cp $KAFKA_HOME/config/server.properties \
           $KAFKA_HOME/config/server$i.properties

        # Update broker ID
        sed -i "s/broker.id=1/broker.id=$i/" $KAFKA_HOME/config/server$i.properties

        # Start new broker
        echo "Starting Kafka broker $i..."
        $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server$i.properties
        sleep 10
    done

    # Rebalance topics for new brokers
    echo "Rebalancing topics for new brokers..."
    for topic in "${TOPICS[@]}"; do
        echo "Increasing partition count for topic: $topic"
        $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS \
            --alter --topic $topic \
            --partitions 6
    done

    echo "Cluster scaled to $new_broker_count brokers successfully!"
}

# Main execution
case "${1:-}" in
    "start")
        start_cluster
        ;;
    "stop")
        stop_cluster
        ;;
    "restart")
        stop_cluster
        sleep 5
        start_cluster
        ;;
    "status")
        check_status
        ;;
    "create-topics")
        create_topics
        ;;
    "backup")
        backup_cluster
        ;;
    "monitor")
        monitor_cluster
        ;;
    "scale")
        if [ -z "${2:-}" ]; then
            echo "Usage: $0 scale <number_of_brokers>"
            exit 1
        fi
        scale_brokers "$2"
        ;;
    *)
        echo "Usage: $0 {start|stop|restart|status|create-topics|backup|monitor|scale}"
        echo "  start    - Start the Kafka cluster"
        echo "  stop     - Stop the Kafka cluster"
        echo "  restart  - Restart the Kafka cluster"
        echo "  status   - Check cluster status"
        echo "  create-topics - Create predefined topics"
        echo "  backup   - Backup cluster configuration and data"
        echo "  monitor  - Enable monitoring"
        echo "  scale    <number> - Scale cluster to specified number of brokers"
        exit 1
        ;;
esac

# 6. Topic Configuration Templates
# topic-configs.json
{
  "topic_templates": {
    "high-throughput": {
      "partitions": 12,
      "replication_factor": 3,
      "config": {
        "retention.ms": "604800000",
        "cleanup.policy": "delete",
        "segment.ms": "86400000",
        "max.message.bytes": "10485760",
        "compression.type": "lz4",
        "unclean.leader.election.enable": "true",
        "min.insync.replicas": "2"
      }
    },
    "low-latency": {
      "partitions": 6,
      "replication_factor": 3,
      "config": {
        "retention.ms": "86400000",
        "cleanup.policy": "delete",
        "segment.ms": "3600000",
        "max.message.bytes": "1000000",
        "compression.type": "none",
        "unclean.leader.election.enable": "true",
        "min.insync.replicas": "2",
        "linger.ms": "0"
      }
    },
    "event-sourcing": {
      "partitions": 3,
      "replication_factor": 3,
      "config": {
        "retention.ms": "-1",
        "cleanup.policy": "compact",
        "segment.ms": "86400000",
        "max.message.bytes": "10485760",
        "compression.type": "gzip",
        "unclean.leader.election.enable": "true",
        "min.insync.replicas": "2",
        "min.cleanable.dirty.ratio": "0.01"
      }
    },
    "logs": {
      "partitions": 3,
      "replication_factor": 2,
      "config": {
        "retention.ms": "604800000",
        "cleanup.policy": "delete",
        "segment.ms": "86400000",
        "max.message.bytes": "10485760",
        "compression.type": "gzip",
        "unclean.leader.election.enable": "true",
        "min.insync.replicas": "1"
      }
    }
  }
}
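
// Applying a template (sketch): a small kafkajs helper that reads topic-configs.json and
// creates a topic from one of the templates above. The file path, topic name, and broker
// address are illustrative.
// apply-topic-template.js
const fs = require('fs');
const { Kafka } = require('kafkajs');

async function createTopicFromTemplate(templateName, topicName) {
  const { topic_templates } = JSON.parse(fs.readFileSync('./topic-configs.json', 'utf8'));
  const template = topic_templates[templateName];
  if (!template) throw new Error(`Unknown template: ${templateName}`);

  const kafka = new Kafka({ clientId: 'topic-provisioner', brokers: ['localhost:9092'] });
  const admin = kafka.admin();

  await admin.connect();
  try {
    await admin.createTopics({
      topics: [{
        topic: topicName,
        numPartitions: template.partitions,
        replicationFactor: template.replication_factor,
        configEntries: Object.entries(template.config).map(([name, value]) => ({ name, value }))
      }]
    });
    console.log(`Created '${topicName}' from template '${templateName}'`);
  } finally {
    await admin.disconnect();
  }
}

createTopicFromTemplate('event-sourcing', 'payments-ledger').catch(console.error);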