RocketMQ 消息队列示例
Apache RocketMQ 示例,包括生产者、消费者、顺序消息、延迟消息、广播消费和事务消息,适用于高吞吐量消息传递
💻 RocketMQ Hello World - 基础生产者消费者 javascript
🟢 simple
⭐
简单的 Apache RocketMQ 生产者和消费者示例,演示基础的消息队列操作
⏱️ 10 min
🏷️ rocketmq, messaging, beginner
Prerequisites:
RocketMQ server running, Node.js
// RocketMQ Hello World - Basic Producer/Consumer Example
// Install: npm install apache-rocketmq
const { Producer, Consumer, MessageModel } = require('apache-rocketmq');
async function helloWorld() {
// RocketMQ configuration
const config = {
endpoints: 'localhost:9876',
accessKey: 'rocketmq',
secretKey: '12345678',
instanceName: 'hello-world-example',
};
console.log('=== RocketMQ Hello World Example ===');
// Create consumer
const consumer = new Consumer({
...config,
consumerGroup: 'hello-consumer-group',
topic: 'hello-topic',
tag: '*',
messageModel: MessageModel.CLUSTERING,
});
// Create producer
const producer = new Producer({
...config,
producerGroup: 'hello-producer-group',
});
// Start consumer
console.log('Starting consumer...');
await consumer.start();
// Set up message listener
consumer.subscribe((msg, ack) => {
console.log(`Received message: ${msg.body.toString()}`);
console.log(`Message ID: ${msg.id}`);
console.log(`Topic: ${msg.topic}`);
console.log(`Tag: ${msg.tag}`);
console.log('---');
// Acknowledge message
ack();
});
// Start producer
console.log('Starting producer...');
await producer.start();
// Wait a moment for consumer to be ready
await new Promise(resolve => setTimeout(resolve, 2000));
// Send messages
console.log('Sending messages...');
const messages = [
'Hello RocketMQ!',
'Apache RocketMQ is powerful!',
'Message queue made easy!',
'High performance messaging!',
];
for (let i = 0; i < messages.length; i++) {
const message = {
topic: 'hello-topic',
tag: 'hello',
body: Buffer.from(messages[i]),
keys: ['key-' + (i + 1)],
properties: {
source: 'hello-world-example',
timestamp: new Date().toISOString(),
},
};
const result = await producer.send(message);
console.log(`Message sent: ${messages[i]}`);
console.log(`Message ID: ${result.messageId}`);
console.log('---');
}
// Wait for messages to be consumed
console.log('Waiting for messages to be consumed...');
await new Promise(resolve => setTimeout(resolve, 5000));
// Shutdown
console.log('Shutting down...');
await producer.shutdown();
await consumer.shutdown();
console.log('RocketMQ Hello World example completed!');
}
helloWorld().catch(err => {
console.error('Error:', err);
});
💻 RocketMQ 顺序消息 javascript
🟡 intermediate
⭐⭐⭐
使用分片键实现有序消息传递,确保同一分片键的消息按顺序处理
⏱️ 20 min
🏷️ rocketmq, ordered, sequencing
Prerequisites:
RocketMQ server running, Node.js, RocketMQ basics
// RocketMQ Ordered Messages Example
// Ensures message order within the same sharding key
const { Producer, Consumer, MessageModel } = require('apache-rocketmq');
// Order event producer
async function orderEventProducer() {
const producer = new Producer({
endpoints: 'localhost:9876',
producerGroup: 'order-producer-group',
instanceName: 'order-event-producer',
});
await producer.start();
console.log('Order event producer started');
// Simulate order events for multiple users
const orders = [
// User 1 - Order lifecycle (should be processed in order)
{ userId: 'user001', orderId: 'ORD001', event: 'order.created', amount: 99.99 },
{ userId: 'user001', orderId: 'ORD001', event: 'payment.processed', amount: 99.99 },
{ userId: 'user001', orderId: 'ORD001', event: 'order.confirmed', amount: 99.99 },
{ userId: 'user001', orderId: 'ORD001', event: 'order.shipped', amount: 99.99 },
{ userId: 'user001', orderId: 'ORD001', event: 'order.delivered', amount: 99.99 },
// User 2 - Order lifecycle (should be processed in order)
{ userId: 'user002', orderId: 'ORD002', event: 'order.created', amount: 149.99 },
{ userId: 'user002', orderId: 'ORD002', event: 'payment.processed', amount: 149.99 },
{ userId: 'user002', orderId: 'ORD002', event: 'order.confirmed', amount: 149.99 },
// User 3 - Order lifecycle (should be processed in order)
{ userId: 'user003', orderId: 'ORD003', event: 'order.created', amount: 79.99 },
{ userId: 'user003', orderId: 'ORD003', event: 'payment.failed', amount: 79.99 },
{ userId: 'user003', orderId: 'ORD003', event: 'order.cancelled', amount: 79.99 },
// More events for user 1 and 2
{ userId: 'user001', orderId: 'ORD004', event: 'order.created', amount: 199.99 },
{ userId: 'user002', orderId: 'ORD005', event: 'order.created', amount: 59.99 },
];
console.log('Sending order events...');
for (let i = 0; i < orders.length; i++) {
const order = orders[i];
const message = {
topic: 'order-events',
tag: order.event,
body: Buffer.from(JSON.stringify({
...order,
timestamp: new Date().toISOString(),
sequence: i + 1,
})),
// Use userId as sharding key to ensure order for each user
shardingKey: order.userId,
keys: [order.orderId, order.userId],
properties: {
eventType: order.event,
userId: order.userId,
orderId: order.orderId,
},
};
const result = await producer.send(message);
console.log(`[${order.userId}] ${order.event}: ${order.orderId} (Message ID: ${result.messageId})`);
// Small delay to simulate real-world scenario
await new Promise(resolve => setTimeout(resolve, 100));
}
await producer.shutdown();
console.log('Order event producer stopped');
}
// Order event consumer with ordered processing
async function orderEventConsumer(consumerId) {
const consumer = new Consumer({
endpoints: 'localhost:9876',
consumerGroup: `order-consumer-group-${consumerId}`,
topic: 'order-events',
tag: '*',
messageModel: MessageModel.CLUSTERING,
instanceName: `order-consumer-${consumerId}`,
// Enable ordered message consumption
orderly: true,
});
await consumer.start();
console.log(`Order event consumer ${consumerId} started`);
// Track order states for each user
const orderStates = new Map();
consumer.subscribe((msg, ack) => {
try {
const orderEvent = JSON.parse(msg.body.toString());
console.log(`[CONSUMER ${consumerId}] Processing: ${orderEvent.userId} -> ${orderEvent.event}`);
// Validate order sequence for each user
const userState = orderStates.get(orderEvent.userId) || { lastEvent: null, expectedEvent: 'order.created' };
// Simple order state validation
const validTransitions = {
'order.created': ['payment.processed', 'payment.failed'],
'payment.processed': ['order.confirmed'],
'payment.failed': ['order.cancelled'],
'order.confirmed': ['order.shipped'],
'order.shipped': ['order.delivered'],
'order.cancelled': ['order.created'], // Can create new order after cancellation
'order.delivered': ['order.created'], // Can create new order after delivery
};
if (userState.lastEvent) {
const allowedNextEvents = validTransitions[userState.lastEvent] || [];
if (!allowedNextEvents.includes(orderEvent.event)) {
console.log(`[CONSUMER ${consumerId}] WARNING: Invalid order transition for ${orderEvent.userId}: ${userState.lastEvent} -> ${orderEvent.event}`);
}
}
// Update order state
orderStates.set(orderEvent.userId, {
lastEvent: orderEvent.event,
lastOrderId: orderEvent.orderId,
});
console.log(`[CONSUMER ${consumerId}] Processed: ${orderEvent.userId} -> ${orderEvent.event} -> ${orderEvent.orderId}`);
console.log(`[CONSUMER ${consumerId}] Current state for ${orderEvent.userId}: ${orderStates.get(orderEvent.userId).lastEvent}`);
console.log('---');
// Acknowledge message
ack();
} catch (err) {
console.error(`[CONSUMER ${consumerId}] Error processing message:`, err);
ack(); // Acknowledge even on error to avoid blocking the queue
}
});
return consumer;
}
// Demo ordered vs unordered messages
async function demoOrderingComparison() {
console.log('\n=== Ordered vs Unordered Comparison ===');
// Unordered producer (random order)
const unorderedProducer = new Producer({
endpoints: 'localhost:9876',
producerGroup: 'unordered-producer-group',
instanceName: 'unordered-producer',
});
await unorderedProducer.start();
console.log('Sending unordered messages...');
const unorderedMessages = ['A', 'B', 'C', 'D', 'E'];
for (const msg of unorderedMessages) {
await unorderedProducer.send({
topic: 'unordered-test',
body: Buffer.from(`Unordered message: ${msg}`),
});
console.log(`Sent unordered: ${msg}`);
}
await unorderedProducer.shutdown();
}
// Main function
async function main() {
console.log('=== RocketMQ Ordered Messages Example ===');
try {
// Step 1: Start multiple consumers to show load balancing
console.log('\n1. Starting consumers...');
const consumers = [];
for (let i = 1; i <= 3; i++) {
consumers.push(await orderEventConsumer(i));
}
// Step 2: Wait for consumers to be ready
await new Promise(resolve => setTimeout(resolve, 2000));
// Step 3: Start producer to send ordered messages
console.log('\n2. Sending ordered messages...');
const producerPromise = orderEventProducer();
// Step 4: Wait for message processing
await producerPromise;
console.log('\n3. Waiting for message processing...');
await new Promise(resolve => setTimeout(resolve, 8000));
// Step 5: Demo comparison
await demoOrderingComparison();
// Step 6: Shutdown
console.log('\n4. Shutting down consumers...');
for (const consumer of consumers) {
await consumer.shutdown();
}
console.log('Ordered messages example completed!');
} catch (err) {
console.error('Error in ordered messages example:', err);
}
}
main().catch(err => {
console.error('Error:', err);
});
💻 RocketMQ 延迟消息 javascript
🟡 intermediate
⭐⭐⭐
使用 RocketMQ 内置的延迟功能实现延迟和定时消息传递
⏱️ 25 min
🏷️ rocketmq, delayed, scheduled, timing
Prerequisites:
RocketMQ server running, Node.js, RocketMQ basics
// RocketMQ Delayed Messages Example
// Demonstrates various types of delayed and scheduled message delivery
const { Producer, Consumer, MessageModel } = require('apache-rocketmq');
// Delay levels in RocketMQ (seconds)
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
const DELAY_LEVELS = {
'1s': 1,
'5s': 2,
'10s': 3,
'30s': 4,
'1m': 5,
'2m': 6,
'3m': 7,
'4m': 8,
'5m': 9,
'6m': 10,
'7m': 11,
'8m': 12,
'9m': 13,
'10m': 14,
'20m': 15,
'30m': 16,
'1h': 17,
'2h': 18,
};
// Reminder service producer
async function reminderProducer() {
const producer = new Producer({
endpoints: 'localhost:9876',
producerGroup: 'reminder-producer-group',
instanceName: 'reminder-producer',
});
await producer.start();
console.log('Reminder producer started');
// Different types of reminders with various delays
const reminders = [
{
id: 'rem001',
type: 'meeting',
message: 'Team standup meeting starts now!',
delay: '10s',
recipient: '[email protected]',
},
{
id: 'rem002',
type: 'deadline',
message: 'Project submission deadline approaching!',
delay: '1m',
recipient: '[email protected]',
},
{
id: 'rem003',
type: 'birthday',
message: 'Happy birthday! Don't forget to wish them well!',
delay: '2m',
recipient: '[email protected]',
},
{
id: 'rem004',
type: 'maintenance',
message: 'System maintenance scheduled for tonight',
delay: '5m',
recipient: '[email protected]',
},
{
id: 'rem005',
type: 'follow-up',
message: 'Follow up with client about proposal',
delay: '30s',
recipient: '[email protected]',
},
];
console.log('Sending delayed reminders...');
const sendTime = new Date();
for (const reminder of reminders) {
const delayLevel = DELAY_LEVELS[reminder.delay];
if (!delayLevel) {
console.log(`Warning: Unknown delay level '${reminder.delay}' for reminder ${reminder.id}`);
continue;
}
const message = {
topic: 'reminder-topic',
tag: reminder.type,
body: Buffer.from(JSON.stringify({
...reminder,
scheduledAt: sendTime.toISOString(),
delayLevel: delayLevel,
expectedAt: new Date(sendTime.getTime() + (delayLevel * 1000)).toISOString(),
})),
keys: [reminder.id],
properties: {
reminderId: reminder.id,
reminderType: reminder.type,
delay: reminder.delay,
delayLevel: delayLevel.toString(),
scheduledAt: sendTime.toISOString(),
},
// Set delay level
delayTimeLevel: delayLevel,
};
const result = await producer.send(message);
console.log(`[${reminder.delay.padEnd(3)}] ${reminder.type.padEnd(12)} -> ${reminder.recipient.padEnd(25)} (ID: ${reminder.id})`);
console.log(` Expected at: ${new Date(sendTime.getTime() + (delayLevel * 1000)).toLocaleTimeString()}`);
console.log(` Message ID: ${result.messageId}`);
console.log('---');
}
await producer.shutdown();
console.log('Reminder producer stopped');
}
// Notification service for handling reminders
async function reminderConsumer() {
const consumer = new Consumer({
endpoints: 'localhost:9876',
consumerGroup: 'reminder-consumer-group',
topic: 'reminder-topic',
tag: '*',
messageModel: MessageModel.CLUSTERING,
instanceName: 'reminder-consumer',
});
await consumer.start();
console.log('Reminder consumer started');
consumer.subscribe((msg, ack) => {
try {
const reminder = JSON.parse(msg.body.toString());
const receivedTime = new Date();
const scheduledTime = new Date(reminder.scheduledAt);
const expectedTime = new Date(reminder.expectedAt);
const actualDelay = (receivedTime.getTime() - scheduledTime.getTime()) / 1000;
const expectedDelay = (expectedTime.getTime() - scheduledTime.getTime()) / 1000;
console.log('\n🔔 REMINDER RECEIVED!');
console.log(` ID: ${reminder.id}`);
console.log(` Type: ${reminder.type}`);
console.log(` Recipient: ${reminder.recipient}`);
console.log(` Message: ${reminder.message}`);
console.log(` Scheduled: ${scheduledTime.toLocaleTimeString()}`);
console.log(` Expected: ${expectedTime.toLocaleTimeString()}`);
console.log(` Received: ${receivedTime.toLocaleTimeString()}`);
console.log(` Delay: ${actualDelay.toFixed(2)}s (expected: ${expectedDelay}s)`);
console.log(` Accuracy: ${Math.abs(actualDelay - expectedDelay) < 1 ? '✓ Good' : '⚠ Delayed'}`);
console.log('---');
// Here you would send actual notification (email, SMS, push notification, etc.)
// For demo purposes, we just log it
ack();
} catch (err) {
console.error('Error processing reminder:', err);
ack();
}
});
return consumer;
}
// Order cancellation with grace period
async function orderCancellationDemo() {
console.log('\n=== Order Cancellation with Grace Period ===');
const producer = new Producer({
endpoints: 'localhost:9876',
producerGroup: 'order-cancellation-producer',
instanceName: 'order-cancellation-producer',
});
await producer.start();
// Create an order
const order = {
orderId: 'ORD-CANCEL-001',
userId: 'user-cancel-demo',
amount: 299.99,
items: ['Premium Widget', 'Extended Warranty'],
createdAt: new Date().toISOString(),
};
console.log(`Creating order: ${order.orderId}`);
// Send order created message
await producer.send({
topic: 'order-lifecycle',
tag: 'order.created',
body: Buffer.from(JSON.stringify(order)),
keys: [order.orderId],
});
// Send delayed cancellation check
const cancellationCheck = {
orderId: order.orderId,
userId: order.userId,
type: 'auto-cancel-check',
scheduledAt: new Date().toISOString(),
gracePeriod: '30s',
};
await producer.send({
topic: 'order-lifecycle',
tag: 'cancellation.check',
body: Buffer.from(JSON.stringify(cancellationCheck)),
keys: [order.orderId, 'cancel-check'],
delayTimeLevel: DELAY_LEVELS['30s'], // Check after 30 seconds
});
console.log(`Scheduled cancellation check for ${order.orderId} in 30s`);
await producer.shutdown();
}
// Lifecycle consumer for orders
async function orderLifecycleConsumer() {
const consumer = new Consumer({
endpoints: 'localhost:9876',
consumerGroup: 'order-lifecycle-consumer',
topic: 'order-lifecycle',
tag: '*',
messageModel: MessageModel.CLUSTERING,
instanceName: 'order-lifecycle-consumer',
});
await consumer.start();
// Track order states
const orderStates = new Map();
consumer.subscribe((msg, ack) => {
try {
if (msg.tag === 'cancellation.check') {
const check = JSON.parse(msg.body.toString());
console.log(
⏰ CANCELLATION CHECK FOR ORDER: ${check.orderId}`);
// In a real system, you would check if payment was received
// For demo, we'll randomly decide to cancel or not
const shouldCancel = Math.random() > 0.5;
if (shouldCancel) {
console.log(` → Cancelling unpaid order ${check.orderId}`);
// Here you would send a cancellation message
} else {
console.log(` → Order ${check.orderId} is confirmed`);
// Here you might schedule fulfillment
}
} else {
const order = JSON.parse(msg.body.toString());
console.log(`📦 Order Event: ${msg.tag} -> ${order.orderId}`);
orderStates.set(order.orderId, msg.tag);
}
ack();
} catch (err) {
console.error('Error in lifecycle consumer:', err);
ack();
}
});
return consumer;
}
// Main function
async function main() {
console.log('=== RocketMQ Delayed Messages Example ===');
try {
// Step 1: Start consumers
console.log('\n1. Starting consumers...');
const reminderConsumerPromise = reminderConsumer();
const orderLifecyclePromise = orderLifecycleConsumer();
// Step 2: Wait for consumers to be ready
await new Promise(resolve => setTimeout(resolve, 2000));
// Step 3: Send reminders
console.log('\n2. Sending delayed reminders...');
await reminderProducer();
// Step 4: Demo order cancellation
await new Promise(resolve => setTimeout(resolve, 1000));
await orderCancellationDemo();
// Step 5: Wait for all delayed messages to be processed
console.log('\n3. Waiting for delayed messages to be delivered...');
await new Promise(resolve => setTimeout(resolve, 35000)); // Wait for longest delay (2m + buffer)
// Step 6: Shutdown consumers
console.log('\n4. Shutting down consumers...');
const reminderConsumer = await reminderConsumerPromise;
const orderConsumer = await orderLifecyclePromise;
await reminderConsumer.shutdown();
await orderConsumer.shutdown();
console.log('Delayed messages example completed!');
} catch (err) {
console.error('Error in delayed messages example:', err);
}
}
main().catch(err => {
console.error('Error:', err);
});
💻 RocketMQ 广播消费 javascript
🟡 intermediate
⭐⭐⭐
实现广播消费模式,消费者组中的所有消费者都接收相同的消息
⏱️ 20 min
🏷️ rocketmq, broadcast, fanout
Prerequisites:
RocketMQ server running, Node.js, RocketMQ basics
// RocketMQ Broadcast Consumption Example
// Demonstrates broadcasting messages to all consumers in a consumer group
const { Producer, Consumer, MessageModel } = require('apache-rocketmq');
// Configuration source producer
async function configurationProducer() {
const producer = new Producer({
endpoints: 'localhost:9876',
producerGroup: 'config-producer-group',
instanceName: 'config-producer',
});
await producer.start();
console.log('Configuration producer started');
// Configuration updates to broadcast
const configUpdates = [
{
id: 'cfg001',
type: 'feature_flag',
key: 'new_dashboard_enabled',
value: 'true',
version: '1.0.0',
description: 'Enable new dashboard UI for all users',
},
{
id: 'cfg002',
type: 'maintenance',
key: 'system_maintenance',
value: JSON.stringify({
scheduled: '2025-12-15T02:00:00Z',
duration: 2, // hours
affectedServices: ['api', 'web', 'mobile'],
}),
version: '1.0.0',
description: 'Scheduled system maintenance window',
},
{
id: 'cfg003',
type: 'rate_limit',
key: 'api_rate_limit',
value: '1000',
version: '1.1.0',
description: 'Update API rate limit to 1000 requests per minute',
},
{
id: 'cfg004',
type: 'security',
key: 'password_policy',
value: JSON.stringify({
minLength: 8,
requireUppercase: true,
requireLowercase: true,
requireNumbers: true,
requireSpecialChars: true,
}),
version: '2.0.0',
description: 'Enhanced password security requirements',
},
{
id: 'cfg005',
type: 'feature_flag',
key: 'beta_features_enabled',
value: 'false',
version: '1.0.1',
description: 'Disable beta features temporarily',
},
];
console.log('Broadcasting configuration updates...');
for (const config of configUpdates) {
const message = {
topic: 'configuration-updates',
tag: config.type,
body: Buffer.from(JSON.stringify({
...config,
timestamp: new Date().toISOString(),
publisher: 'config-service',
})),
keys: [config.id, config.key],
properties: {
configId: config.id,
configType: config.type,
configVersion: config.version,
priority: config.type === 'security' ? 'high' : 'normal',
},
};
const result = await producer.send(message);
console.log(`📡 Broadcast: ${config.type.padEnd(15)} ${config.key.padEnd(25)} -> v${config.version}`);
console.log(` ${config.description}`);
console.log(` Message ID: ${result.messageId}`);
console.log('---');
}
await producer.shutdown();
console.log('Configuration producer stopped');
}
// Service instance that receives configuration broadcasts
class ServiceInstance {
constructor(name, instanceId) {
this.name = name;
this.instanceId = instanceId;
this.config = new Map();
this.startTime = new Date();
}
async startConsumer() {
this.consumer = new Consumer({
endpoints: 'localhost:9876',
consumerGroup: 'config-consumer-group', // Same group for broadcast
topic: 'configuration-updates',
tag: '*',
messageModel: MessageModel.BROADCASTING, // Key for broadcast consumption
instanceName: `${this.name}-${this.instanceId}`,
});
await this.consumer.start();
console.log(`[${this.name}-${this.instanceId}] Consumer started`);
this.consumer.subscribe((msg, ack) => {
this.handleConfigurationUpdate(msg, ack);
});
return this.consumer;
}
handleConfigurationUpdate(msg, ack) {
try {
const configUpdate = JSON.parse(msg.body.toString());
const receivedTime = new Date();
console.log(`\n📥 [${this.name}-${this.instanceId}] Configuration Update Received`);
console.log(` Config ID: ${configUpdate.id}`);
console.log(` Type: ${configUpdate.type}`);
console.log(` Key: ${configUpdate.key}`);
console.log(` New Value: ${configUpdate.value}`);
console.log(` Version: ${configUpdate.version}`);
console.log(` Description: ${configUpdate.description}`);
console.log(` Published: ${new Date(configUpdate.timestamp).toLocaleTimeString()}`);
console.log(` Received: ${receivedTime.toLocaleTimeString()}`);
console.log(` Latency: ${(receivedTime.getTime() - new Date(configUpdate.timestamp).getTime())}ms`);
// Apply configuration update
const previousValue = this.config.get(configUpdate.key);
this.config.set(configUpdate.key, {
value: configUpdate.value,
version: configUpdate.version,
updatedAt: configUpdate.timestamp,
updatedAt: receivedTime.toISOString(),
});
console.log(` Previous value: ${previousValue ? previousValue.value : 'NOT_SET'}`);
console.log(` Status: ${this.applyConfiguration(configUpdate)}`);
// Show current configuration count
console.log(` Total configs: ${this.config.size}`);
console.log('---');
ack();
} catch (err) {
console.error(`[${this.name}-${this.instanceId}] Error handling config update:`, err);
ack();
}
}
applyConfiguration(configUpdate) {
try {
switch (configUpdate.type) {
case 'feature_flag':
const flagEnabled = configUpdate.value === 'true';
console.log(` → Feature flag '${configUpdate.key}' ${flagEnabled ? 'ENABLED' : 'DISABLED'}`);
this.updateFeatureFlag(configUpdate.key, flagEnabled);
break;
case 'rate_limit':
console.log(` → Rate limit updated to ${configUpdate.value} requests/min`);
this.updateRateLimit(parseInt(configUpdate.value));
break;
case 'security':
console.log(` → Security policy updated for '${configUpdate.key}'`);
this.updateSecurityPolicy(configUpdate.key, JSON.parse(configUpdate.value));
break;
case 'maintenance':
const maintenance = JSON.parse(configUpdate.value);
console.log(` → Maintenance scheduled: ${maintenance.scheduled}`);
this.scheduleMaintenance(maintenance);
break;
default:
console.log(` → Generic configuration applied`);
}
return 'SUCCESS';
} catch (err) {
console.error(` → Failed to apply configuration: ${err.message}`);
return 'ERROR';
}
}
// Simulate applying configurations
updateFeatureFlag(key, enabled) {
// In real service, this would toggle features
console.log(` [${this.name}] Feature '${key}' is now ${enabled ? 'ACTIVE' : 'INACTIVE'}`);
}
updateRateLimit(limit) {
// In real service, this would update rate limiting
console.log(` [${this.name}] Rate limit set to ${limit}/min`);
}
updateSecurityPolicy(key, policy) {
// In real service, this would update security policies
console.log(` [${this.name}] Security policy '${key}' updated`);
}
scheduleMaintenance(maintenance) {
// In real service, this would prepare for maintenance
console.log(` [${this.name}] Preparing for maintenance window`);
}
async shutdown() {
if (this.consumer) {
await this.consumer.shutdown();
console.log(`[${this.name}-${this.instanceId}] Consumer stopped`);
}
}
printStatus() {
console.log(`\n📊 [${this.name}-${this.instanceId}] Status:`);
console.log(` Uptime: ${Math.round((new Date() - this.startTime) / 1000)}s`);
console.log(` Configurations: ${this.config.size}`);
if (this.config.size > 0) {
console.log(' Active configs:');
for (const [key, config] of this.config.entries()) {
console.log(` - ${key}: v${config.version}`);
}
}
}
}
// Service health monitor
async function serviceHealthMonitor(services) {
console.log('\n=== Service Health Monitor ===');
const monitorInterval = setInterval(() => {
console.log('\n' + '='.repeat(50));
console.log(`Health Check - ${new Date().toLocaleTimeString()}`);
console.log('='.repeat(50));
services.forEach(service => {
service.printStatus();
});
}, 8000);
// Stop monitoring after 30 seconds
setTimeout(() => {
clearInterval(monitorInterval);
console.log('\nHealth monitoring stopped');
}, 30000);
}
// Demo cluster consumption vs broadcast consumption
async function demoConsumptionPatterns() {
console.log('\n=== Consumption Patterns Demo ===');
// Cluster consumer (load balancing)
const clusterConsumer1 = new Consumer({
endpoints: 'localhost:9876',
consumerGroup: 'cluster-consumer-group',
topic: 'pattern-demo',
tag: '*',
messageModel: MessageModel.CLUSTERING,
instanceName: 'cluster-consumer-1',
});
const clusterConsumer2 = new Consumer({
endpoints: 'localhost:9876',
consumerGroup: 'cluster-consumer-group',
topic: 'pattern-demo',
tag: '*',
messageModel: MessageModel.CLUSTERING,
instanceName: 'cluster-consumer-2',
});
// Broadcast consumers (all get all messages)
const broadcastConsumer1 = new Consumer({
endpoints: 'localhost:9876',
consumerGroup: 'broadcast-consumer-group',
topic: 'pattern-demo',
tag: '*',
messageModel: MessageModel.BROADCASTING,
instanceName: 'broadcast-consumer-1',
});
const broadcastConsumer2 = new Consumer({
endpoints: 'localhost:9876',
consumerGroup: 'broadcast-consumer-group',
topic: 'pattern-demo',
tag: '*',
messageModel: MessageModel.BROADCASTING,
instanceName: 'broadcast-consumer-2',
});
await clusterConsumer1.start();
await clusterConsumer2.start();
await broadcastConsumer1.start();
await broadcastConsumer2.start();
// Set up message handlers
let clusterMessageCount = 0;
let broadcast1MessageCount = 0;
let broadcast2MessageCount = 0;
clusterConsumer1.subscribe((msg, ack) => {
clusterMessageCount++;
console.log(`[CLUSTER-1] Got message ${clusterMessageCount}: ${msg.body.toString()}`);
ack();
});
clusterConsumer2.subscribe((msg, ack) => {
console.log(`[CLUSTER-2] Got message: ${msg.body.toString()}`);
ack();
});
broadcastConsumer1.subscribe((msg, ack) => {
broadcast1MessageCount++;
console.log(`[BROADCAST-1] Got message ${broadcast1MessageCount}: ${msg.body.toString()}`);
ack();
});
broadcastConsumer2.subscribe((msg, ack) => {
broadcast2MessageCount++;
console.log(`[BROADCAST-2] Got message ${broadcast2MessageCount}: ${msg.body.toString()}`);
ack();
});
// Producer for demo messages
const producer = new Producer({
endpoints: 'localhost:9876',
producerGroup: 'pattern-demo-producer',
});
await producer.start();
// Send messages
for (let i = 1; i <= 5; i++) {
await producer.send({
topic: 'pattern-demo',
body: Buffer.from(`Demo message ${i}`),
});
await new Promise(resolve => setTimeout(resolve, 500));
}
// Wait for processing
await new Promise(resolve => setTimeout(resolve, 3000));
console.log('\n📈 Consumption Pattern Results:');
console.log(`Cluster consumers: ${clusterMessageCount} total messages (load balanced)`);
console.log(`Broadcast consumer 1: ${broadcast1MessageCount} messages (all received)`);
console.log(`Broadcast consumer 2: ${broadcast2MessageCount} messages (all received)`);
// Cleanup
await producer.shutdown();
await clusterConsumer1.shutdown();
await clusterConsumer2.shutdown();
await broadcastConsumer1.shutdown();
await broadcastConsumer2.shutdown();
}
// Main function
async function main() {
console.log('=== RocketMQ Broadcast Consumption Example ===');
try {
// Step 1: Create multiple service instances
console.log('\n1. Creating service instances...');
const services = [
new ServiceInstance('api-service', 'instance-1'),
new ServiceInstance('api-service', 'instance-2'),
new ServiceInstance('api-service', 'instance-3'),
new ServiceInstance('web-service', 'instance-1'),
new ServiceInstance('web-service', 'instance-2'),
];
// Step 2: Start consumers for all services
console.log('\n2. Starting broadcast consumers...');
const consumerPromises = services.map(service => service.startConsumer());
await Promise.all(consumerPromises);
// Step 3: Wait for consumers to be ready
await new Promise(resolve => setTimeout(resolve, 2000));
// Step 4: Start health monitor
console.log('\n3. Starting service health monitor...');
serviceHealthMonitor(services);
// Step 5: Send configuration updates
console.log('\n4. Broadcasting configuration updates...');
await configurationProducer();
// Step 6: Demo consumption patterns
await new Promise(resolve => setTimeout(resolve, 3000));
await demoConsumptionPatterns();
// Step 7: Wait for all processing to complete
console.log('\n5. Waiting for processing to complete...');
await new Promise(resolve => setTimeout(resolve, 35000));
// Step 8: Shutdown all services
console.log('\n6. Shutting down all services...');
const shutdownPromises = services.map(service => service.shutdown());
await Promise.all(shutdownPromises);
console.log('\nBroadcast consumption example completed!');
console.log('\nKey Points:');
console.log('- All instances in the same consumer group received every message');
console.log('- Each service instance applied configurations independently');
console.log('- Useful for configuration updates, cache invalidation, and notifications');
console.log('- Contrast with clustering which load balances messages');
} catch (err) {
console.error('Error in broadcast consumption example:', err);
}
}
main().catch(err => {
console.error('Error:', err);
});
💻 RocketMQ 事务消息 javascript
🔴 complex
⭐⭐⭐⭐⭐
使用 RocketMQ 的事务消息功能实现分布式事务,确保最终一致性
⏱️ 30 min
🏷️ rocketmq, transaction, distributed, acid
Prerequisites:
RocketMQ server running, Node.js, Advanced RocketMQ knowledge, Understanding of distributed transactions
// RocketMQ Transactional Messages Example
// Demonstrates distributed transactions with local and remote operations
const { Producer, Consumer, MessageModel, TransactionMQProducer } = require('apache-rocketmq');
// Simulated database for orders
class OrderDatabase {
constructor() {
this.orders = new Map();
this.payments = new Map();
this.inventories = new Map();
}
async createOrder(order) {
console.log(`[DB] Creating order: ${order.orderId}`);
// Simulate database operation
await new Promise(resolve => setTimeout(resolve, 100));
this.orders.set(order.orderId, { ...order, status: 'created' });
return true;
}
async processPayment(payment) {
console.log(`[DB] Processing payment: ${payment.paymentId}`);
// Simulate payment processing
await new Promise(resolve => setTimeout(resolve, 200));
// Simulate occasional payment failures
if (Math.random() < 0.2) {
console.log(`[DB] Payment failed: ${payment.paymentId}`);
return false;
}
this.payments.set(payment.paymentId, { ...payment, status: 'completed' });
return true;
}
async updateInventory(inventoryUpdate) {
console.log(`[DB] Updating inventory for: ${inventoryUpdate.productId}`);
// Simulate inventory update
await new Promise(resolve => setTimeout(resolve, 50));
this.inventories.set(inventoryUpdate.productId, inventoryUpdate);
return true;
}
async rollbackOrder(orderId) {
console.log(`[DB] Rolling back order: ${orderId}`);
const order = this.orders.get(orderId);
if (order) {
order.status = 'cancelled';
this.orders.set(orderId, order);
}
}
getOrderStatus(orderId) {
const order = this.orders.get(orderId);
return order ? order.status : 'not_found';
}
}
// Transactional producer for order processing
async function createTransactionalProducer() {
const producer = new TransactionMQProducer({
endpoints: 'localhost:9876',
producerGroup: 'order-transaction-producer',
instanceName: 'order-transaction-producer',
// Transaction check listener
checkListener: async (msg) => {
try {
const transactionId = msg.properties.transactionId;
const orderData = JSON.parse(msg.body.toString());
console.log(`[TRANSACTION CHECK] Checking transaction ${transactionId} for order ${orderData.orderId}`);
// Check local transaction status
// In real application, you would query your database
const orderStatus = global.orderDB.getOrderStatus(orderData.orderId);
console.log(`[TRANSACTION CHECK] Order ${orderData.orderId} status: ${orderStatus}`);
// Return transaction status
if (orderStatus === 'created' || orderStatus === 'processing') {
return { state: 'COMMIT' };
} else if (orderStatus === 'cancelled') {
return { state: 'ROLLBACK' };
} else {
return { state: 'UNKNOWN' };
}
} catch (err) {
console.error('Transaction check error:', err);
return { state: 'UNKNOWN' };
}
},
});
await producer.start();
console.log('Transactional producer started');
return producer;
}
// Order service with transactional messaging
class OrderService {
constructor(producer, database) {
this.producer = producer;
this.db = database;
}
async processOrder(orderRequest) {
const transactionId = `tx_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
console.log(`\n🔄 Starting transaction ${transactionId} for order ${orderRequest.orderId}`);
// Step 1: Send half message (prepared message)
const halfMessage = {
topic: 'order-events',
tag: 'order.created',
body: Buffer.from(JSON.stringify({
...orderRequest,
transactionId: transactionId,
timestamp: new Date().toISOString(),
})),
keys: [orderRequest.orderId, transactionId],
properties: {
transactionId: transactionId,
orderId: orderRequest.orderId,
},
};
console.log(`[1/4] Sending half message for order ${orderRequest.orderId}`);
const sendResult = await this.producer.sendTransactionMessage(halfMessage);
try {
// Step 2: Execute local transaction
console.log(`[2/4] Executing local transaction for order ${orderRequest.orderId}`);
const localTxSuccess = await this.executeLocalTransaction(orderRequest, transactionId);
if (localTxSuccess) {
// Step 3: Commit transaction
console.log(`[3/4] Committing transaction ${transactionId}`);
await this.producer.commitTransaction(sendResult.transactionId);
console.log(`[4/4] ✅ Transaction ${transactionId} COMMITTED for order ${orderRequest.orderId}`);
return { success: true, transactionId };
} else {
// Step 4: Rollback transaction
console.log(`[3/4] Rolling back transaction ${transactionId}`);
await this.producer.rollbackTransaction(sendResult.transactionId);
console.log(`[4/4] ❌ Transaction ${transactionId} ROLLED BACK for order ${orderRequest.orderId}`);
return { success: false, transactionId };
}
} catch (err) {
console.error(`Local transaction failed for ${transactionId}:`, err);
console.log(`[3/4] Rolling back transaction ${transactionId} due to error`);
await this.producer.rollbackTransaction(sendResult.transactionId);
throw err;
}
}
async executeLocalTransaction(orderRequest, transactionId) {
try {
// Create order in local database
const orderSuccess = await this.db.createOrder({
...orderRequest,
status: 'processing',
transactionId: transactionId,
});
if (!orderSuccess) {
await this.db.rollbackOrder(orderRequest.orderId);
return false;
}
// Process payment
const paymentSuccess = await this.db.processPayment({
paymentId: `pay_${orderRequest.orderId}`,
orderId: orderRequest.orderId,
amount: orderRequest.amount,
transactionId: transactionId,
});
if (!paymentSuccess) {
await this.db.rollbackOrder(orderRequest.orderId);
return false;
}
// Update inventory
for (const item of orderRequest.items) {
const inventorySuccess = await this.db.updateInventory({
productId: item.productId,
quantity: -item.quantity, // Decrease inventory
orderId: orderRequest.orderId,
transactionId: transactionId,
});
if (!inventorySuccess) {
await this.db.rollbackOrder(orderRequest.orderId);
return false;
}
}
// Mark order as completed
const order = this.db.orders.get(orderRequest.orderId);
order.status = 'completed';
this.db.orders.set(orderRequest.orderId, order);
console.log(`[LOCAL TX] All operations completed for order ${orderRequest.orderId}`);
return true;
} catch (err) {
console.error(`[LOCAL TX] Error executing transaction for order ${orderRequest.orderId}:`, err);
await this.db.rollbackOrder(orderRequest.orderId);
return false;
}
}
}
// Order event consumer
async function orderEventConsumer() {
const consumer = new Consumer({
endpoints: 'localhost:9876',
consumerGroup: 'order-event-consumer',
topic: 'order-events',
tag: '*',
messageModel: MessageModel.BROADCASTING, // All consumers get all messages
instanceName: 'order-event-consumer',
});
await consumer.start();
console.log('Order event consumer started');
consumer.subscribe((msg, ack) => {
try {
const orderEvent = JSON.parse(msg.body.toString());
const now = new Date();
console.log('\n📦 ORDER EVENT RECEIVED');
console.log(` Order ID: ${orderEvent.orderId}`);
console.log(` Transaction ID: ${orderEvent.transactionId}`);
console.log(` Amount: $${orderEvent.amount}`);
console.log(` Items: ${orderEvent.items.map(i => i.productId).join(', ')}`);
console.log(` Created At: ${new Date(orderEvent.timestamp).toLocaleTimeString()}`);
console.log(` Received At: ${now.toLocaleTimeString()}`);
console.log(` Processing Delay: ${(now.getTime() - new Date(orderEvent.timestamp).getTime())}ms`);
// Here you would:
// 1. Update search indexes
// 2. Send notifications
// 3. Update analytics
// 4. Trigger other downstream services
console.log(' → Processing downstream services...');
// Simulate downstream processing
setTimeout(() => {
console.log(` → Downstream processing completed for ${orderEvent.orderId}`);
}, 100);
console.log('---');
ack();
} catch (err) {
console.error('Error processing order event:', err);
ack();
}
});
return consumer;
}
// Payment verification service (consumer)
async function paymentVerificationConsumer() {
const consumer = new Consumer({
endpoints: 'localhost:9876',
consumerGroup: 'payment-verification-consumer',
topic: 'order-events',
tag: 'order.created',
messageModel: MessageModel.CLUSTERING,
instanceName: 'payment-verification-consumer',
});
await consumer.start();
console.log('Payment verification consumer started');
consumer.subscribe((msg, ack) => {
try {
const orderEvent = JSON.parse(msg.body.toString());
console.log(`\n💳 Payment verification for order ${orderEvent.orderId}`);
// In a real system, you might:
// 1. Verify payment with external payment gateway
// 2. Update fraud detection systems
// 3. Send payment confirmation to customer
const paymentStatus = Math.random() > 0.1 ? 'VERIFIED' : 'REQUIRES_REVIEW';
console.log(` → Payment status: ${paymentStatus}`);
if (paymentStatus === 'REQUIRES_REVIEW') {
console.log(` → Order ${orderEvent.orderId} flagged for manual review`);
}
ack();
} catch (err) {
console.error('Error in payment verification:', err);
ack();
}
});
return consumer;
}
// Main function
async function main() {
console.log('=== RocketMQ Transactional Messages Example ===');
// Initialize database
global.orderDB = new OrderDatabase();
try {
// Step 1: Create transactional producer
console.log('\n1. Setting up transactional producer...');
const producer = await createTransactionalProducer();
// Step 2: Start consumers
console.log('\n2. Starting consumers...');
const orderConsumerPromise = orderEventConsumer();
const paymentConsumerPromise = paymentVerificationConsumer();
// Step 3: Wait for consumers to be ready
await new Promise(resolve => setTimeout(resolve, 2000));
// Step 4: Create order service
console.log('\n3. Creating order service...');
const orderService = new OrderService(producer, global.orderDB);
// Step 5: Process multiple orders
console.log('\n4. Processing orders...');
const orders = [
{
orderId: 'TX-ORDER-001',
userId: 'user-tx-001',
amount: 299.99,
items: [
{ productId: 'LAPTOP-001', quantity: 1, price: 199.99 },
{ productId: 'MOUSE-001', quantity: 1, price: 49.99 },
{ productId: 'KEYBOARD-001', quantity: 1, price: 50.01 },
],
},
{
orderId: 'TX-ORDER-002',
userId: 'user-tx-002',
amount: 149.99,
items: [
{ productId: 'MONITOR-001', quantity: 1, price: 149.99 },
],
},
{
orderId: 'TX-ORDER-003',
userId: 'user-tx-003',
amount: 89.97,
items: [
{ productId: 'HEADPHONE-001', quantity: 2, price: 44.99 },
],
},
];
// Process orders with delays
for (let i = 0; i < orders.length; i++) {
const order = orders[i];
console.log(`\n--- Processing Order ${i + 1}/${orders.length} ---`);
const result = await orderService.processOrder(order);
if (result.success) {
console.log(`✅ Order ${order.orderId} processed successfully`);
} else {
console.log(`❌ Order ${order.orderId} processing failed`);
}
// Wait between orders
if (i < orders.length - 1) {
await new Promise(resolve => setTimeout(resolve, 3000));
}
}
// Step 6: Wait for message processing
console.log('\n5. Waiting for message processing...');
await new Promise(resolve => setTimeout(resolve, 10000));
// Step 7: Shutdown
console.log('\n6. Shutting down...');
await producer.shutdown();
const orderConsumer = await orderConsumerPromise;
const paymentConsumer = await paymentConsumerPromise;
await orderConsumer.shutdown();
await paymentConsumer.shutdown();
console.log('\nTransactional messages example completed!');
console.log('\nSummary:');
console.log('- Demonstrated distributed transactions with local DB ops and remote messaging');
console.log('- Transaction consistency maintained even with failures');
console.log('- Downstream services only receive committed transactions');
} catch (err) {
console.error('Error in transactional messages example:', err);
}
}
main().catch(err => {
console.error('Error:', err);
});