🎯 Recommended Collections
Balanced collections of code examples from different categories for you to explore
Apache Kafka Examples
Examples for the Apache Kafka distributed streaming platform, including producers, consumers, streams, and cluster configuration
💻 Kafka Basic Concepts and Setup javascript
🟢 simple
⭐⭐
Core Kafka terminology, architecture, and basic producer/consumer examples
⏱️ 25 min
🏷️ kafka, producer, consumer, basics
Prerequisites:
Node.js, Basic messaging concepts, Docker
// Apache Kafka Basic Concepts and Examples
// 1. Installation
// Download Apache Kafka: https://kafka.apache.org/downloads
// or use Docker: docker compose -f kafka-docker-compose.yml up
// 2. Kafka Architecture Overview
/*
Topic: Category or feed name to which records are published
Producer: Publishes messages to one or more Kafka topics
Consumer: Subscribes to topics and processes the stream of records
Broker: Kafka server in a cluster
Cluster: Collection of brokers working together
Partition: Fundamental unit of parallelism in Kafka
Offset: Unique identifier for each record within a partition
Consumer Group: Group of consumers that share a topic for load balancing
*/
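// Quick sketch (hedged): the kafkajs admin client can make the terms above concrete.
// Assumes a local broker on localhost:9092, an existing 'user-events' topic and a
// 'user-processor-group' consumer group (both are created later in this example).
const { Kafka } = require('kafkajs');

async function inspectTopic() {
  const admin = new Kafka({ clientId: 'inspector', brokers: ['localhost:9092'] }).admin();
  await admin.connect();
  // Each partition keeps its own offset range; consumers track their position per partition
  const offsets = await admin.fetchTopicOffsets('user-events');
  offsets.forEach(({ partition, low, high }) =>
    console.log(`partition ${partition}: offsets ${low}..${high}`));
  // A consumer group shares the topic's partitions among its members
  const { groups } = await admin.describeGroups(['user-processor-group']);
  groups.forEach(g => console.log(`group ${g.groupId}: state=${g.state}, members=${g.members.length}`));
  await admin.disconnect();
}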
// 3. Basic Producer Example (Node.js)
// producer.js
const { Kafka } = require('kafkajs');
class BasicProducer {
constructor() {
this.kafka = new Kafka({
clientId: 'basic-producer',
brokers: ['localhost:9092']
});
this.producer = this.kafka.producer();
}
async sendMessage(topic, message) {
try {
await this.producer.connect();
const result = await this.producer.send({
topic,
messages: [
{
key: message.key,
value: JSON.stringify(message.value),
headers: message.headers || {},
timestamp: Date.now().toString()
}
]
});
console.log('Message sent successfully:', result);
return result;
} catch (error) {
console.error('Error sending message:', error);
throw error;
} finally {
await this.producer.disconnect();
}
}
async sendBatchMessages(topic, messages) {
try {
await this.producer.connect();
const kafkaMessages = messages.map(msg => ({
key: msg.key,
value: JSON.stringify(msg.value),
headers: msg.headers || {},
timestamp: Date.now().toString()
}));
const result = await this.producer.send({
topic,
messages: kafkaMessages
});
console.log(`Batch of ${messages.length} messages sent successfully`);
return result;
} catch (error) {
console.error('Error sending batch messages:', error);
throw error;
} finally {
await this.producer.disconnect();
}
}
}
// Usage example
const producer = new BasicProducer();
// Send single message
await producer.sendMessage('user-events', {
key: 'user-123',
value: {
userId: '123',
action: 'login',
timestamp: new Date().toISOString(),
metadata: { ip: '192.168.1.1' }
}
});
// Send batch messages
const batchMessages = [
{
key: 'order-1',
value: { orderId: '1', status: 'created', amount: 99.99 }
},
{
key: 'order-2',
value: { orderId: '2', status: 'shipped', amount: 149.99 }
}
];
await producer.sendBatchMessages('order-events', batchMessages);
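// Optional, hedged sketch: per-send tuning in kafkajs. acks and compression are passed on each
// producer.send() call rather than in the client config; this reuses the 'order-events' topic above.
const { CompressionTypes } = require('kafkajs');

async function sendWithOptions() {
  const tunedProducer = new Kafka({ clientId: 'tuned-producer', brokers: ['localhost:9092'] }).producer();
  await tunedProducer.connect();
  await tunedProducer.send({
    topic: 'order-events',
    acks: -1,                            // wait for all in-sync replicas
    compression: CompressionTypes.GZIP,  // compress the whole batch
    messages: [{ key: 'order-3', value: JSON.stringify({ orderId: '3', status: 'created' }) }]
  });
  await tunedProducer.disconnect();
}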
// 4. Basic Consumer Example (Node.js)
// consumer.js
const { Kafka } = require('kafkajs');
class BasicConsumer {
constructor(groupId) {
this.kafka = new Kafka({
clientId: 'basic-consumer',
brokers: ['localhost:9092']
});
this.consumer = this.kafka.consumer({ groupId });
this.isRunning = false;
}
async consume(topic, messageHandler) {
try {
await this.consumer.connect();
await this.consumer.subscribe({ topic, fromBeginning: false });
this.isRunning = true;
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (!this.isRunning) return;
try {
const parsedMessage = JSON.parse(message.value.toString());
await messageHandler({
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
value: parsedMessage,
timestamp: message.timestamp,
headers: message.headers
});
} catch (parseError) {
console.error('Error parsing message:', parseError);
// Move to next message even if parsing fails
}
}
// Note: consumer.run() accepts either eachMessage or eachBatch, not both; use eachBatch
// instead of eachMessage when you need to process whole batches (e.g. for manual offset control).
});
} catch (error) {
console.error('Consumer error:', error);
throw error;
}
}
async stop() {
console.log('Stopping consumer...');
this.isRunning = false;
await this.consumer.disconnect();
}
}
// Usage example
const consumer = new BasicConsumer('user-processor-group');
await consumer.consume('user-events', async (message) => {
console.log(`Received message from ${message.topic}:`);
console.log(`User ID: ${message.value.userId}`);
console.log(`Action: ${message.value.action}`);
// Process the message (e.g., save to database, trigger analytics, etc.)
if (message.value.action === 'login') {
console.log('Processing login event...');
// Add your business logic here
}
});
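// Graceful shutdown sketch (hedged): wire OS signals to the stop() method above so the consumer
// leaves its group cleanly before the process exits.
const shutdown = async () => {
  try {
    await consumer.stop();
  } finally {
    process.exit(0);
  }
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);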
// 5. Docker Compose for Kafka Development
// kafka-docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
# Optional: Schema Registry for Avro/Protobuf schemas
schema-registry:
image: confluentinc/cp-schema-registry:latest
container_name: schema-registry
depends_on:
- kafka
- zookeeper
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
volumes:
zookeeper-data:
kafka-data:
// 6. Topic Management Scripts
// topics.js
const { Kafka } = require('kafkajs');
class TopicManager {
constructor() {
this.kafka = new Kafka({
clientId: 'topic-manager',
brokers: ['localhost:9092']
});
this.admin = this.kafka.admin();
}
async createTopic(topicName, partitions = 1, replicationFactor = 1) {
try {
await this.admin.connect();
const topicConfig = {
topic: topicName,
numPartitions: partitions,
replicationFactor: replicationFactor,
configEntries: [
{ name: 'retention.ms', value: '604800000' }, // 7 days
{ name: 'cleanup.policy', value: 'delete' },
{ name: 'segment.ms', value: '86400000' } // 1 day
]
};
await this.admin.createTopics({
topics: [topicConfig]
});
console.log(`Topic '${topicName}' created successfully with ${partitions} partitions`);
} catch (error) {
if (error.type === 'TOPIC_ALREADY_EXISTS') {
console.log(`Topic '${topicName}' already exists`);
} else {
console.error('Error creating topic:', error);
throw error;
}
} finally {
await this.admin.disconnect();
}
}
async listTopics() {
try {
await this.admin.connect();
// listTopics() returns an array of topic name strings
const topics = await this.admin.listTopics();
console.log('Available topics:');
topics.forEach(topic => {
console.log(`- ${topic}`);
});
return topics;
} catch (error) {
console.error('Error listing topics:', error);
throw error;
} finally {
await this.admin.disconnect();
}
}
async deleteTopic(topicName) {
try {
await this.admin.connect();
await this.admin.deleteTopics({
topics: [topicName]
});
console.log(`Topic '${topicName}' deleted successfully`);
} catch (error) {
console.error('Error deleting topic:', error);
throw error;
} finally {
await this.admin.disconnect();
}
}
async getTopicMetadata(topicName) {
try {
await this.admin.connect();
const metadata = await this.admin.fetchTopicMetadata({ topics: [topicName] });
const topicMetadata = metadata.topics[0];
console.log(`Topic metadata for '${topicName}':`);
console.log(`Partitions: ${topicMetadata.partitions.length}`);
topicMetadata.partitions.forEach((partition, index) => {
console.log(` Partition ${index}: Leader: ${partition.leader}, Replicas: ${partition.replicas.length}`);
});
return topicMetadata;
} catch (error) {
console.error('Error fetching topic metadata:', error);
throw error;
} finally {
await this.admin.disconnect();
}
}
}
// Usage example
const topicManager = new TopicManager();
// Create topics
await topicManager.createTopic('user-events', 3, 1);
await topicManager.createTopic('order-events', 5, 1);
await topicManager.createTopic('product-updates', 2, 1);
// List all topics
await topicManager.listTopics();
// Get topic metadata
await topicManager.getTopicMetadata('user-events');
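// Hedged addition: kafkajs can also grow an existing topic with admin.createPartitions
// (the partition count can only be increased, never decreased).
async function addPartitions(topicName, count) {
  const admin = new Kafka({ clientId: 'topic-manager', brokers: ['localhost:9092'] }).admin();
  await admin.connect();
  try {
    await admin.createPartitions({
      topicPartitions: [{ topic: topicName, count }] // count = new total number of partitions
    });
    console.log(`Topic '${topicName}' now has ${count} partitions`);
  } finally {
    await admin.disconnect();
  }
}

// await addPartitions('user-events', 6);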
// 7. Configuration Examples
// producer.config.js
// Note: several keys below (batchSize, linger, acks, ackTimeoutMs, maxMessagesPerRequest,
// compression) come from the Java producer; in kafkajs, acks and compression are passed
// per producer.send() call, so treat this object as a reference of tunables rather than
// a literal kafkajs constructor argument.
const producerConfig = {
clientId: 'my-producer-app',
brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
// Connection configuration
connectionTimeout: 10000,
requestTimeout: 30000,
// Retry configuration
retry: {
initialRetryTime: 100,
factor: 2,
maxRetryTime: 60000,
multiplier: 1.5,
retries: 5
},
// Authentication (if using SASL/SSL)
ssl: true,
sasl: {
mechanism: 'plain',
username: 'kafka-user',
password: 'kafka-password'
},
// Producer-specific settings
allowAutoTopicCreation: false,
maxInFlightRequests: 5,
idempotent: true,
transactionTimeout: 60000,
// Compression
compression: 'gzip',
// Batch settings
batchSize: 16384,
linger: 5,
// ACK settings
acks: 'all',
ackTimeoutMs: 5000,
maxMessagesPerRequest: 100
};
// consumer.config.js
// Note: several keys below (autoCommit*, autoOffsetReset, the assignment-strategy fields,
// maxMessagesPerPoll) mirror the Java consumer; kafkajs exposes only a subset of them.
const fs = require('fs'); // required for reading the SSL certificate files below
const consumerConfig = {
clientId: 'my-consumer-app',
groupId: 'my-consumer-group',
brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
// Connection configuration
connectionTimeout: 10000,
requestTimeout: 30000,
// Session timeout (ms)
sessionTimeout: 30000,
// Heartbeat interval (ms)
heartbeatInterval: 3000,
// Consumer offset management
autoCommit: true,
autoCommitInterval: 5000,
autoCommitThreshold: 100,
// Offset reset policy
autoOffsetReset: 'latest', // 'earliest' or 'latest'
// Assignment strategy
partitionAssigners: [],
// Range assignment strategy settings
rangeAssignorInstance: 'my-instance',
// Round-robin assignment strategy settings
roundRobinAssignor: true,
// Metadata refresh settings
metadataMaxAge: 300000,
metadataRequestTimeoutMs: 5000,
// Additional consumer settings
maxWaitTimeInMs: 5000,
minBytes: 1,
maxBytes: 1048576, // 1MB
// Rate limiting
maxMessagesPerPoll: 100,
// SSL/TLS configuration (if required)
ssl: {
rejectUnauthorized: true,
ca: [fs.readFileSync('/path/to/ca-cert.pem')],
key: fs.readFileSync('/path/to/client-key.pem'),
cert: fs.readFileSync('/path/to/client-cert.pem'),
passphrase: 'your-passphrase'
}
};
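// Hedged usage sketch: how a config object like the one above is typically split when handed to
// kafkajs — client-level options go to the Kafka constructor, group/session options to
// kafka.consumer(); keys kafkajs does not support are left out here.
const { Kafka } = require('kafkajs');

const kafkaClient = new Kafka({
  clientId: consumerConfig.clientId,
  brokers: consumerConfig.brokers,
  connectionTimeout: consumerConfig.connectionTimeout,
  requestTimeout: consumerConfig.requestTimeout,
  ssl: consumerConfig.ssl
});

const tunedConsumer = kafkaClient.consumer({
  groupId: consumerConfig.groupId,
  sessionTimeout: consumerConfig.sessionTimeout,
  heartbeatInterval: consumerConfig.heartbeatInterval,
  minBytes: consumerConfig.minBytes,
  maxBytes: consumerConfig.maxBytes,
  maxWaitTimeInMs: consumerConfig.maxWaitTimeInMs
});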
💻 Kafka Streams and Complex Processing javascript
🟡 intermediate
⭐⭐⭐⭐
Advanced stream-processing patterns and real-time analytics in the style of the Kafka Streams API (implemented here with kafkajs)
⏱️ 45 min
🏷️ kafka, streams, processing, advanced
Prerequisites:
Kafka basics, Stream processing concepts, Node.js streams
// Kafka Streams Processing Examples
// 1. Stream Processing Architecture
// stream-processor.js
const { Kafka } = require('kafkajs');
const { Transform } = require('stream');
class StreamProcessor {
constructor(config) {
this.kafka = new Kafka(config);
this.consumer = this.kafka.consumer({
groupId: 'stream-processor',
sessionTimeout: 30000,
heartbeatInterval: 3000
});
this.producer = this.kafka.producer();
this.processors = new Map();
}
// Register custom stream processors
registerProcessor(topic, processor) {
this.processors.set(topic, processor);
}
async start() {
try {
await this.consumer.connect();
await this.producer.connect();
// Subscribe to all topics with registered processors
const topics = Array.from(this.processors.keys());
await this.consumer.subscribe({ topics });
console.log(`Started stream processor for topics: ${topics.join(', ')}`);
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const processor = this.processors.get(topic);
if (processor) {
await this.processMessage(topic, message, processor);
}
}
});
} catch (error) {
console.error('Stream processor error:', error);
throw error;
}
}
async processMessage(topic, message, processor) {
try {
const startTime = Date.now();
// Parse incoming message
const input = JSON.parse(message.value.toString());
// Process the message
const output = await processor(input);
// Send processed message to output topic
if (output && output.topic) {
await this.producer.send({
topic: output.topic,
messages: [{
key: message.key?.toString(),
value: JSON.stringify(output.data),
headers: {
...message.headers,
processedAt: Date.now().toString(),
processingTime: String(Date.now() - startTime) // header values must be strings or Buffers
}
}]
});
console.log(`Processed message from ${topic} to ${output.topic} in ${Date.now() - startTime}ms`);
}
} catch (error) {
console.error(`Error processing message from ${topic}:`, error);
// Could send to dead letter queue here
}
}
async stop() {
console.log('Stopping stream processor...');
await this.consumer.disconnect();
await this.producer.disconnect();
}
}
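// Dead-letter-queue sketch (hedged): one way to implement the "could send to dead letter queue"
// note in processMessage() above. The 'dead-letter-queue' topic name is an assumption.
async function sendToDeadLetterQueue(producer, sourceTopic, message, error) {
  await producer.send({
    topic: 'dead-letter-queue',
    messages: [{
      key: message.key?.toString(),
      value: message.value, // forward the original payload untouched
      headers: {
        'x-original-topic': sourceTopic,
        'x-error-message': String(error.message),
        'x-failed-at': Date.now().toString()
      }
    }]
  });
}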
// 2. Real-time Analytics Processor
// analytics-processor.js
class AnalyticsProcessor {
constructor() {
this.counters = new Map();
this.aggregates = new Map();
this.sessionWindow = 5 * 60 * 1000; // 5 minutes
}
// Process user activity events
async processUserActivity(input) {
const { userId, action, timestamp } = input;
const now = Date.now();
// Update counters
const actionCount = this.counters.get(action) || 0;
this.counters.set(action, actionCount + 1);
// Update session-based aggregates
if (!this.aggregates.has(userId)) {
this.aggregates.set(userId, {
userId,
sessionId: this.generateSessionId(),
startTime: now,
actions: [],
lastActivity: now
});
}
const userAggregate = this.aggregates.get(userId);
userAggregate.actions.push({
action,
timestamp: now,
metadata: input.metadata || {}
});
userAggregate.lastActivity = now;
// Remove inactive sessions
this.cleanupInactiveSessions(now);
// Generate analytics events
return this.generateAnalyticsEvents(userAggregate, action);
}
generateSessionId() {
return `session_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
cleanupInactiveSessions(now) {
for (const [userId, aggregate] of this.aggregates.entries()) {
if (now - aggregate.lastActivity > this.sessionWindow) {
this.aggregates.delete(userId);
// Emit session completed event
this.emitSessionCompleted(userId, aggregate);
}
}
}
emitSessionCompleted(userId, aggregate) {
const sessionDuration = aggregate.lastActivity - aggregate.startTime;
const actionCount = aggregate.actions.length;
return {
topic: 'session-completed',
data: {
userId,
sessionId: aggregate.sessionId,
duration: sessionDuration,
actionCount,
actions: aggregate.actions,
completedAt: Date.now()
}
};
}
generateAnalyticsEvents(aggregate, currentAction) {
const events = [];
const actionCount = aggregate.actions.length;
// Milestone events
if (actionCount === 1) {
events.push({
topic: 'user-first-action',
data: {
userId: aggregate.userId,
sessionId: aggregate.sessionId,
firstAction: currentAction
}
});
}
if (actionCount === 10) {
events.push({
topic: 'user-milestone',
data: {
userId: aggregate.userId,
sessionId: aggregate.sessionId,
milestone: '10-actions'
}
});
}
// Real-time aggregation events
if (actionCount % 5 === 0) {
const actionCounts = this.countActionCounts(aggregate.actions);
events.push({
topic: 'user-activity-summary',
data: {
userId: aggregate.userId,
sessionId: aggregate.sessionId,
totalActions: actionCount,
actionBreakdown: actionCounts,
lastActivity: aggregate.lastActivity
}
});
}
return events;
}
countActionCounts(actions) {
const counts = {};
actions.forEach(action => {
counts[action.action] = (counts[action.action] || 0) + 1;
});
return counts;
}
getOverallStats() {
return {
totalActions: Array.from(this.counters.values()).reduce((sum, count) => sum + count, 0),
activeSessions: this.aggregates.size,
actionBreakdown: Object.fromEntries(this.counters)
};
}
}
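// Usage sketch (hedged): processUserActivity() returns an array of { topic, data } analytics
// events, so the caller decides where to publish them.
const analytics = new AnalyticsProcessor();

async function handleActivity(producer, input) {
  const events = await analytics.processUserActivity(input);
  for (const event of events) {
    await producer.send({
      topic: event.topic,
      messages: [{ key: input.userId, value: JSON.stringify(event.data) }]
    });
  }
}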
// 3. Data Transformation Pipeline
// transformation-pipeline.js
class DataTransformationPipeline {
constructor() {
this.transformations = [];
}
// Add transformation step
addTransformation(name, transformFn) {
this.transformations.push({ name, transformFn });
}
// Process data through the transformation pipeline
async transform(input) {
let currentData = input;
const transformationLog = [];
for (const { name, transformFn } of this.transformations) {
try {
const startTime = Date.now();
currentData = await transformFn(currentData);
const duration = Date.now() - startTime;
transformationLog.push({
step: name,
duration,
success: true
});
} catch (error) {
transformationLog.push({
step: name,
error: error.message,
success: false
});
// Stop pipeline on error
break;
}
}
return {
data: currentData,
transformations: transformationLog
};
}
// Common transformation functions
static transformations = {
// Normalize data structure
normalize: (data) => {
const normalized = {
id: data.id || data.user_id || data.orderId,
timestamp: data.timestamp || data.created_at || Date.now(),
type: data.type || data.event || 'unknown',
source: data.source || 'unknown',
payload: { ...data }
};
// Remove top-level fields that are now in the main structure
delete normalized.payload.id;
delete normalized.payload.timestamp;
delete normalized.payload.type;
delete normalized.payload.source;
return normalized;
},
// Enrich data with additional information
enrich: (data) => {
return {
...data,
enrichedAt: Date.now(),
enrichedBy: 'transformation-pipeline',
version: '1.0',
environment: process.env.NODE_ENV || 'development',
metadata: {
...data.metadata,
country: this.extractCountry(data),
device: this.extractDevice(data),
platform: this.extractPlatform(data)
}
};
},
// Filter sensitive data
filterSensitive: (data) => {
const sensitiveFields = ['password', 'ssn', 'creditCard', 'token'];
const filtered = { ...data };
const filterObject = (obj) => {
if (typeof obj !== 'object' || obj === null) return obj;
if (Array.isArray(obj)) {
return obj.map(filterObject);
}
const result = {};
for (const [key, value] of Object.entries(obj)) {
if (sensitiveFields.some(field => key.toLowerCase().includes(field.toLowerCase()))) {
result[key] = '[FILTERED]';
} else {
result[key] = filterObject(value);
}
}
return result;
};
return filterObject(filtered);
},
// Validate data
validate: (data) => {
const requiredFields = ['id', 'timestamp', 'type'];
const missing = requiredFields.filter(field => !data[field]);
if (missing.length > 0) {
throw new Error(`Missing required fields: ${missing.join(', ')}`);
}
return {
...data,
validated: true,
validationTimestamp: Date.now()
};
},
// Aggregate data
aggregate: (data) => {
const now = Date.now();
const hourBucket = Math.floor(now / (60 * 60 * 1000));
const dayBucket = Math.floor(now / (24 * 60 * 60 * 1000));
return {
...data,
aggregations: {
hourly: {
bucket: hourBucket,
timestamp: hourBucket * (60 * 60 * 1000),
count: 1
},
daily: {
bucket: dayBucket,
timestamp: dayBucket * (24 * 60 * 60 * 1000),
count: 1
}
}
};
},
// Convert to Avro schema
toAvro: (data) => {
// This would convert the data to Avro format
// For demonstration, we'll just add Avro metadata
return {
schema: 'user_events_v1',
avroData: data,
convertedAt: Date.now()
};
}
}
// Declared static so they can be called from the static `transformations` object above
static extractCountry(data) {
// Extract country from IP address, metadata, or other fields
return data.country || data.ip_country || data.location?.country || 'unknown';
}
static extractDevice(data) {
// Extract device information from user agent or other fields
const userAgent = data.userAgent || data.user_agent;
if (userAgent) {
if (userAgent.includes('Mobile')) return 'mobile';
if (userAgent.includes('Tablet')) return 'tablet';
if (userAgent.includes('Desktop')) return 'desktop';
}
return 'unknown';
}
static extractPlatform(data) {
// Extract platform from various fields
const userAgent = data.userAgent || data.user_agent;
if (userAgent) {
if (userAgent.includes('Android')) return 'android';
if (userAgent.includes('iOS')) return 'ios';
if (userAgent.includes('Windows')) return 'windows';
if (userAgent.includes('Mac')) return 'macos';
if (userAgent.includes('Linux')) return 'linux';
}
return 'web';
}
}
// 4. Complex Event Processing
// event-processor.js
class ComplexEventProcessor {
constructor() {
this.eventPatterns = new Map();
this.eventRules = new Map();
this.eventHandlers = new Map();
}
// Register event pattern
registerPattern(patternName, pattern) {
this.eventPatterns.set(patternName, pattern);
}
// Register event rule
registerRule(ruleName, rule) {
this.eventRules.set(ruleName, rule);
}
// Register event handler
registerHandler(handlerName, handler) {
this.eventHandlers.set(handlerName, handler);
}
// Process complex event
async processEvent(event) {
const results = [];
const context = {
event,
timestamp: Date.now(),
processedPatterns: [],
matchedRules: [],
triggeredHandlers: []
};
// Apply pattern matching
for (const [patternName, pattern] of this.eventPatterns) {
if (this.matchPattern(event, pattern)) {
context.processedPatterns.push(patternName);
// Apply rules for this pattern
for (const [ruleName, rule] of this.eventRules) {
if (rule.patterns.includes(patternName)) {
const ruleResult = await this.applyRule(event, rule, context);
if (ruleResult.matched) {
context.matchedRules.push(ruleName);
// Trigger handlers
for (const handlerName of rule.handlers || []) {
const handler = this.eventHandlers.get(handlerName);
if (handler) {
const handlerResult = await handler(event, context);
results.push(handlerResult);
context.triggeredHandlers.push(handlerName);
}
}
}
}
}
}
}
return {
results,
context
};
}
matchPattern(event, pattern) {
// Simple pattern matching - in real implementation, use more sophisticated pattern matching
if (pattern.type && event.type !== pattern.type) return false;
if (pattern.source && event.source !== pattern.source) return false;
if (pattern.conditions) {
return pattern.conditions.every(condition => {
const value = this.getNestedValue(event, condition.field);
return this.compareValues(value, condition.operator, condition.value);
});
}
return true;
}
applyRule(event, rule, context) {
// Apply rule conditions
if (rule.conditions) {
const matches = rule.conditions.every(condition => {
const value = this.getNestedValue(event, condition.field);
return this.compareValues(value, condition.operator, condition.value);
});
return { matched: matches, rule };
}
return { matched: true, rule };
}
getNestedValue(obj, path) {
return path.split('.').reduce((current, key) => {
return current && current[key] !== undefined ? current[key] : undefined;
}, obj);
}
compareValues(value1, operator, value2) {
switch (operator) {
case 'equals': return value1 === value2;
case 'not_equals': return value1 !== value2;
case 'greater_than': return Number(value1) > Number(value2);
case 'less_than': return Number(value1) < Number(value2);
case 'contains': return String(value1).includes(String(value2));
case 'in': return Array.isArray(value2) ? value2.includes(value1) : false;
case 'exists': return value1 !== undefined && value1 !== null;
default: return true;
}
}
}
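// Usage sketch (hedged): the pattern, rule and handler names below are illustrative only.
const cep = new ComplexEventProcessor();

cep.registerPattern('high-value-order', {
  type: 'order',
  conditions: [{ field: 'payload.amount', operator: 'greater_than', value: 1000 }]
});

cep.registerRule('alert-on-high-value', {
  patterns: ['high-value-order'],
  handlers: ['notify-sales']
});

cep.registerHandler('notify-sales', async (event, context) => {
  console.log(`High-value order ${event.payload?.orderId} matched:`, context.processedPatterns);
  return { notified: true };
});

// const { results } = await cep.processEvent({
//   type: 'order',
//   source: 'checkout',
//   payload: { orderId: 'A-17', amount: 2499 }
// });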
// 5. Usage Examples
// main.js
const streamProcessor = new StreamProcessor({
clientId: 'stream-processor-app',
brokers: ['localhost:9092']
});
// Register processors
// AnalyticsProcessor is a class, so wrap it in a handler function. processUserActivity()
// returns an array of { topic, data } events while processMessage() forwards a single result,
// so return the first event here (or extend processMessage to handle arrays).
const analyticsProcessor = new AnalyticsProcessor();
streamProcessor.registerProcessor('user-events', async (input) => {
const events = await analyticsProcessor.processUserActivity(input);
return events.length > 0 ? events[0] : null;
});
// Create transformation pipeline
const pipeline = new DataTransformationPipeline();
pipeline.addTransformation('normalize', DataTransformationPipeline.transformations.normalize);
pipeline.addTransformation('enrich', DataTransformationPipeline.transformations.enrich);
pipeline.addTransformation('filter', DataTransformationPipeline.transformations.filterSensitive);
pipeline.addTransformation('validate', DataTransformationPipeline.transformations.validate);
pipeline.addTransformation('aggregate', DataTransformationPipeline.transformations.aggregate);
// Register with stream processor
streamProcessor.registerProcessor('raw-events', async (input) => {
const result = await pipeline.transform(input);
return {
topic: 'processed-events',
data: result.data
};
});
// Start processing
await streamProcessor.start();
💻 Kafka Cluster Management and Operations yaml
🔴 complex
⭐⭐⭐⭐⭐
Production Kafka cluster setup, monitoring, security, and operational best practices
⏱️ 60 min
🏷️ kafka, cluster, operations, production
Prerequisites:
Docker, Kafka basics, System administration
# Kafka Cluster Management and Operations
# 1. Production Kafka Cluster Docker Compose
# kafka-cluster.yml
version: '3.8'
services:
# Zookeeper Ensemble
zookeeper-1:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper-1
container_name: zookeeper-1
ports:
- "2181:2181"
- "2888:2888"
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_TIME: 2
ZOOKEEPER_MAX_CLIENT_CNXNS: 60
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: localhost
KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
volumes:
- zookeeper-1-data:/var/lib/zookeeper/data
- zookeeper-1-logs:/var/lib/zookeeper/log
zookeeper-2:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper-2
container_name: zookeeper-2
ports:
- "2182:2181"
- "2889:2888"
environment:
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_TIME: 2
ZOOKEEPER_MAX_CLIENT_CNXNS: 60
ZOOKEEPER_SERVER_QUORUM_VOTES: 2
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: localhost
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
volumes:
- zookeeper-2-data:/var/lib/zookeeper/data
- zookeeper-2-logs:/var/lib/zookeeper/log
zookeeper-3:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper-3
container_name: zookeeper-3
ports:
- "2183:2181"
- "2890:2888"
environment:
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_TIME: 2
ZOOKEEPER_MAX_CLIENT_CNXNS: 60
ZOOKEEPER_SERVER_QUORUM_VOTES: 2
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: localhost
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888
volumes:
- zookeeper-3-data:/var/lib/zookeeper/data
- zookeeper-3-logs:/var/lib/zookeeper/log
# Kafka Brokers
kafka-1:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka-1
container_name: kafka-1
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
ports:
- "9092:9092"
- "29092:29092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://host.docker.internal:29092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_MESSAGE_MAX_BYTES: 1000000
KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
KAFKA_LOG_CLEANUP_POLICY: 'delete'
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-1'
KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
volumes:
- kafka-1-data:/var/lib/kafka/data
kafka-2:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka-2
container_name: kafka-2
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
ports:
- "9093:9092"
- "29093:29092"
- "9102:9101"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://host.docker.internal:29093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_MESSAGE_MAX_BYTES: 1000000
KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
KAFKA_LOG_CLEANUP_POLICY: 'delete'
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-2'
KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
volumes:
- kafka-2-data:/var/lib/kafka/data
kafka-3:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka-3
container_name: kafka-3
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
ports:
- "9094:9092"
- "29094:29092"
- "9103:9101"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092,PLAINTEXT_HOST://host.docker.internal:29094
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_MESSAGE_MAX_BYTES: 1000000
KAFKA_SOCKET_REQUEST_MAX_BYTES: 104857600
KAFKA_LOG_CLEANUP_POLICY: 'delete'
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 3
KAFKA_CONFLUENT_METRICS_REPORTER_EXPORTER_JMX_ENABLED: 'true'
KAFKA_CONFLUENT_BALANCER_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
KAFKA_CONFLUENT_BALANCER_HOST_NAME: localhost
KAFKA_CONFLUENT_BALANCER_CLIENT_ID: 'load-balancer-3'
KAFKA_CONFLUENT_BALANCER_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:8091,PLAINTEXT://kafka-2:8091,PLAINTEXT://kafka-3:8091
KAFKA_CONFLUENT_BALANCER_SECURITY_PROTOCOL: PLAINTEXT
KAFKA_CONFLUENT_BALANCER_LISTENER_NAME: internal
volumes:
- kafka-3-data:/var/lib/kafka/data
# Load Balancer
kafka-lb:
image: nginx:alpine
container_name: kafka-lb
ports:
- "8091:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- kafka-1
- kafka-2
- kafka-3
# Schema Registry
schema-registry:
image: confluentinc/cp-schema-registry:7.4.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- kafka-1
- kafka-2
- kafka-3
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
SCHEMA_REGISTRY_DEBUG: 'true'
# Kafka UI
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: production
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_CLUSTERS_0_SCHEMA_REGISTRY: http://schema-registry:8081
KAFKA_CLUSTERS_0_KAFKACONNECT_0: http://kafka-connect:8083
# Kafka Connect
kafka-connect:
image: confluentinc/cp-kafka-connect:7.4.0
hostname: kafka-connect
container_name: kafka-connect
depends_on:
- kafka-1
- kafka-2
- kafka-3
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka-1:9092,kafka-2:9092,kafka-3:9092
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: connect-statuses
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_REST_ADVERTISED_LISTENER: 'PLAINTEXT://kafka-connect:8083'
CONNECT_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
CONNECT_SECURITY_PROTOCOL: PLAINTEXT
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper,org.apache.kafka.connect.rest,org.apache.kafka.connect
CONNECT_AUTO_CREATE_TOPICS_ENABLE: 'true'
# Monitoring
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
depends_on:
- kafka-1
- kafka-2
- kafka-3
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
volumes:
- grafana-data:/var/lib/grafana
depends_on:
- prometheus
# Elasticsearch
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
container_name: elasticsearch
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms1g -Xmx1g"
- "xpack.security.enabled=false"
ports:
- "9200:9200"
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
# Kibana
kibana:
image: docker.elastic.co/kibana/kibana:8.8.0
container_name: kibana
ports:
- "5601:5601"
environment:
ELASTICSEARCH_HOSTS: http://elasticsearch:9200
depends_on:
- elasticsearch
volumes:
zookeeper-1-data:
zookeeper-1-logs:
zookeeper-2-data:
zookeeper-2-logs:
zookeeper-3-data:
zookeeper-3-logs:
kafka-1-data:
kafka-2-data:
kafka-3-data:
prometheus-data:
grafana-data:
elasticsearch-data:
networks:
default:
driver: bridge
# 2. Nginx Configuration for Load Balancer
# nginx.conf
events {}
http {
upstream kafka_backend {
least_conn;
server kafka-1:8091 max_fails=3 fail_timeout=30s;
server kafka-2:8091 max_fails=3 fail_timeout=30s;
server kafka-3:8091 max_fails=3 fail_timeout=30s;
}
server {
listen 80;
server_name localhost;
location / {
proxy_pass http://kafka_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /kafka-1 {
proxy_pass http://kafka-1:8091;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
location /kafka-2 {
proxy_pass http://kafka-2:8091;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
location /kafka-3 {
proxy_pass http://kafka-3:8091;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
# Health check endpoints
location /health {
access_log off;
return 200 "healthy\n";
add_header Content-Type text/plain;
}
# Kafka Admin API proxy (if needed)
location /admin/ {
proxy_pass http://kafka-1:8090;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
}
# 3. Prometheus Configuration
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "kafka_rules.yml"
scrape_configs:
- job_name: 'kafka'
# Note: the broker JMX ports are not Prometheus endpoints by themselves; run a JMX exporter
# (or kafka_exporter) next to each broker and point these targets at its metrics port.
# Inside the compose network all brokers expose the same container port (9101).
static_configs:
- targets:
- 'kafka-1:9101'
- 'kafka-2:9101'
- 'kafka-3:9101'
metrics_path: /metrics
scrape_interval: 10s
- job_name: 'schema-registry'
static_configs:
- targets: ['schema-registry:8081']
metrics_path: /metrics
- job_name: 'kafka-connect'
static_configs:
- targets: ['kafka-connect:8083']
metrics_path: /metrics
# 4. Kafka Alerting Rules
# kafka_rules.yml
groups:
- name: kafka
rules:
- alert: UnderReplicatedPartitions
expr: kafka_server_replicamanager_underreplicatedpartitions > 0
for: 0m
labels:
severity: critical
annotations:
summary: "Under-replicated partitions detected"
description: "Kafka cluster has under-replicated partitions which may lead to data loss"
- alert: OfflinePartitionsCount
expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
for: 0m
labels:
severity: critical
annotations:
summary: "Offline partitions detected"
description: "Controller reports offline partitions"
- alert: ActiveControllerCount
expr: kafka_controller_kafkacontroller_activecontrollercount != 1
for: 0m
labels:
severity: critical
annotations:
summary: "Controller count is not 1"
description: "Active controller count should be exactly 1"
- alert: BrokerDown
expr: up == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka broker is down"
description: "One or more Kafka brokers are not responding"
# 5. Kafka Cluster Management Scripts
# cluster-manager.sh
#!/bin/bash
# Kafka Cluster Management Script
# Usage: ./cluster-manager.sh [start|stop|restart|status|scale|backup|monitor]
KAFKA_HOME="/opt/kafka"
ZOOKEEPER_HOME="/opt/zookeeper"
KAFKA_BROKERS="kafka1:9092,kafka2:9092,kafka3:9092"
TOPICS=("user-events" "order-events" "product-updates" "error-logs")
start_cluster() {
echo "Starting Kafka cluster..."
# Start Zookeeper
for i in {1..3}; do
echo "Starting Zookeeper $i..."
$ZOOKEEPER_HOME/bin/zookeeper-server-start.sh -daemon $ZOOKEEPER_HOME/config/zookeeper$i.properties
sleep 5
done
# Start Kafka brokers
for i in {1..3}; do
echo "Starting Kafka broker $i..."
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server$i.properties
sleep 10
done
echo "Kafka cluster started successfully!"
}
stop_cluster() {
echo "Stopping Kafka cluster..."
# Stop Kafka brokers
for i in {3..1}; do
echo "Stopping Kafka broker $i..."
$KAFKA_HOME/bin/kafka-server-stop.sh   # stops all local brokers; takes no properties argument
done
# Stop Zookeeper
for i in {3..1}; do
echo "Stopping Zookeeper $i..."
$ZOOKEEPER_HOME/bin/zookeeper-server-stop.sh
sleep 2
done
echo "Kafka cluster stopped!"
}
check_status() {
echo "Checking Kafka cluster status..."
# Check Zookeeper
echo "Checking Zookeeper status..."
for i in {1..3}; do
if pgrep -f "zookeeper.*server$i.properties" > /dev/null; then
echo "✓ Zookeeper $i is running"
else
echo "✗ Zookeeper $i is not running"
fi
done
# Check Kafka brokers
echo "Checking Kafka broker status..."
for i in {1..3}; do
if pgrep -f "kafka.*server$i.properties" > /dev/null; then
echo "✓ Kafka broker $i is running"
else
echo "✗ Kafka broker $i is not running"
fi
done
# Check topic status
echo "Checking topic status..."
for topic in "${TOPICS[@]}"; do
if $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --describe --topic $topic >/dev/null 2>&1; then
echo "✓ Topic '$topic' exists"
else
echo "✗ Topic '$topic' does not exist"
fi
done
}
create_topics() {
echo "Creating Kafka topics..."
for topic in "${TOPICS[@]}"; do
echo "Creating topic: $topic"
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --create --topic $topic --partitions 3 --replication-factor 2
echo "✓ Topic '$topic' created with 3 partitions and replication factor 2"
done
}
backup_cluster() {
echo "Starting Kafka cluster backup..."
BACKUP_DIR="/backup/kafka/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$BACKUP_DIR"
# Backup configuration files
echo "Backing up configuration files..."
cp -r $KAFKA_HOME/config/ "$BACKUP_DIR/kafka-config/"
cp -r $ZOOKEEPER_HOME/config/ "$BACKUP_DIR/zookeeper-config/"
# Backup topics data (metadata)
echo "Backing up topic metadata..."
for topic in "${TOPICS[@]}"; do
echo "Backing up topic metadata for: $topic"
$KAFKA_HOME/bin/kafka-configs.sh --bootstrap-server $KAFKA_BROKERS --entity-type topics --entity-name $topic --describe > "$BACKUP_DIR/topic-${topic}.json"
done
# Backup consumer group offsets
echo "Backing up consumer group offsets..."
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server $KAFKA_BROKERS --describe --all-groups > "$BACKUP_DIR/consumer-groups.json"
echo "Backup completed: $BACKUP_DIR"
}
monitor_cluster() {
echo "Starting cluster monitoring..."
# JMX monitoring setup would go here
echo "Enabling JMX monitoring on all brokers..."
# Check if Kafka Exporter is running
if pgrep -f "kafka-exporter" > /dev/null; then
echo "✓ Kafka Exporter is running"
else
echo "Starting Kafka Exporter..."
# Start Kafka Exporter for Prometheus
$KAFKA_HOME/bin/kafka-exporter.sh --bootstrap-server $KAFKA_BROKERS --producer.config $KAFKA_HOME/config/producer.properties --kafka-prometheus-sink
fi
echo "Cluster monitoring enabled"
echo "Access Grafana at: http://localhost:3000"
echo "Access Prometheus at: http://localhost:9090"
}
scale_brokers() {
local new_brokers=$1
echo "Scaling Kafka cluster to $new_brokers brokers..."
# Add new broker configuration
for ((i=4; i<=new_brokers; i++)); do
echo "Configuring broker $i..."
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server$i.properties
# Update broker ID
sed -i "s/broker.id=1/broker.id=$i/" $KAFKA_HOME/config/server$i.properties
# Start new broker
echo "Starting Kafka broker $i..."
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server$i.properties
sleep 10
done
# Rebalance topics for new brokers
echo "Rebalancing topics for new brokers..."
for topic in "${TOPICS[@]}"; do
echo "Increasing partition count for topic: $topic"
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BROKERS --alter --topic $topic --partitions 6
done
echo "Cluster scaled to $new_broker_count brokers successfully!"
}
# Main execution
case "${1:-}" in
"start")
start_cluster
;;
"stop")
stop_cluster
;;
"restart")
stop_cluster
sleep 5
start_cluster
;;
"status")
check_status
;;
"create-topics")
create_topics
;;
"backup")
backup_cluster
;;
"monitor")
monitor_cluster
;;
"scale")
if [ -z "${2:-}" ]; then
echo "Usage: $0 scale <number_of_brokers>"
exit 1
fi
scale_brokers "$2"
;;
*)
echo "Usage: $0 {start|stop|restart|status|create-topics|backup|monitor|scale}"
echo " start - Start the Kafka cluster"
echo " stop - Stop the Kafka cluster"
echo " restart - Restart the Kafka cluster"
echo " status - Check cluster status"
echo " create-topics - Create predefined topics"
echo " backup - Backup cluster configuration and data"
echo " monitor - Enable monitoring"
echo " scale <number> - Scale cluster to specified number of brokers"
exit 1
;;
esac
# 6. Topic Configuration Templates
# topic-configs.json
{
"topic_templates": {
"high-throughput": {
"partitions": 12,
"replication_factor": 3,
"config": {
"retention.ms": "604800000",
"cleanup.policy": "delete",
"segment.ms": "86400000",
"max.message.bytes": "10485760",
"compression.type": "lz4",
"unclean.leader.election.enable": "true",
"min.insync.replicas": "2"
}
},
"low-latency": {
"partitions": 6,
"replication_factor": 3,
"config": {
"retention.ms": "86400000",
"cleanup.policy": "delete",
"segment.ms": "3600000",
"max.message.bytes": "1000000",
"compression.type": "none",
"unclean.leader.election.enable": "true",
"min.insync.replicas": "2",
"linger.ms": "0"
}
},
"event-sourcing": {
"partitions": 3,
"replication_factor": 3,
"config": {
"retention.ms": "-1",
"cleanup.policy": "compact",
"segment.ms": "86400000",
"max.message.bytes": "10485760",
"compression.type": "gzip",
"unclean.leader.election.enable": "true",
"min.insync.replicas": "2",
"min.cleanable.dirty.ratio": "0.01"
}
},
"logs": {
"partitions": 3,
"replication_factor": 2,
"config": {
"retention.ms": "604800000",
"cleanup.policy": "delete",
"segment.ms": "86400000",
"max.message.bytes": "10485760",
"compression.type": "gzip",
"unclean.leader.election.enable": "true",
"min.insync.replicas": "1"
}
}
}
}
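# 7. Applying a Topic Template (Node.js sketch)
# apply-topic-template.js
// Hedged example: assumes kafkajs is installed, the JSON above is saved as topic-configs.json,
// and a broker is reachable at localhost:9092; each template maps directly onto admin.createTopics().
const { Kafka } = require('kafkajs');
const templates = require('./topic-configs.json').topic_templates;

async function createTopicFromTemplate(topicName, templateName) {
  const template = templates[templateName];
  const admin = new Kafka({ clientId: 'topic-templates', brokers: ['localhost:9092'] }).admin();
  await admin.connect();
  try {
    await admin.createTopics({
      topics: [{
        topic: topicName,
        numPartitions: template.partitions,
        replicationFactor: template.replication_factor,
        configEntries: Object.entries(template.config).map(([name, value]) => ({ name, value }))
      }]
    });
    console.log(`Created '${topicName}' from template '${templateName}'`);
  } finally {
    await admin.disconnect();
  }
}

// await createTopicFromTemplate('payments', 'low-latency');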