🎯 Recommended Examples
Balanced sample collections from various categories for you to explore
Apache Kafka Examples
Apache Kafka distributed streaming platform examples, including producers, consumers, streams, and cluster configuration
💻 Kafka Core Concepts and Configuration javascript
🟢 simple
⭐⭐
Essential Kafka terminology, architecture, and basic producer/consumer examples
⏱️ 25 min
🏷️ kafka, producer, consumer, basics
Prerequisites:
Node.js, Basic messaging concepts, Docker
// Apache Kafka Basic Concepts and Examples
// 1. Installation
// Download Apache Kafka: https://kafka.apache.org/downloads
// or use Docker: docker compose -f kafka-docker-compose.yml up
// 2. Kafka Architecture Overview
/*
Topic: Category or feed name to which records are published
Producer: Publishes messages to one or more Kafka topics
Consumer: Subscribes to topics and processes the stream of records
Broker: Kafka server in a cluster
Cluster: Collection of brokers working together
Partition: Fundamental unit of parallelism in Kafka
Offset: Unique identifier for each record within a partition
Consumer Group: Group of consumers that share a topic for load balancing
*/
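To make the topic/partition/offset terminology concrete, here is a minimal sketch (an illustrative addition, not part of the original example set) that lists each partition's latest offset with the kafkajs admin client, assuming a local broker on localhost:9092 and an existing 'user-events' topic:
// offsets-demo.js (illustrative sketch)
const { Kafka } = require('kafkajs');
const admin = new Kafka({ clientId: 'offsets-demo', brokers: ['localhost:9092'] }).admin();
async function showPartitionOffsets(topic) {
  await admin.connect();
  // fetchTopicOffsets returns one entry per partition with its earliest/latest offsets
  const offsets = await admin.fetchTopicOffsets(topic);
  offsets.forEach(({ partition, low, high }) => {
    console.log(`Partition ${partition}: next offset ${high}, earliest retained offset ${low}`);
  });
  await admin.disconnect();
}
showPartitionOffsets('user-events').catch(console.error);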
// 3. Basic Producer Example (Node.js)
// producer.js
const { Kafka } = require('kafkajs');
class BasicProducer {
constructor() {
this.kafka = new Kafka({
clientId: 'basic-producer',
brokers: ['localhost:9092']
});
this.producer = this.kafka.producer();
}
async sendMessage(topic, message) {
try {
await this.producer.connect();
const result = await this.producer.send({
topic,
messages: [
{
key: message.key,
value: JSON.stringify(message.value),
headers: message.headers || {},
timestamp: Date.now().toString()
}
]
});
console.log('Message sent successfully:', result);
return result;
} catch (error) {
console.error('Error sending message:', error);
throw error;
} finally {
await this.producer.disconnect();
}
}
async sendBatchMessages(topic, messages) {
try {
await this.producer.connect();
const kafkaMessages = messages.map(msg => ({
key: msg.key,
value: JSON.stringify(msg.value),
headers: msg.headers || {},
timestamp: Date.now().toString()
}));
const result = await this.producer.send({
topic,
messages: kafkaMessages
});
console.log(`Batch of ${messages.length} messages sent successfully`);
return result;
} catch (error) {
console.error('Error sending batch messages:', error);
throw error;
} finally {
await this.producer.disconnect();
}
}
}
// Usage example
const producer = new BasicProducer();
// Send single message
await producer.sendMessage('user-events', {
key: 'user-123',
value: {
userId: '123',
action: 'login',
timestamp: new Date().toISOString(),
metadata: { ip: '192.168.1.1' }
}
});
// Send batch messages
const batchMessages = [
{
key: 'order-1',
value: { orderId: '1', status: 'created', amount: 99.99 }
},
{
key: 'order-2',
value: { orderId: '2', status: 'shipped', amount: 149.99 }
}
];
await producer.sendBatchMessages('order-events', batchMessages);
// 4. Basic Consumer Example (Node.js)
// consumer.js
const { Kafka } = require('kafkajs');
class BasicConsumer {
constructor(groupId) {
this.kafka = new Kafka({
clientId: 'basic-consumer',
brokers: ['localhost:9092']
});
this.consumer = this.kafka.consumer({ groupId });
this.isRunning = false;
}
async consume(topic, messageHandler) {
try {
await this.consumer.connect();
await this.consumer.subscribe({ topic, fromBeginning: false });
this.isRunning = true;
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (!this.isRunning) return;
try {
const parsedMessage = JSON.parse(message.value.toString());
await messageHandler({
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
value: parsedMessage,
timestamp: message.timestamp,
headers: message.headers
});
} catch (parseError) {
console.error('Error parsing message:', parseError);
// Move to next message even if parsing fails
}
}
// Note: kafkajs invokes either eachMessage or eachBatch, not both;
// switch to eachBatch only when batch-level control is required
});
} catch (error) {
console.error('Consumer error:', error);
throw error;
}
}
async stop() {
console.log('Stopping consumer...');
this.isRunning = false;
await this.consumer.disconnect();
}
}
// Usage example
const consumer = new BasicConsumer('user-processor-group');
await consumer.consume('user-events', async (message) => {
console.log(`Received message from ${message.topic}:`);
console.log(`User ID: ${message.value.userId}`);
console.log(`Action: ${message.value.action}`);
// Process the message (e.g., save to database, trigger analytics, etc.)
if (message.value.action === 'login') {
console.log('Processing login event...');
// Add your business logic here
}
});
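For long-running consumers, it helps to tie OS signals to the stop() method above so the consumer group can rebalance cleanly; a minimal sketch reusing the BasicConsumer instance:
// graceful-shutdown.js (sketch built on the `consumer` instance above)
['SIGINT', 'SIGTERM'].forEach(signal => {
  process.on(signal, async () => {
    try {
      await consumer.stop(); // disconnects the kafkajs consumer so the group rebalances cleanly
      process.exit(0);
    } catch (error) {
      console.error('Error during shutdown:', error);
      process.exit(1);
    }
  });
});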
// 5. Docker Compose for Kafka Development
// kafka-docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
# Optional: Schema Registry for Avro/Protobuf schemas
schema-registry:
image: confluentinc/cp-schema-registry:latest
container_name: schema-registry
depends_on:
- kafka
- zookeeper
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
volumes:
zookeeper-data:
kafka-data:
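Once the compose stack is up, a quick connectivity check from Node.js confirms the broker is reachable; a small sketch assuming the advertised host listener at localhost:9092:
// cluster-check.js (illustrative sketch)
const { Kafka } = require('kafkajs');
async function checkCluster() {
  const admin = new Kafka({ clientId: 'cluster-check', brokers: ['localhost:9092'] }).admin();
  await admin.connect();
  // describeCluster reports the brokers visible to the client and the current controller
  const { brokers, controller, clusterId } = await admin.describeCluster();
  console.log(`Cluster ${clusterId}: ${brokers.length} broker(s), controller id ${controller}`);
  await admin.disconnect();
}
checkCluster().catch(console.error);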
// 6. Topic Management Scripts
// topics.js
const { Kafka } = require('kafkajs');
class TopicManager {
constructor() {
this.kafka = new Kafka({
clientId: 'topic-manager',
brokers: ['localhost:9092']
});
this.admin = this.kafka.admin();
}
async createTopic(topicName, partitions = 1, replicationFactor = 1) {
try {
await this.admin.connect();
const topicConfig = {
topic: topicName,
numPartitions: partitions,
replicationFactor: replicationFactor,
configEntries: [
{ name: 'retention.ms', value: '604800000' }, // 7 days
{ name: 'cleanup.policy', value: 'delete' },
{ name: 'segment.ms', value: '86400000' } // 1 day
]
};
await this.admin.createTopics({
topics: [topicConfig]
});
console.log(`Topic '${topicName}' created successfully with ${partitions} partitions`);
} catch (error) {
if (error.type === 'TOPIC_ALREADY_EXISTS') {
console.log(`Topic '${topicName}' already exists`);
} else {
console.error('Error creating topic:', error);
throw error;
}
} finally {
await this.admin.disconnect();
}
}
async listTopics() {
try {
await this.admin.connect();
const topics = await this.admin.listTopics(); // kafkajs returns an array of topic names
console.log('Available topics:');
topics.forEach(topicName => {
console.log(`- ${topicName}`);
});
return topics;
} catch (error) {
console.error('Error listing topics:', error);
throw error;
} finally {
await this.admin.disconnect();
}
}
async deleteTopic(topicName) {
try {
await this.admin.connect();
await this.admin.deleteTopics({
topics: [topicName]
});
console.log(`Topic '${topicName}' deleted successfully`);
} catch (error) {
console.error('Error deleting topic:', error);
throw error;
} finally {
await this.admin.disconnect();
}
}
async getTopicMetadata(topicName) {
try {
await this.admin.connect();
const metadata = await this.admin.fetchTopicMetadata({ topics: [topicName] });
const topicMetadata = metadata.topics[0];
console.log(`Topic metadata for '${topicName}':`);
console.log(`Partitions: ${topicMetadata.partitions.length}`);
topicMetadata.partitions.forEach((partition, index) => {
console.log(` Partition ${index}: Leader: ${partition.leader}, Replicas: ${partition.replicas.length}`);
});
return topicMetadata;
} catch (error) {
console.error('Error fetching topic metadata:', error);
throw error;
} finally {
await this.admin.disconnect();
}
}
}
// Usage example
const topicManager = new TopicManager();
// Create topics
await topicManager.createTopic('user-events', 3, 1);
await topicManager.createTopic('order-events', 5, 1);
await topicManager.createTopic('product-updates', 2, 1);
// List all topics
await topicManager.listTopics();
// Get topic metadata
await topicManager.getTopicMetadata('user-events');
// 7. Configuration Examples
// producer.config.js
const producerConfig = {
clientId: 'my-producer-app',
brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
// Connection configuration
connectionTimeout: 10000,
requestTimeout: 30000,
// Retry configuration
retry: {
initialRetryTime: 100,
factor: 2,
maxRetryTime: 60000,
multiplier: 1.5,
retries: 5
},
// Authentication (if using SASL/SSL)
ssl: true,
sasl: {
mechanism: 'plain',
username: 'kafka-user',
password: 'kafka-password'
},
// Producer-specific settings
allowAutoTopicCreation: false,
maxInFlightRequests: 5,
idempotent: true,
transactionTimeout: 60000,
// Compression
compression: 'gzip',
// Batch settings
batchSize: 16384,
linger: 5,
// ACK settings
acks: 'all',
ackTimeoutMs: 5000,
maxMessagesPerRequest: 100
};
// consumer.config.js
const fs = require('fs'); // required for the SSL certificate files below
const consumerConfig = {
clientId: 'my-consumer-app',
groupId: 'my-consumer-group',
brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
// Connection configuration
connectionTimeout: 10000,
requestTimeout: 30000,
// Session timeout (ms)
sessionTimeout: 30000,
// Heartbeat interval (ms)
heartbeatInterval: 3000,
// Consumer offset management
autoCommit: true,
autoCommitInterval: 5000,
autoCommitThreshold: 100,
// Offset reset policy
autoOffsetReset: 'latest', // 'earliest' or 'latest'
// Assignment strategy
partitionAssigners: [],
// Range assignment strategy settings
rangeAssignorInstance: 'my-instance',
// Round-robin assignment strategy settings
roundRobinAssignor: true,
// Metadata refresh settings
metadataMaxAge: 300000,
metadataRequestTimeoutMs: 5000,
// Additional consumer settings
maxWaitTimeInMs: 5000,
minBytes: 1,
maxBytes: 1048576, // 1MB
// Rate limiting
maxMessagesPerPoll: 100,
// SSL/TLS configuration (if required)
ssl: {
rejectUnauthorized: true,
ca: [fs.readFileSync('/path/to/ca-cert.pem')],
key: fs.readFileSync('/path/to/client-key.pem'),
cert: fs.readFileSync('/path/to/client-cert.pem'),
passphrase: 'your-passphrase'
}
};
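Note that several of the settings listed above document intent rather than actual kafkajs options: in kafkajs, acks, compression, and batching are supplied per send() call, while offset reset and auto-commit are controlled through subscribe({ fromBeginning }) and run() options. A minimal sketch of the per-send form (topic and payload names are placeholders):
// send-options.js (sketch; assumes a reachable broker at localhost:9092)
const { Kafka, CompressionTypes } = require('kafkajs');
const producer = new Kafka({ clientId: 'my-producer-app', brokers: ['localhost:9092'] }).producer({
  allowAutoTopicCreation: false,
  idempotent: true // idempotence requires acks: -1 (all) on every send
});
async function sendWithOptions() {
  await producer.connect();
  await producer.send({
    topic: 'order-events',
    acks: -1,                           // wait for all in-sync replicas
    compression: CompressionTypes.GZIP, // per-batch compression
    messages: [{ key: 'order-1', value: JSON.stringify({ status: 'created' }) }]
  });
  await producer.disconnect();
}
sendWithOptions().catch(console.error);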
💻 Kafka Streams and Complex Processing javascript
🟡 intermediate
⭐⭐⭐⭐
Advanced Kafka Streams API examples, stream processing patterns, and real-time analytics
⏱️ 45 min
🏷️ kafka, streams, processing, advanced
Prerequisites:
Kafka basics, Stream processing concepts, Node.js streams
// Kafka Streams Processing Examples
// 1. Stream Processing Architecture
// stream-processor.js
const { Kafka } = require('kafkajs');
const { Transform } = require('stream');
class StreamProcessor {
constructor(config) {
this.kafka = new Kafka(config);
this.consumer = this.kafka.consumer({
groupId: 'stream-processor',
sessionTimeout: 30000,
heartbeatInterval: 3000
});
this.producer = this.kafka.producer();
this.processors = new Map();
}
// Register custom stream processors
registerProcessor(topic, processor) {
this.processors.set(topic, processor);
}
async start() {
try {
await this.consumer.connect();
await this.producer.connect();
// Subscribe to all topics with registered processors
const topics = Array.from(this.processors.keys());
await this.consumer.subscribe({ topics });
console.log(`Started stream processor for topics: ${topics.join(', ')}`);
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const processor = this.processors.get(topic);
if (processor) {
await this.processMessage(topic, message, processor);
}
}
});
} catch (error) {
console.error('Stream processor error:', error);
throw error;
}
}
async processMessage(topic, message, processor) {
try {
const startTime = Date.now();
// Parse incoming message
const input = JSON.parse(message.value.toString());
// Process the message (a processor may return a single event or an array of events)
const output = await processor(input);
const outputs = Array.isArray(output) ? output : (output ? [output] : []);
// Send each processed event to its output topic
for (const event of outputs) {
if (!event || !event.topic) continue;
await this.producer.send({
topic: event.topic,
messages: [{
key: message.key?.toString(),
value: JSON.stringify(event.data),
headers: {
...message.headers,
processedAt: Date.now().toString(),
processingTime: (Date.now() - startTime).toString()
}
}]
});
console.log(`Processed message from ${topic} to ${event.topic} in ${Date.now() - startTime}ms`);
}
} catch (error) {
console.error(`Error processing message from ${topic}:`, error);
// Could send to dead letter queue here
}
}
async stop() {
console.log('Stopping stream processor...');
await this.consumer.disconnect();
await this.producer.disconnect();
}
}
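The catch block in processMessage only logs failures and mentions a dead-letter queue; a hedged sketch of what forwarding to one could look like (the 'dead-letter' topic name and header keys are assumptions, not part of the original design):
// dead-letter.js (sketch; assumes a 'dead-letter' topic has been created)
async function sendToDeadLetter(producer, sourceTopic, message, error) {
  await producer.send({
    topic: 'dead-letter',
    messages: [{
      key: message.key ? message.key.toString() : null,
      value: message.value, // forward the raw payload unchanged for later inspection
      headers: {
        'x-original-topic': sourceTopic,
        'x-error-message': String(error.message), // header values must be strings or Buffers
        'x-failed-at': Date.now().toString()
      }
    }]
  });
}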
// 2. Real-time Analytics Processor
// analytics-processor.js
class AnalyticsProcessor {
constructor() {
this.counters = new Map();
this.aggregates = new Map();
this.sessionWindow = 5 * 60 * 1000; // 5 minutes
}
// Process user activity events
async processUserActivity(input) {
const { userId, action, timestamp } = input;
const now = Date.now();
// Update counters
const actionCount = this.counters.get(action) || 0;
this.counters.set(action, actionCount + 1);
// Update session-based aggregates
if (!this.aggregates.has(userId)) {
this.aggregates.set(userId, {
userId, // keep the id on the aggregate so downstream analytics events can reference it
sessionId: this.generateSessionId(),
startTime: now,
actions: [],
lastActivity: now
});
}
const userAggregate = this.aggregates.get(userId);
userAggregate.actions.push({
action,
timestamp: now,
metadata: input.metadata || {}
});
userAggregate.lastActivity = now;
// Remove inactive sessions
this.cleanupInactiveSessions(now);
// Generate analytics events
return this.generateAnalyticsEvents(userAggregate, action);
}
generateSessionId() {
return `session_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
cleanupInactiveSessions(now) {
for (const [userId, aggregate] of this.aggregates.entries()) {
if (now - aggregate.lastActivity > this.sessionWindow) {
this.aggregates.delete(userId);
// Emit session completed event
this.emitSessionCompleted(userId, aggregate);
}
}
}
emitSessionCompleted(userId, aggregate) {
const sessionDuration = aggregate.lastActivity - aggregate.startTime;
const actionCount = aggregate.actions.length;
return {
topic: 'session-completed',
data: {
userId,
sessionId: aggregate.sessionId,
duration: sessionDuration,
actionCount,
actions: aggregate.actions,
completedAt: Date.now()
}
};
}
generateAnalyticsEvents(aggregate, currentAction) {
const events = [];
const actionCount = aggregate.actions.length;
// Milestone events
if (actionCount === 1) {
events.push({
topic: 'user-first-action',
data: {
userId: aggregate.userId,
sessionId: aggregate.sessionId,
firstAction: currentAction
}
});
}
if (actionCount === 10) {
events.push({
topic: 'user-milestone',
data: {
userId: aggregate.userId,
sessionId: aggregate.sessionId,
milestone: '10-actions'
}
});
}
// Real-time aggregation events
if (actionCount % 5 === 0) {
const actionCounts = this.countActionCounts(aggregate.actions);
events.push({
topic: 'user-activity-summary',
data: {
userId: aggregate.userId,
sessionId: aggregate.sessionId,
totalActions: actionCount,
actionBreakdown: actionCounts,
lastActivity: aggregate.lastActivity
}
});
}
return events;
}
countActionCounts(actions) {
const counts = {};
actions.forEach(action => {
counts[action.action] = (counts[action.action] || 0) + 1;
});
return counts;
}
getOverallStats() {
return {
totalActions: Array.from(this.counters.values()).reduce((sum, count) => sum + count, 0),
activeSessions: this.aggregates.size,
actionBreakdown: Object.fromEntries(this.counters)
};
}
}
// 3. Data Transformation Pipeline
// transformation-pipeline.js
class DataTransformationPipeline {
constructor() {
this.transformations = [];
}
// Add transformation step
addTransformation(name, transformFn) {
this.transformations.push({ name, transformFn });
}
// Process data through the transformation pipeline
async transform(input) {
let currentData = input;
const transformationLog = [];
for (const { name, transformFn } of this.transformations) {
try {
const startTime = Date.now();
currentData = await transformFn(currentData);
const duration = Date.now() - startTime;
transformationLog.push({
step: name,
duration,
success: true
});
} catch (error) {
transformationLog.push({
step: name,
error: error.message,
success: false
});
// Stop pipeline on error
break;
}
}
return {
data: currentData,
transformations: transformationLog
};
}
// Common transformation functions
static transformations = {
// Normalize data structure
normalize: (data) => {
const normalized = {
id: data.id || data.user_id || data.orderId,
timestamp: data.timestamp || data.created_at || Date.now(),
type: data.type || data.event || 'unknown',
source: data.source || 'unknown',
payload: { ...data }
};
// Remove top-level fields that are now in the main structure
delete normalized.payload.id;
delete normalized.payload.timestamp;
delete normalized.payload.type;
delete normalized.payload.source;
return normalized;
},
// Enrich data with additional information
enrich: (data) => {
return {
...data,
enrichedAt: Date.now(),
enrichedBy: 'transformation-pipeline',
version: '1.0',
environment: process.env.NODE_ENV || 'development',
metadata: {
...data.metadata,
country: this.extractCountry(data),
device: this.extractDevice(data),
platform: this.extractPlatform(data)
}
};
},
// Filter sensitive data
filterSensitive: (data) => {
const sensitiveFields = ['password', 'ssn', 'creditCard', 'token'];
const filtered = { ...data };
const filterObject = (obj) => {
if (typeof obj !== 'object' || obj === null) return obj;
if (Array.isArray(obj)) {
return obj.map(filterObject);
}
const result = {};
for (const [key, value] of Object.entries(obj)) {
if (sensitiveFields.some(field => key.toLowerCase().includes(field.toLowerCase()))) {
result[key] = '[FILTERED]';
} else {
result[key] = filterObject(value);
}
}
return result;
};
return filterObject(filtered);
},
// Validate data
validate: (data) => {
const requiredFields = ['id', 'timestamp', 'type'];
const missing = requiredFields.filter(field => !data[field]);
if (missing.length > 0) {
throw new Error(`Missing required fields: ${missing.join(', ')}`);
}
return {
...data,
validated: true,
validationTimestamp: Date.now()
};
},
// Aggregate data
aggregate: (data) => {
const now = Date.now();
const hourBucket = Math.floor(now / (60 * 60 * 1000));
const dayBucket = Math.floor(now / (24 * 60 * 60 * 1000));
return {
...data,
aggregations: {
hourly: {
bucket: hourBucket,
timestamp: hourBucket * (60 * 60 * 1000),
count: 1
},
daily: {
bucket: dayBucket,
timestamp: dayBucket * (24 * 60 * 60 * 1000),
count: 1
}
}
};
},
// Convert to Avro schema
toAvro: (data) => {
// This would convert the data to Avro format
// For demonstration, we'll just add Avro metadata
return {
schema: 'user_events_v1',
avroData: data,
convertedAt: Date.now()
};
}
};
// These helpers are static so the arrow functions in the static transformations map above can resolve them via `this` (the class)
static extractCountry(data) {
// Extract country from IP address, metadata, or other fields
return data.country || data.ip_country || data.location?.country || 'unknown';
}
static extractDevice(data) {
// Extract device information from user agent or other fields
const userAgent = data.userAgent || data.user_agent;
if (userAgent) {
if (userAgent.includes('Mobile')) return 'mobile';
if (userAgent.includes('Tablet')) return 'tablet';
if (userAgent.includes('Desktop')) return 'desktop';
}
return 'unknown';
}
static extractPlatform(data) {
// Extract platform from various fields
const userAgent = data.userAgent || data.user_agent;
if (userAgent) {
if (userAgent.includes('Android')) return 'android';
if (userAgent.includes('iOS')) return 'ios';
if (userAgent.includes('Windows')) return 'windows';
if (userAgent.includes('Mac')) return 'macos';
if (userAgent.includes('Linux')) return 'linux';
}
return 'web';
}
}
// 4. Complex Event Processing
// event-processor.js
class ComplexEventProcessor {
constructor() {
this.eventPatterns = new Map();
this.eventRules = new Map();
this.eventHandlers = new Map();
}
// Register event pattern
registerPattern(patternName, pattern) {
this.eventPatterns.set(patternName, pattern);
}
// Register event rule
registerRule(ruleName, rule) {
this.eventRules.set(ruleName, rule);
}
// Register event handler
registerHandler(handlerName, handler) {
this.eventHandlers.set(handlerName, handler);
}
// Process complex event
async processEvent(event) {
const results = [];
const context = {
event,
timestamp: Date.now(),
processedPatterns: [],
matchedRules: [],
triggeredHandlers: []
};
// Apply pattern matching
for (const [patternName, pattern] of this.eventPatterns) {
if (this.matchPattern(event, pattern)) {
context.processedPatterns.push(patternName);
// Apply rules for this pattern
for (const [ruleName, rule] of this.eventRules) {
if (rule.patterns.includes(patternName)) {
const ruleResult = await this.applyRule(event, rule, context);
if (ruleResult.matched) {
context.matchedRules.push(ruleName);
// Trigger handlers
for (const handlerName of rule.handlers || []) {
const handler = this.eventHandlers.get(handlerName);
if (handler) {
const handlerResult = await handler(event, context);
results.push(handlerResult);
context.triggeredHandlers.push(handlerName);
}
}
}
}
}
}
}
return {
results,
context
};
}
matchPattern(event, pattern) {
// Simple pattern matching; a production implementation would use a more sophisticated matcher
if (pattern.type && event.type !== pattern.type) return false;
if (pattern.source && event.source !== pattern.source) return false;
if (pattern.conditions) {
return pattern.conditions.every(condition => {
const value = this.getNestedValue(event, condition.field);
return this.compareValues(value, condition.operator, condition.value);
});
}
return true;
}
applyRule(event, rule, context) {
// Apply rule conditions
if (rule.conditions) {
const matches = rule.conditions.every(condition => {
const value = this.getNestedValue(event, condition.field);
return this.compareValues(value, condition.operator, condition.value);
});
return { matched: matches, rule };
}
return { matched: true, rule };
}
getNestedValue(obj, path) {
return path.split('.').reduce((current, key) => {
return current && current[key] !== undefined ? current[key] : undefined;
}, obj);
}
compareValues(value1, operator, value2) {
switch (operator) {
case 'equals': return value1 === value2;
case 'not_equals': return value1 !== value2;
case 'greater_than': return Number(value1) > Number(value2);
case 'less_than': return Number(value1) < Number(value2);
case 'contains': return String(value1).includes(String(value2));
case 'in': return Array.isArray(value2) ? value2.includes(value1) : false;
case 'exists': return value1 !== undefined && value1 !== null;
default: return true;
}
}
}
// 5. Usage Examples
// main.js
const streamProcessor = new StreamProcessor({
clientId: 'stream-processor-app',
brokers: ['localhost:9092']
});
// Register processors
const analyticsProcessor = new AnalyticsProcessor();
streamProcessor.registerProcessor('user-events', (input) => analyticsProcessor.processUserActivity(input));
// Create transformation pipeline
const pipeline = new DataTransformationPipeline();
pipeline.addTransformation('normalize', DataTransformationPipeline.transformations.normalize);
pipeline.addTransformation('enrich', DataTransformationPipeline.transformations.enrich);
pipeline.addTransformation('filter', DataTransformationPipeline.transformations.filterSensitive);
pipeline.addTransformation('validate', DataTransformationPipeline.transformations.validate);
pipeline.addTransformation('aggregate', DataTransformationPipeline.transformations.aggregate);
// Register with stream processor
streamProcessor.registerProcessor('raw-events', async (input) => {
const result = await pipeline.transform(input);
return {
topic: 'processed-events',
data: result.data
};
});
// Start processing
await streamProcessor.start();
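The ComplexEventProcessor above is not exercised in this usage block; a short sketch of how it could be wired (pattern, rule, and handler names are illustrative, not part of the original design):
// cep-usage.js (illustrative sketch)
const cep = new ComplexEventProcessor();
cep.registerPattern('large-order', {
  type: 'order',
  conditions: [{ field: 'payload.amount', operator: 'greater_than', value: 1000 }]
});
cep.registerRule('flag-large-order', {
  patterns: ['large-order'],
  handlers: ['notify-review-team']
});
cep.registerHandler('notify-review-team', async (event, context) => {
  console.log(`Large order detected: ${event.payload.orderId}`, context.matchedRules);
  return { flagged: true, orderId: event.payload.orderId };
});
const { results } = await cep.processEvent({
  type: 'order',
  source: 'checkout',
  payload: { orderId: '42', amount: 2500 }
});
console.log(results); // [{ flagged: true, orderId: '42' }]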
💻 Kafka Cluster Management and Operations yaml
🔴 complex
⭐⭐⭐⭐⭐
Production Kafka cluster configuration, monitoring, security, and operational best practices
⏱️ 60 min
🏷️ kafka, cluster, operations, production
Prerequisites:
Docker, Kafka basics, System administration
# Kafka Cluster Management and Operations
# 1. Production Kafka Cluster Docker Compose
# kafka-cluster.yml
version: '3.8'
services:
# Zookeeper Ensemble
zookeeper-1:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper-1
container_name: zookeeper-1
ports:
- "2181:2181"
- "2888:2888"
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_MAX_CLIENT_CNXNS: 60
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: localhost
KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
volumes:
- zookeeper-1-data:/var/lib/zookeeper/data
- zookeeper-1-logs:/var/lib/zookeeper/log
zookeeper-2:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper-2
container_name: zookeeper-2
ports:
- "2182:2181"
- "2889:2888"
environment:
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_MAX_CLIENT_CNXNS: 60
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: localhost
volumes:
- zookeeper-2-data:/var/lib/zookeeper/data
- zookeeper-2-logs:/var/lib/zookeeper/log
zookeeper-3:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper-3
container_name: zookeeper-3
ports:
- "2183:2181"
- "2890:2888"
environment:
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_MAX_CLIENT_CNXNS: 60
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: localhost
volumes:
- zookeeper-3-data:/var/lib/zookeeper/data
- zookeeper-3-logs:/var/lib/zookeeper/log
# Kafka Brokers
kafka-1:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka-1
container_name: kafka-1
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
ports:
- "9092:9092"
- "29092:29092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://host.docker.internal:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_MESSAGE_MAX_BYTES: 1000000
KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
KAFKA_LOG_CLEANUP_POLICY: 'delete'
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-1'
KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
volumes:
- kafka-1-data:/var/lib/kafka/data
kafka-2:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka-2
container_name: kafka-2
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
ports:
- "9093:9092"
- "29093:29092"
- "9102:9101"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://host.docker.internal:29093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_MESSAGE_MAX_BYTES: 1000000
KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
KAFKA_LOG_CLEANUP_POLICY: 'delete'
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-2'
KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
volumes:
- kafka-2-data:/var/lib/kafka/data
kafka-3:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka-3
container_name: kafka-3
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
ports:
- "9094:9092"
- "29094:29092"
- "9103:9101"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092,PLAINTEXT_HOST://host.docker.internal:29094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_MESSAGE_MAX_BYTES: 1000000
KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
KAFKA_LOG_CLEANUP_POLICY: 'delete'
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-3'
KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
volumes:
- kafka-3-data:/var/lib/kafka/data
# Load Balancer
kafka-lb:
image: nginx:alpine
container_name: kafka-lb
ports:
- "8091:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- kafka-1
- kafka-2
- kafka-3
# Schema Registry
schema-registry:
image: confluentinc/cp-schema-registry:7.4.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- kafka-1
- kafka-2
- kafka-3
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
SCHEMA_REGISTRY_DEBUG: 'true'
# Kafka UI
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: production
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_CLUSTERS_0_SCHEMA_REGISTRY: http://schema-registry:8081
KAFKA_CLUSTERS_0_KAFKACONNECT_0: http://kafka-connect:8083
# Kafka Connect
kafka-connect:
image: confluentinc/cp-kafka-connect:7.4.0
hostname: kafka-connect
container_name: kafka-connect
depends_on:
- kafka-1
- kafka-2
- kafka-3
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: connect-statuses
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_REST_ADVERTISED_LISTENER: 'PLAINTEXT://kafka-connect:8083'
CONNECT_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
CONNECT_SECURITY_PROTOCOL: PLAINTEXT
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper,org.apache.kafka.connect.rest,org.apache.kafka.connect
CONNECT_AUTO_CREATE_TOPICS_ENABLE: 'true'
# Monitoring
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
depends_on:
- kafka-1
- kafka-2
- kafka-3
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
volumes:
- grafana-data:/var/lib/grafana
depends_on:
- prometheus
# Elasticsearch
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
container_name: elasticsearch
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms1g -Xmx1g"
- "xpack.security.enabled=false"
ports:
- "9200:9200"
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
# Kibana
kibana:
image: docker.elastic.co/kibana/kibana:8.8.0
container_name: kibana
ports:
- "5601:5601"
environment:
ELASTICSEARCH_HOSTS: http://elasticsearch:9200
depends_on:
- elasticsearch
volumes:
zookeeper-1-data:
zookeeper-1-logs:
zookeeper-2-data:
zookeeper-2-logs:
zookeeper-3-data:
zookeeper-3-logs:
kafka-1-data:
kafka-2-data:
kafka-3-data:
prometheus-data:
grafana-data:
elasticsearch-data:
networks:
default:
driver: bridge
# 2. Nginx Configuration for Load Balancer
# nginx.conf
events {}
http {
upstream kafka_backend {
least_conn;
server kafka-1:8091 max_fails=3 fail_timeout=30s;
server kafka-2:8091 max_fails=3 fail_timeout=30s;
server kafka-3:8091 max_fails=3 fail_timeout=30s;
}
server {
listen 80;
server_name localhost;
location / {
proxy_pass http://kafka_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /kafka-1 {
proxy_pass http://kafka-1:8091;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
location /kafka-2 {
proxy_pass http://kafka-2:8091;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
location /kafka-3 {
proxy_pass http://kafka-3:8091;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
# Health check endpoints
location /health {
access_log off;
return 200 "healthy\n";
add_header Content-Type text/plain;
}
# Kafka Admin API proxy (if needed)
location /admin/ {
proxy_pass http://kafka-1:8090;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
}
# 3. Prometheus Configuration
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "kafka_rules.yml"
scrape_configs:
- job_name: 'kafka'
static_configs:
# assumes each broker runs a JMX/Prometheus exporter on port 9101; inside the Docker network every broker uses the same port
- targets:
- 'kafka-1:9101'
- 'kafka-2:9101'
- 'kafka-3:9101'
metrics_path: /metrics
scrape_interval: 10s
- job_name: 'schema-registry'
static_configs:
- targets: ['schema-registry:8081']
metrics_path: /metrics
- job_name: 'kafka-connect'
static_configs:
- targets: ['kafka-connect:8083']
metrics_path: /metrics
# 4. Kafka Alerting Rules
# kafka_rules.yml
groups:
- name: kafka
rules:
- alert: UnderReplicatedPartitions
expr: kafka_server_replicamanager_underreplicatedpartitions > 0
for: 0m
labels:
severity: critical
annotations:
summary: "Under-replicated partitions detected"
description: "Kafka cluster has under-replicated partitions which may lead to data loss"
- alert: OfflinePartitionsCount
expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
for: 0m
labels:
severity: critical
annotations:
summary: "Offline partitions detected"
description: "Controller reports offline partitions"
- alert: ActiveControllerCount
expr: kafka_controller_kafkacontroller_activecontrollercount != 1
for: 0m
labels:
severity: critical
annotations:
summary: "Controller count is not 1"
description: "Active controller count should be exactly 1"
- alert: BrokerDown
expr: up == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka broker is down"
description: "One or more Kafka brokers are not responding"
# 5. Kafka Cluster Management Scripts
# cluster-manager.sh
#!/bin/bash
# Kafka Cluster Management Script
# Usage: ./cluster-manager.sh [start|stop|restart|status|scale|backup|monitor]
KAFKA_HOME="/opt/kafka"
ZOOKEEPER_HOME="/opt/zookeeper"
KAFKA_BROKERS="kafka1:9092,kafka2:9092,kafka3:9092"
TOPICS=("user-events" "order-events" "product-updates" "error-logs")
start_cluster() {
echo "Starting Kafka cluster..."
# Start Zookeeper
for i in {1..3}; do
echo "Starting Zookeeper $i..."
$ZOOKEEPER_HOME/bin/zookeeper-server-start.sh -daemon $ZOOKEEPER_HOME/config/zookeeper$i.properties
sleep 5
done
# Start Kafka brokers
for i in {1..3}; do
echo "Starting Kafka broker $i..."
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server$i.properties
sleep 10
done
echo "Kafka cluster started successfully!"
}
stop_cluster() {
echo "Stopping Kafka cluster..."
# Stop Kafka brokers (kafka-server-stop.sh stops every broker started from this installation)
echo "Stopping Kafka brokers..."
$KAFKA_HOME/bin/kafka-server-stop.sh
sleep 5
# Stop Zookeeper
echo "Stopping Zookeeper..."
$ZOOKEEPER_HOME/bin/zookeeper-server-stop.sh
echo "Kafka cluster stopped!"
}
check_status() {
echo "Checking Kafka cluster status..."
# Check Zookeeper
echo "Checking Zookeeper status..."
for i in {1..3}; do
if pgrep -f "zookeeper.*server$i.properties" > /dev/null; then
echo "✓ Zookeeper $i is running"
else
echo "✗ Zookeeper $i is not running"
fi
done
# Check Kafka brokers
echo "Checking Kafka broker status..."
for i in {1..3}; do
if pgrep -f "kafka.*server$i.properties" > /dev/null; then
echo "✓ Kafka broker $i is running"
else
echo "✗ Kafka broker $i is not running"
fi
done
# Check topic status
echo "Checking topic status..."
for topic in "${TOPICS[@]}"; do
if $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --describe --topic $topic >/dev/null 2>&1; then
echo "✓ Topic '$topic' exists"
else
echo "✗ Topic '$topic' does not exist"
fi
done
}
create_topics() {
echo "Creating Kafka topics..."
for topic in "${TOPICS[@]}"; do
echo "Creating topic: $topic"
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --create --topic $topic --partitions 3 --replication-factor 2
echo "✓ Topic '$topic' created with 3 partitions and replication factor 2"
done
}
backup_cluster() {
echo "Starting Kafka cluster backup..."
BACKUP_DIR="/backup/kafka/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$BACKUP_DIR"
# Backup configuration files
echo "Backing up configuration files..."
cp -r $KAFKA_HOME/config/ "$BACKUP_DIR/kafka-config/"
cp -r $ZOOKEEPER_HOME/config/ "$BACKUP_DIR/zookeeper-config/"
# Backup topics data (metadata)
echo "Backing up topic metadata..."
for topic in "${TOPICS[@]}"; do
echo "Backing up topic metadata for: $topic"
$KAFKA_HOME/bin/kafka-configs.sh --bootstrap-server $KAFKA_BROKERS --entity-type topics --entity-name $topic --describe > "$BACKUP_DIR/topic-${topic}.txt"
done
# Backup consumer group offsets
echo "Backing up consumer group offsets..."
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA_BROKERS --describe --all-groups > "$BACKUP_DIR/consumer-groups.txt"
echo "Backup completed: $BACKUP_DIR"
}
monitor_cluster() {
echo "Starting cluster monitoring..."
# JMX monitoring setup would go here
echo "Enabling JMX monitoring on all brokers..."
# Check if Kafka Exporter is running
if pgrep -f "kafka-exporter" > /dev/null; then
echo "✓ Kafka Exporter is running"
else
echo "Starting Kafka Exporter..."
# Start Kafka Exporter for Prometheus
$KAFKA_HOME/bin/kafka-exporter.sh --bootstrap-server $KAFKA_BROKERS --producer.config $KAFKA_HOME/config/producer.properties --kafka-prometheus-sink
fi
echo "Cluster monitoring enabled"
echo "Access Grafana at: http://localhost:3000"
echo "Access Prometheus at: http://localhost:9090"
}
scale_brokers() {
local new_brokers=$1
echo "Scaling Kafka cluster to $new_brokers brokers..."
# Add new broker configuration
for ((i=4; i<=new_brokers; i++)); do
echo "Configuring broker $i..."
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server$i.properties
# Update broker ID
sed -i "s/^broker.id=.*/broker.id=$i/" $KAFKA_HOME/config/server$i.properties
# Start new broker
echo "Starting Kafka broker $i..."
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server$i.properties
sleep 10
done
# Note: adding partitions does not move existing data onto the new brokers;
# run kafka-reassign-partitions.sh afterwards to spread replicas across the enlarged cluster
echo "Increasing partition counts for existing topics..."
for topic in "${TOPICS[@]}"; do
echo "Increasing partition count for topic: $topic"
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --alter --topic $topic --partitions 6
done
echo "Cluster scaled to $new_broker_count brokers successfully!"
}
# Main execution
case "${1:-}" in
"start")
start_cluster
;;
"stop")
stop_cluster
;;
"restart")
stop_cluster
sleep 5
start_cluster
;;
"status")
check_status
;;
"create-topics")
create_topics
;;
"backup")
backup_cluster
;;
"monitor")
monitor_cluster
;;
"scale")
if [ -z "${2:-}" ]; then
echo "Usage: $0 scale <number_of_brokers>"
exit 1
fi
scale_brokers "$2"
;;
*)
echo "Usage: $0 {start|stop|restart|status|create-topics|backup|monitor|scale}"
echo " start - Start the Kafka cluster"
echo " stop - Stop the Kafka cluster"
echo " restart - Restart the Kafka cluster"
echo " status - Check cluster status"
echo " create-topics - Create predefined topics"
echo " backup - Backup cluster configuration and data"
echo " monitor - Enable monitoring"
echo " scale <number> - Scale cluster to specified number of brokers"
exit 1
;;
esac
# 6. Topic Configuration Templates
# topic-configs.json
{
"topic_templates": {
"high-throughput": {
"partitions": 12,
"replication_factor": 3,
"config": {
"retention.ms": "604800000",
"cleanup.policy": "delete",
"segment.ms": "86400000",
"max.message.bytes": "10485760",
"compression.type": "lz4",
"unclean.leader.election.enable": "true",
"min.insync.replicas": "2"
}
},
"low-latency": {
"partitions": 6,
"replication_factor": 3,
"config": {
"retention.ms": "86400000",
"cleanup.policy": "delete",
"segment.ms": "3600000",
"max.message.bytes": "1000000",
"compression.type": "none",
"unclean.leader.election.enable": "true",
"min.insync.replicas": "2",
"linger.ms": "0"
}
},
"event-sourcing": {
"partitions": 3,
"replication_factor": 3,
"config": {
"retention.ms": "-1",
"cleanup.policy": "compact",
"segment.ms": "86400000",
"max.message.bytes": "10485760",
"compression.type": "gzip",
"unclean.leader.election.enable": "true",
"min.insync.replicas": "2",
"min.cleanable.dirty.ratio": "0.01"
}
},
"logs": {
"partitions": 3,
"replication_factor": 2,
"config": {
"retention.ms": "604800000",
"cleanup.policy": "delete",
"segment.ms": "86400000",
"max.message.bytes": "10485760",
"compression.type": "gzip",
"unclean.leader.election.enable": "true",
"min.insync.replicas": "1"
}
}
}
}
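These templates can also be applied programmatically with the kafkajs admin client from section 1; a sketch (the file path, topic name, and broker address are assumptions):
// apply-topic-template.js (illustrative sketch)
const fs = require('fs');
const { Kafka } = require('kafkajs');
async function applyTemplate(templateName, topicName) {
  const { topic_templates } = JSON.parse(fs.readFileSync('./topic-configs.json', 'utf8'));
  const template = topic_templates[templateName];
  const admin = new Kafka({ clientId: 'template-applier', brokers: ['localhost:9092'] }).admin();
  await admin.connect();
  await admin.createTopics({
    topics: [{
      topic: topicName,
      numPartitions: template.partitions,
      replicationFactor: template.replication_factor,
      configEntries: Object.entries(template.config).map(([name, value]) => ({ name, value }))
    }]
  });
  await admin.disconnect();
  console.log(`Created '${topicName}' from template '${templateName}'`);
}
applyTemplate('event-sourcing', 'audit-events').catch(console.error);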