🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples RocketMQ Message Queue
Exemples Apache RocketMQ incluant producteurs, consommateurs, messages ordonnés, messages retardés, consommation en diffusion et messages transactionnels pour une messagerie haute performance
💻 RocketMQ Hello World - Producteur/Consommateur de Base javascript
Exemple simple de producteur et consommateur Apache RocketMQ démontrant des opérations de base de file de messages
// RocketMQ Hello World - Basic Producer/Consumer Example
// Install: npm install apache-rocketmq
const { Producer, Consumer, MessageModel } = require('apache-rocketmq');
/**
 * Runs a minimal produce/consume round trip against the broker at
 * localhost:9876: starts a consumer on 'hello-topic', sends four greeting
 * messages with a producer, waits five seconds, then shuts both clients down.
 *
 * Every client that was started is shut down even when a later step throws.
 *
 * @returns {Promise<void>} Resolves after both clients are shut down; rejects
 *   with the first error raised while starting, sending or shutting down.
 */
async function helloWorld() {
  // Connection settings shared by the producer and the consumer.
  // NOTE(review): credentials are hard-coded sample values — replace before real use.
  const config = {
    endpoints: 'localhost:9876',
    accessKey: 'rocketmq',
    secretKey: '12345678',
    instanceName: 'hello-world-example',
  };
  console.log('=== RocketMQ Hello World Example ===');

  const consumer = new Consumer({
    ...config,
    consumerGroup: 'hello-consumer-group',
    topic: 'hello-topic',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
  });
  const producer = new Producer({
    ...config,
    producerGroup: 'hello-producer-group',
  });

  // Clients whose start() succeeded, in start order; only these get shut down.
  const startedClients = [];
  try {
    console.log('Starting consumer...');
    await consumer.start();
    startedClients.push(consumer);

    // Print every received message, then acknowledge it.
    consumer.subscribe((msg, ack) => {
      console.log(`Received message: ${msg.body.toString()}`);
      console.log(`Message ID: ${msg.id}`);
      console.log(`Topic: ${msg.topic}`);
      console.log(`Tag: ${msg.tag}`);
      console.log('---');
      ack();
    });

    console.log('Starting producer...');
    await producer.start();
    startedClients.push(producer);

    // Give the consumer a moment to become ready before sending.
    await new Promise((resolve) => setTimeout(resolve, 2000));

    console.log('Sending messages...');
    const messages = [
      'Hello RocketMQ!',
      'Apache RocketMQ is powerful!',
      'Message queue made easy!',
      'High performance messaging!',
    ];
    for (const [index, text] of messages.entries()) {
      const result = await producer.send({
        topic: 'hello-topic',
        tag: 'hello',
        body: Buffer.from(text),
        keys: [`key-${index + 1}`],
        properties: {
          source: 'hello-world-example',
          timestamp: new Date().toISOString(),
        },
      });
      console.log(`Message sent: ${text}`);
      console.log(`Message ID: ${result.messageId}`);
      console.log('---');
    }

    console.log('Waiting for messages to be consumed...');
    await new Promise((resolve) => setTimeout(resolve, 5000));
  } finally {
    console.log('Shutting down...');
    // Reverse start order: producer first, then consumer.
    for (const client of startedClients.reverse()) {
      await client.shutdown();
    }
  }
  console.log('RocketMQ Hello World example completed!');
}
// Script entry point: run the example and print any error that escapes it.
helloWorld().catch((error) => console.error('Error:', error));
💻 Messages Ordonnés RocketMQ javascript
Implémenter la livraison de messages ordonnés en utilisant des clés de partitionnement avec des files de messages
// RocketMQ Ordered Messages Example
// Ensures message order within the same sharding key
const { Producer, Consumer, MessageModel } = require('apache-rocketmq');
// Order event producer
/**
 * Publishes a fixed list of order lifecycle events to the 'order-events'
 * topic, one at a time, using each event's userId as the sharding key.
 *
 * The producer is shut down even when sending fails part-way through.
 *
 * @returns {Promise<void>} Resolves once all events are sent and the producer
 *   is shut down; rejects with the first start/send/shutdown error.
 */
async function orderEventProducer() {
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'order-producer-group',
    instanceName: 'order-event-producer',
  });
  await producer.start();
  console.log('Order event producer started');

  // Simulated order events for several users; array order is the send order.
  const orders = [
    // User 1 - order lifecycle (should be processed in order)
    { userId: 'user001', orderId: 'ORD001', event: 'order.created', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'payment.processed', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'order.confirmed', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'order.shipped', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'order.delivered', amount: 99.99 },
    // User 2 - order lifecycle (should be processed in order)
    { userId: 'user002', orderId: 'ORD002', event: 'order.created', amount: 149.99 },
    { userId: 'user002', orderId: 'ORD002', event: 'payment.processed', amount: 149.99 },
    { userId: 'user002', orderId: 'ORD002', event: 'order.confirmed', amount: 149.99 },
    // User 3 - order lifecycle (should be processed in order)
    { userId: 'user003', orderId: 'ORD003', event: 'order.created', amount: 79.99 },
    { userId: 'user003', orderId: 'ORD003', event: 'payment.failed', amount: 79.99 },
    { userId: 'user003', orderId: 'ORD003', event: 'order.cancelled', amount: 79.99 },
    // More events for user 1 and 2
    { userId: 'user001', orderId: 'ORD004', event: 'order.created', amount: 199.99 },
    { userId: 'user002', orderId: 'ORD005', event: 'order.created', amount: 59.99 },
  ];

  try {
    console.log('Sending order events...');
    for (const [index, order] of orders.entries()) {
      const { userId, orderId, event } = order;
      const message = {
        topic: 'order-events',
        tag: event,
        body: Buffer.from(JSON.stringify({
          ...order,
          timestamp: new Date().toISOString(),
          sequence: index + 1, // 1-based position in the send order
        })),
        // Same userId => same sharding key, so one user's events stay together.
        shardingKey: userId,
        keys: [orderId, userId],
        properties: {
          eventType: event,
          userId,
          orderId,
        },
      };
      const result = await producer.send(message);
      console.log(`[${userId}] ${event}: ${orderId} (Message ID: ${result.messageId})`);
      // Small delay to simulate a real-world scenario.
      await new Promise((resolve) => setTimeout(resolve, 100));
    }
  } finally {
    // Release the producer whether or not every send succeeded.
    await producer.shutdown();
    console.log('Order event producer stopped');
  }
}
// Order event consumer with ordered processing
/**
 * Starts a consumer on the 'order-events' topic (with `orderly: true`) that
 * logs each event, warns when an event is not an allowed successor of the
 * previous event seen for the same user, and acknowledges every message.
 *
 * @param {number|string} consumerId - Used in the consumer group name,
 *   instance name and log prefix.
 * @returns {Promise<object>} The started consumer.
 */
async function orderEventConsumer(consumerId) {
  const prefix = `[CONSUMER ${consumerId}]`;

  // For each event, the events allowed to follow it for the same user.
  const allowedFollowUps = {
    'order.created': ['payment.processed', 'payment.failed'],
    'payment.processed': ['order.confirmed'],
    'payment.failed': ['order.cancelled'],
    'order.confirmed': ['order.shipped'],
    'order.shipped': ['order.delivered'],
    'order.cancelled': ['order.created'], // a new order may follow a cancellation
    'order.delivered': ['order.created'], // a new order may follow a delivery
  };

  // userId -> { lastEvent, lastOrderId } for the most recent event handled.
  const latestByUser = new Map();

  const onMessage = (msg, ack) => {
    try {
      const evt = JSON.parse(msg.body.toString());
      console.log(`${prefix} Processing: ${evt.userId} -> ${evt.event}`);

      const previous = latestByUser.get(evt.userId);
      const previousEvent = previous ? previous.lastEvent : null;
      if (previousEvent) {
        const permitted = allowedFollowUps[previousEvent] || [];
        if (!permitted.includes(evt.event)) {
          console.log(`${prefix} WARNING: Invalid order transition for ${evt.userId}: ${previousEvent} -> ${evt.event}`);
        }
      }

      const record = { lastEvent: evt.event, lastOrderId: evt.orderId };
      latestByUser.set(evt.userId, record);
      console.log(`${prefix} Processed: ${evt.userId} -> ${evt.event} -> ${evt.orderId}`);
      console.log(`${prefix} Current state for ${evt.userId}: ${record.lastEvent}`);
      console.log('---');
      ack();
    } catch (err) {
      console.error(`${prefix} Error processing message:`, err);
      // Acknowledge even on error so a bad message does not block the queue.
      ack();
    }
  };

  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: `order-consumer-group-${consumerId}`,
    topic: 'order-events',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: `order-consumer-${consumerId}`,
    orderly: true, // request ordered consumption
  });
  await consumer.start();
  console.log(`Order event consumer ${consumerId} started`);
  consumer.subscribe(onMessage);
  return consumer;
}
// Demo ordered vs unordered messages
/**
 * Sends five messages ('A' to 'E') to the 'unordered-test' topic without a
 * sharding key, one after another, then shuts the producer down.
 *
 * @returns {Promise<void>}
 */
async function demoOrderingComparison() {
  console.log('\n=== Ordered vs Unordered Comparison ===');

  const producerOptions = {
    endpoints: 'localhost:9876',
    producerGroup: 'unordered-producer-group',
    instanceName: 'unordered-producer',
  };
  const unorderedProducer = new Producer(producerOptions);
  await unorderedProducer.start();

  console.log('Sending unordered messages...');
  const labels = ['A', 'B', 'C', 'D', 'E'];
  for (let position = 0; position < labels.length; position += 1) {
    const label = labels[position];
    const payload = {
      topic: 'unordered-test',
      body: Buffer.from(`Unordered message: ${label}`),
    };
    await unorderedProducer.send(payload);
    console.log(`Sent unordered: ${label}`);
  }

  await unorderedProducer.shutdown();
}
// Main function
/**
 * Orchestrates the ordered-messages example: starts three consumers, runs the
 * order event producer, waits for processing, runs the unordered comparison,
 * then shuts the consumers down.
 *
 * Consumers that were started are shut down even when a step fails. Any error
 * is logged here rather than rethrown.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== RocketMQ Ordered Messages Example ===');
  // Consumers started so far; filled incrementally so cleanup covers partial startup.
  const consumers = [];
  try {
    try {
      // Step 1: start the consumers
      console.log('\n1. Starting consumers...');
      for (let consumerId = 1; consumerId <= 3; consumerId++) {
        consumers.push(await orderEventConsumer(consumerId));
      }
      // Step 2: give the consumers time to become ready
      await new Promise((resolve) => setTimeout(resolve, 2000));
      // Step 3: send the ordered messages
      console.log('\n2. Sending ordered messages...');
      await orderEventProducer();
      // Step 4: wait for message processing
      console.log('\n3. Waiting for message processing...');
      await new Promise((resolve) => setTimeout(resolve, 8000));
      // Step 5: unordered comparison
      await demoOrderingComparison();
    } finally {
      // Step 6: always release the consumers, including after a failure
      console.log('\n4. Shutting down consumers...');
      for (const consumer of consumers) {
        await consumer.shutdown();
      }
    }
    console.log('Ordered messages example completed!');
  } catch (err) {
    console.error('Error in ordered messages example:', err);
  }
}
// Script entry point: run the example and print any error that escapes it.
main().catch((error) => console.error('Error:', error));
💻 Messages Retardés RocketMQ javascript
Implémenter la livraison de messages retardés et programmés en utilisant la fonction de retard intégrée de RocketMQ
// RocketMQ Delayed Messages Example
// Demonstrates various types of delayed and scheduled message delivery
const { Producer, Consumer, MessageModel } = require('apache-rocketmq');
// RocketMQ delay levels: each duration label below maps to its 1-based level index (not to a number of seconds)
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// Maps a duration label to its delay level: the label at position i in the
// list below gets level i + 1 ('1s' -> 1 ... '2h' -> 18).
const DELAY_LEVELS = Object.fromEntries(
  [
    '1s', '5s', '10s', '30s',
    '1m', '2m', '3m', '4m', '5m', '6m', '7m', '8m', '9m', '10m', '20m', '30m',
    '1h', '2h',
  ].map((label, position) => [label, position + 1]),
);
// Reminder service producer
/**
 * Converts a duration label such as '10s', '5m' or '2h' into milliseconds.
 *
 * @param {string} label - Digits followed by one of 's', 'm' or 'h'.
 * @returns {number} The duration in milliseconds, or NaN when the label does
 *   not have that shape.
 */
function delayLabelToMs(label) {
  const match = /^(\d+)([smh])$/.exec(label);
  if (!match) {
    return Number.NaN;
  }
  const unitMs = { s: 1000, m: 60 * 1000, h: 60 * 60 * 1000 };
  return Number(match[1]) * unitMs[match[2]];
}

/**
 * Sends a fixed set of reminders to 'reminder-topic', each with the delay
 * level that DELAY_LEVELS assigns to the reminder's delay label.
 *
 * The expected delivery time is derived from the label's real duration; the
 * level itself is only an index, not a number of seconds. Reminders whose
 * label has no entry in DELAY_LEVELS are skipped with a warning. The producer
 * is shut down even when a send fails.
 *
 * @returns {Promise<void>} Rejects with the first start/send/shutdown error.
 */
async function reminderProducer() {
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'reminder-producer-group',
    instanceName: 'reminder-producer',
  });
  await producer.start();
  console.log('Reminder producer started');

  // Different types of reminders with various delays.
  // NOTE(review): the recipient values look like redacted placeholders — confirm the intended addresses.
  const reminders = [
    {
      id: 'rem001',
      type: 'meeting',
      message: 'Team standup meeting starts now!',
      delay: '10s',
      recipient: '[email protected]',
    },
    {
      id: 'rem002',
      type: 'deadline',
      message: 'Project submission deadline approaching!',
      delay: '1m',
      recipient: '[email protected]',
    },
    {
      id: 'rem003',
      type: 'birthday',
      // Double quotes: the text contains an apostrophe.
      message: "Happy birthday! Don't forget to wish them well!",
      delay: '2m',
      recipient: '[email protected]',
    },
    {
      id: 'rem004',
      type: 'maintenance',
      message: 'System maintenance scheduled for tonight',
      delay: '5m',
      recipient: '[email protected]',
    },
    {
      id: 'rem005',
      type: 'follow-up',
      message: 'Follow up with client about proposal',
      delay: '30s',
      recipient: '[email protected]',
    },
  ];

  try {
    console.log('Sending delayed reminders...');
    const sendTime = new Date();
    for (const reminder of reminders) {
      const delayLevel = DELAY_LEVELS[reminder.delay];
      if (!delayLevel) {
        console.log(`Warning: Unknown delay level '${reminder.delay}' for reminder ${reminder.id}`);
        continue;
      }
      // Expected arrival = send time + the label's duration (not the level index).
      const expectedAt = new Date(sendTime.getTime() + delayLabelToMs(reminder.delay));
      const message = {
        topic: 'reminder-topic',
        tag: reminder.type,
        body: Buffer.from(JSON.stringify({
          ...reminder,
          scheduledAt: sendTime.toISOString(),
          delayLevel,
          expectedAt: expectedAt.toISOString(),
        })),
        keys: [reminder.id],
        properties: {
          reminderId: reminder.id,
          reminderType: reminder.type,
          delay: reminder.delay,
          delayLevel: delayLevel.toString(),
          scheduledAt: sendTime.toISOString(),
        },
        // Set delay level
        delayTimeLevel: delayLevel,
      };
      const result = await producer.send(message);
      console.log(`[${reminder.delay.padEnd(3)}] ${reminder.type.padEnd(12)} -> ${reminder.recipient.padEnd(25)} (ID: ${reminder.id})`);
      console.log(` Expected at: ${expectedAt.toLocaleTimeString()}`);
      console.log(` Message ID: ${result.messageId}`);
      console.log('---');
    }
  } finally {
    // Release the producer whether or not every send succeeded.
    await producer.shutdown();
    console.log('Reminder producer stopped');
  }
}
// Notification service for handling reminders
/**
 * Starts a consumer on 'reminder-topic' that prints each received reminder
 * together with its scheduled, expected and actual arrival times, then
 * acknowledges it. Messages that fail to process are logged and acknowledged.
 *
 * @returns {Promise<object>} The started consumer.
 */
async function reminderConsumer() {
  // Difference between two Date objects, in seconds.
  const secondsBetween = (later, earlier) => (later.getTime() - earlier.getTime()) / 1000;

  const onReminder = (msg, ack) => {
    try {
      const reminder = JSON.parse(msg.body.toString());
      const receivedTime = new Date();
      const scheduledTime = new Date(reminder.scheduledAt);
      const expectedTime = new Date(reminder.expectedAt);
      const actualDelay = secondsBetween(receivedTime, scheduledTime);
      const expectedDelay = secondsBetween(expectedTime, scheduledTime);
      // "Good" means the arrival is within one second of the expected delay.
      const verdict = Math.abs(actualDelay - expectedDelay) < 1 ? '✓ Good' : '⚠ Delayed';

      const report = [
        '\n🔔 REMINDER RECEIVED!',
        ` ID: ${reminder.id}`,
        ` Type: ${reminder.type}`,
        ` Recipient: ${reminder.recipient}`,
        ` Message: ${reminder.message}`,
        ` Scheduled: ${scheduledTime.toLocaleTimeString()}`,
        ` Expected: ${expectedTime.toLocaleTimeString()}`,
        ` Received: ${receivedTime.toLocaleTimeString()}`,
        ` Delay: ${actualDelay.toFixed(2)}s (expected: ${expectedDelay}s)`,
        ` Accuracy: ${verdict}`,
        '---',
      ];
      report.forEach((line) => {
        console.log(line);
      });

      // A real service would send the notification (email, SMS, push, ...) here;
      // the demo only logs it.
      ack();
    } catch (err) {
      console.error('Error processing reminder:', err);
      ack();
    }
  };

  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'reminder-consumer-group',
    topic: 'reminder-topic',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'reminder-consumer',
  });
  await consumer.start();
  console.log('Reminder consumer started');
  consumer.subscribe(onReminder);
  return consumer;
}
// Order cancellation with grace period
/**
 * Publishes an 'order.created' message for a sample order on the
 * 'order-lifecycle' topic, followed by a 'cancellation.check' message sent
 * with the '30s' delay level, then shuts the producer down.
 *
 * @returns {Promise<void>}
 */
async function orderCancellationDemo() {
  console.log('\n=== Order Cancellation with Grace Period ===');

  const lifecycleTopic = 'order-lifecycle';
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'order-cancellation-producer',
    instanceName: 'order-cancellation-producer',
  });
  await producer.start();

  // The sample order.
  const order = {
    orderId: 'ORD-CANCEL-001',
    userId: 'user-cancel-demo',
    amount: 299.99,
    items: ['Premium Widget', 'Extended Warranty'],
    createdAt: new Date().toISOString(),
  };
  const { orderId, userId } = order;
  console.log(`Creating order: ${orderId}`);

  // Immediate message: the order was created.
  const createdMessage = {
    topic: lifecycleTopic,
    tag: 'order.created',
    body: Buffer.from(JSON.stringify(order)),
    keys: [orderId],
  };
  await producer.send(createdMessage);

  // Delayed message: check after the grace period.
  const cancellationCheck = {
    orderId,
    userId,
    type: 'auto-cancel-check',
    scheduledAt: new Date().toISOString(),
    gracePeriod: '30s',
  };
  const checkMessage = {
    topic: lifecycleTopic,
    tag: 'cancellation.check',
    body: Buffer.from(JSON.stringify(cancellationCheck)),
    keys: [orderId, 'cancel-check'],
    delayTimeLevel: DELAY_LEVELS['30s'], // delay level for the '30s' label
  };
  await producer.send(checkMessage);
  console.log(`Scheduled cancellation check for ${orderId} in 30s`);

  await producer.shutdown();
}
// Lifecycle consumer for orders
/**
 * Starts a consumer on the 'order-lifecycle' topic. Messages tagged
 * 'cancellation.check' trigger a simulated (random) cancel/confirm decision;
 * every other message is logged as an order event. All messages are
 * acknowledged, including those that fail to process.
 *
 * @returns {Promise<object>} The started consumer.
 */
async function orderLifecycleConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'order-lifecycle-consumer',
    topic: 'order-lifecycle',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'order-lifecycle-consumer',
  });
  await consumer.start();

  // Last tag seen per order id (written here, never read in this function).
  const orderStates = new Map();

  consumer.subscribe((msg, ack) => {
    try {
      if (msg.tag === 'cancellation.check') {
        const check = JSON.parse(msg.body.toString());
        console.log(`\n⏰ CANCELLATION CHECK FOR ORDER: ${check.orderId}`);
        // A real system would check whether payment was received;
        // the demo decides at random.
        const shouldCancel = Math.random() > 0.5;
        if (shouldCancel) {
          console.log(` → Cancelling unpaid order ${check.orderId}`);
          // Here you would send a cancellation message
        } else {
          console.log(` → Order ${check.orderId} is confirmed`);
          // Here you might schedule fulfillment
        }
      } else {
        const order = JSON.parse(msg.body.toString());
        console.log(`📦 Order Event: ${msg.tag} -> ${order.orderId}`);
        orderStates.set(order.orderId, msg.tag);
      }
      ack();
    } catch (err) {
      console.error('Error in lifecycle consumer:', err);
      // Acknowledge anyway so a bad message is not redelivered.
      ack();
    }
  });
  return consumer;
}
// Main function
/**
 * Orchestrates the delayed-messages example: starts the reminder and order
 * lifecycle consumers, sends the delayed reminders, runs the order
 * cancellation demo, waits, then shuts the consumers down.
 *
 * Consumers that were started are shut down even when a step fails. Any error
 * is logged here rather than rethrown.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== RocketMQ Delayed Messages Example ===');
  // Started consumers, in start order. Deliberately NOT named after the
  // reminderConsumer()/orderLifecycleConsumer() functions: a block-scoped
  // binding with the same name would shadow the function and make the call
  // below throw a ReferenceError.
  const startedConsumers = [];
  try {
    try {
      // Step 1: start consumers
      console.log('\n1. Starting consumers...');
      startedConsumers.push(await reminderConsumer());
      startedConsumers.push(await orderLifecycleConsumer());
      // Step 2: give the consumers time to become ready
      await new Promise((resolve) => setTimeout(resolve, 2000));
      // Step 3: send reminders
      console.log('\n2. Sending delayed reminders...');
      await reminderProducer();
      // Step 4: order cancellation demo
      await new Promise((resolve) => setTimeout(resolve, 1000));
      await orderCancellationDemo();
      // Step 5: wait for delayed messages. 35 s covers the 10s/30s reminders and
      // the 30s cancellation check; the 1m, 2m and 5m reminders are due after
      // this window, so they will not be observed before shutdown.
      console.log('\n3. Waiting for delayed messages to be delivered...');
      await new Promise((resolve) => setTimeout(resolve, 35000));
    } finally {
      // Step 6: always release the consumers, including after a failure
      console.log('\n4. Shutting down consumers...');
      for (const consumer of startedConsumers) {
        await consumer.shutdown();
      }
    }
    console.log('Delayed messages example completed!');
  } catch (err) {
    console.error('Error in delayed messages example:', err);
  }
}
// Script entry point: run the example and print any error that escapes it.
main().catch((error) => console.error('Error:', error));
💻 Consommation de Diffusion RocketMQ javascript
Implémenter un modèle de consommation de diffusion où tous les consommateurs reçoivent le même message
// RocketMQ Broadcast Consumption Example
// Demonstrates broadcasting messages to all consumers in a consumer group
const { Producer, Consumer, MessageModel } = require('apache-rocketmq');
// Configuration source producer
/**
 * Publishes five sample configuration updates to the 'configuration-updates'
 * topic, one at a time, then shuts the producer down. Updates of type
 * 'security' are marked with priority 'high', all others with 'normal'.
 *
 * @returns {Promise<void>}
 */
async function configurationProducer() {
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'config-producer-group',
    instanceName: 'config-producer',
  });
  await producer.start();
  console.log('Configuration producer started');

  // Structured values are shipped as JSON strings inside `value`.
  const maintenanceWindow = {
    scheduled: '2025-12-15T02:00:00Z',
    duration: 2, // hours
    affectedServices: ['api', 'web', 'mobile'],
  };
  const passwordPolicy = {
    minLength: 8,
    requireUppercase: true,
    requireLowercase: true,
    requireNumbers: true,
    requireSpecialChars: true,
  };

  // Configuration updates to broadcast, in send order.
  const configUpdates = [
    {
      id: 'cfg001',
      type: 'feature_flag',
      key: 'new_dashboard_enabled',
      value: 'true',
      version: '1.0.0',
      description: 'Enable new dashboard UI for all users',
    },
    {
      id: 'cfg002',
      type: 'maintenance',
      key: 'system_maintenance',
      value: JSON.stringify(maintenanceWindow),
      version: '1.0.0',
      description: 'Scheduled system maintenance window',
    },
    {
      id: 'cfg003',
      type: 'rate_limit',
      key: 'api_rate_limit',
      value: '1000',
      version: '1.1.0',
      description: 'Update API rate limit to 1000 requests per minute',
    },
    {
      id: 'cfg004',
      type: 'security',
      key: 'password_policy',
      value: JSON.stringify(passwordPolicy),
      version: '2.0.0',
      description: 'Enhanced password security requirements',
    },
    {
      id: 'cfg005',
      type: 'feature_flag',
      key: 'beta_features_enabled',
      value: 'false',
      version: '1.0.1',
      description: 'Disable beta features temporarily',
    },
  ];

  console.log('Broadcasting configuration updates...');
  for (let position = 0; position < configUpdates.length; position += 1) {
    const update = configUpdates[position];
    const { id, type, key, version, description } = update;
    const payload = {
      ...update,
      timestamp: new Date().toISOString(),
      publisher: 'config-service',
    };
    const priority = type === 'security' ? 'high' : 'normal';
    const receipt = await producer.send({
      topic: 'configuration-updates',
      tag: type,
      body: Buffer.from(JSON.stringify(payload)),
      keys: [id, key],
      properties: {
        configId: id,
        configType: type,
        configVersion: version,
        priority,
      },
    });
    console.log(`📡 Broadcast: ${type.padEnd(15)} ${key.padEnd(25)} -> v${version}`);
    console.log(` ${description}`);
    console.log(` Message ID: ${receipt.messageId}`);
    console.log('---');
  }

  await producer.shutdown();
  console.log('Configuration producer stopped');
}
// Service instance that receives configuration broadcasts
/**
 * A simulated service instance that receives configuration updates from the
 * 'configuration-updates' topic in BROADCASTING mode and keeps the latest
 * value per configuration key in memory.
 */
class ServiceInstance {
  /**
   * @param {string} name - Service name, used in logs and the instance name.
   * @param {string} instanceId - Identifier of this instance within the service.
   */
  constructor(name, instanceId) {
    this.name = name;
    this.instanceId = instanceId;
    // key -> { value, version, publishedAt, updatedAt }
    this.config = new Map();
    this.startTime = new Date();
  }

  /**
   * Creates and starts this instance's consumer and routes every message to
   * handleConfigurationUpdate.
   *
   * @returns {Promise<object>} The started consumer.
   */
  async startConsumer() {
    this.consumer = new Consumer({
      endpoints: 'localhost:9876',
      consumerGroup: 'config-consumer-group', // Same group for broadcast
      topic: 'configuration-updates',
      tag: '*',
      messageModel: MessageModel.BROADCASTING, // Key for broadcast consumption
      instanceName: `${this.name}-${this.instanceId}`,
    });
    await this.consumer.start();
    console.log(`[${this.name}-${this.instanceId}] Consumer started`);
    this.consumer.subscribe((msg, ack) => {
      this.handleConfigurationUpdate(msg, ack);
    });
    return this.consumer;
  }

  /**
   * Parses one configuration update, logs it, stores it under its key and
   * applies it. The message is acknowledged whether or not handling succeeds.
   *
   * @param {{body: Buffer}} msg - Message whose body is the JSON-encoded update.
   * @param {Function} ack - Acknowledgement callback.
   */
  handleConfigurationUpdate(msg, ack) {
    try {
      const configUpdate = JSON.parse(msg.body.toString());
      const receivedTime = new Date();
      console.log(`\n📥 [${this.name}-${this.instanceId}] Configuration Update Received`);
      console.log(` Config ID: ${configUpdate.id}`);
      console.log(` Type: ${configUpdate.type}`);
      console.log(` Key: ${configUpdate.key}`);
      console.log(` New Value: ${configUpdate.value}`);
      console.log(` Version: ${configUpdate.version}`);
      console.log(` Description: ${configUpdate.description}`);
      console.log(` Published: ${new Date(configUpdate.timestamp).toLocaleTimeString()}`);
      console.log(` Received: ${receivedTime.toLocaleTimeString()}`);
      console.log(` Latency: ${(receivedTime.getTime() - new Date(configUpdate.timestamp).getTime())}ms`);

      // Apply configuration update
      const previousValue = this.config.get(configUpdate.key);
      this.config.set(configUpdate.key, {
        value: configUpdate.value,
        version: configUpdate.version,
        // Distinct keys for the two timestamps: when the update was published
        // and when this instance stored it.
        publishedAt: configUpdate.timestamp,
        updatedAt: receivedTime.toISOString(),
      });
      console.log(` Previous value: ${previousValue ? previousValue.value : 'NOT_SET'}`);
      console.log(` Status: ${this.applyConfiguration(configUpdate)}`);
      // Show current configuration count
      console.log(` Total configs: ${this.config.size}`);
      console.log('---');
      ack();
    } catch (err) {
      console.error(`[${this.name}-${this.instanceId}] Error handling config update:`, err);
      ack();
    }
  }

  /**
   * Dispatches an update to the handler for its type.
   *
   * @param {{type: string, key: string, value: string}} configUpdate
   * @returns {'SUCCESS'|'ERROR'} 'ERROR' when applying the update throws
   *   (for example when a JSON-encoded value cannot be parsed).
   */
  applyConfiguration(configUpdate) {
    try {
      switch (configUpdate.type) {
        case 'feature_flag': {
          const flagEnabled = configUpdate.value === 'true';
          console.log(` → Feature flag '${configUpdate.key}' ${flagEnabled ? 'ENABLED' : 'DISABLED'}`);
          this.updateFeatureFlag(configUpdate.key, flagEnabled);
          break;
        }
        case 'rate_limit':
          console.log(` → Rate limit updated to ${configUpdate.value} requests/min`);
          this.updateRateLimit(Number.parseInt(configUpdate.value, 10));
          break;
        case 'security':
          console.log(` → Security policy updated for '${configUpdate.key}'`);
          this.updateSecurityPolicy(configUpdate.key, JSON.parse(configUpdate.value));
          break;
        case 'maintenance': {
          const maintenance = JSON.parse(configUpdate.value);
          console.log(` → Maintenance scheduled: ${maintenance.scheduled}`);
          this.scheduleMaintenance(maintenance);
          break;
        }
        default:
          console.log(` → Generic configuration applied`);
      }
      return 'SUCCESS';
    } catch (err) {
      console.error(` → Failed to apply configuration: ${err.message}`);
      return 'ERROR';
    }
  }

  // The four methods below only log; a real service would act on the change.

  /** Logs the new state of a feature flag. */
  updateFeatureFlag(key, enabled) {
    console.log(` [${this.name}] Feature '${key}' is now ${enabled ? 'ACTIVE' : 'INACTIVE'}`);
  }

  /** Logs the new rate limit. */
  updateRateLimit(limit) {
    console.log(` [${this.name}] Rate limit set to ${limit}/min`);
  }

  /** Logs that a security policy changed; `policy` is not inspected. */
  updateSecurityPolicy(key, policy) {
    console.log(` [${this.name}] Security policy '${key}' updated`);
  }

  /** Logs that a maintenance window is coming; `maintenance` is not inspected. */
  scheduleMaintenance(maintenance) {
    console.log(` [${this.name}] Preparing for maintenance window`);
  }

  /**
   * Shuts the consumer down if one was started; otherwise does nothing.
   *
   * @returns {Promise<void>}
   */
  async shutdown() {
    if (this.consumer) {
      await this.consumer.shutdown();
      console.log(`[${this.name}-${this.instanceId}] Consumer stopped`);
    }
  }

  /** Prints uptime, the number of stored configurations and their versions. */
  printStatus() {
    console.log(`\n📊 [${this.name}-${this.instanceId}] Status:`);
    console.log(` Uptime: ${Math.round((new Date() - this.startTime) / 1000)}s`);
    console.log(` Configurations: ${this.config.size}`);
    if (this.config.size > 0) {
      console.log(' Active configs:');
      for (const [key, config] of this.config.entries()) {
        console.log(` - ${key}: v${config.version}`);
      }
    }
  }
}
// Service health monitor
/**
 * Schedules a status report for every given service every 8 seconds and
 * schedules the reporting to stop after 30 seconds. Returns as soon as both
 * timers are set; it does not wait for them.
 *
 * @param {Array<{printStatus: Function}>} services - Services to report on.
 * @returns {Promise<void>}
 */
async function serviceHealthMonitor(services) {
  console.log('\n=== Service Health Monitor ===');

  const divider = '='.repeat(50);
  const reportAll = () => {
    console.log(`\n${divider}`);
    console.log(`Health Check - ${new Date().toLocaleTimeString()}`);
    console.log(divider);
    services.forEach((svc) => {
      svc.printStatus();
    });
  };
  const reportTimer = setInterval(reportAll, 8000);

  // Stop monitoring after 30 seconds.
  const stopMonitoring = () => {
    clearInterval(reportTimer);
    console.log('\nHealth monitoring stopped');
  };
  setTimeout(stopMonitoring, 30000);
}
// Demo cluster consumption vs broadcast consumption
/**
 * Compares the two message models on the 'pattern-demo' topic: two CLUSTERING
 * consumers sharing one group and two BROADCASTING consumers sharing another.
 * Sends five messages and prints how many each side received.
 *
 * The cluster total counts messages handled by BOTH cluster consumers.
 * Every client that was started is shut down even when a step fails.
 *
 * @returns {Promise<void>} Rejects with the first start/send/shutdown error.
 */
async function demoConsumptionPatterns() {
  console.log('\n=== Consumption Patterns Demo ===');

  // All four consumers differ only in group, message model and instance name.
  const buildConsumer = (consumerGroup, messageModel, instanceName) => new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup,
    topic: 'pattern-demo',
    tag: '*',
    messageModel,
    instanceName,
  });

  // Cluster consumers (load balancing)
  const clusterConsumer1 = buildConsumer('cluster-consumer-group', MessageModel.CLUSTERING, 'cluster-consumer-1');
  const clusterConsumer2 = buildConsumer('cluster-consumer-group', MessageModel.CLUSTERING, 'cluster-consumer-2');
  // Broadcast consumers (all get all messages)
  const broadcastConsumer1 = buildConsumer('broadcast-consumer-group', MessageModel.BROADCASTING, 'broadcast-consumer-1');
  const broadcastConsumer2 = buildConsumer('broadcast-consumer-group', MessageModel.BROADCASTING, 'broadcast-consumer-2');

  // Clients whose start() succeeded; only these are shut down in `finally`.
  const startedConsumers = [];
  let startedProducer = null;
  try {
    for (const consumer of [clusterConsumer1, clusterConsumer2, broadcastConsumer1, broadcastConsumer2]) {
      await consumer.start();
      startedConsumers.push(consumer);
    }

    // Message counters: one shared by the cluster group, one per broadcast consumer.
    let clusterMessageCount = 0;
    let broadcast1MessageCount = 0;
    let broadcast2MessageCount = 0;

    clusterConsumer1.subscribe((msg, ack) => {
      clusterMessageCount++;
      console.log(`[CLUSTER-1] Got message ${clusterMessageCount}: ${msg.body.toString()}`);
      ack();
    });
    clusterConsumer2.subscribe((msg, ack) => {
      // Count here too, otherwise the reported cluster total misses this consumer's share.
      clusterMessageCount++;
      console.log(`[CLUSTER-2] Got message ${clusterMessageCount}: ${msg.body.toString()}`);
      ack();
    });
    broadcastConsumer1.subscribe((msg, ack) => {
      broadcast1MessageCount++;
      console.log(`[BROADCAST-1] Got message ${broadcast1MessageCount}: ${msg.body.toString()}`);
      ack();
    });
    broadcastConsumer2.subscribe((msg, ack) => {
      broadcast2MessageCount++;
      console.log(`[BROADCAST-2] Got message ${broadcast2MessageCount}: ${msg.body.toString()}`);
      ack();
    });

    // Producer for demo messages
    const producer = new Producer({
      endpoints: 'localhost:9876',
      producerGroup: 'pattern-demo-producer',
    });
    await producer.start();
    startedProducer = producer;

    // Send messages
    for (let i = 1; i <= 5; i++) {
      await producer.send({
        topic: 'pattern-demo',
        body: Buffer.from(`Demo message ${i}`),
      });
      await new Promise((resolve) => setTimeout(resolve, 500));
    }

    // Wait for processing
    await new Promise((resolve) => setTimeout(resolve, 3000));

    console.log('\n📈 Consumption Pattern Results:');
    console.log(`Cluster consumers: ${clusterMessageCount} total messages (load balanced)`);
    console.log(`Broadcast consumer 1: ${broadcast1MessageCount} messages (all received)`);
    console.log(`Broadcast consumer 2: ${broadcast2MessageCount} messages (all received)`);
  } finally {
    // Cleanup: producer first, then the consumers in start order.
    if (startedProducer) {
      await startedProducer.shutdown();
    }
    for (const consumer of startedConsumers) {
      await consumer.shutdown();
    }
  }
}
// Main function
/**
 * Orchestrates the broadcast-consumption example: creates five service
 * instances, starts their consumers, starts the health monitor, broadcasts the
 * configuration updates, runs the consumption-pattern demo, waits, then shuts
 * every service down.
 *
 * Services are shut down even when a step fails. Any error is logged here
 * rather than rethrown.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== RocketMQ Broadcast Consumption Example ===');
  try {
    // Step 1: Create multiple service instances
    console.log('\n1. Creating service instances...');
    const services = [
      new ServiceInstance('api-service', 'instance-1'),
      new ServiceInstance('api-service', 'instance-2'),
      new ServiceInstance('api-service', 'instance-3'),
      new ServiceInstance('web-service', 'instance-1'),
      new ServiceInstance('web-service', 'instance-2'),
    ];
    try {
      // Step 2: Start consumers for all services
      console.log('\n2. Starting broadcast consumers...');
      await Promise.all(services.map((service) => service.startConsumer()));
      // Step 3: Wait for consumers to be ready
      await new Promise((resolve) => setTimeout(resolve, 2000));
      // Step 4: Start health monitor (awaited so a failure surfaces here)
      console.log('\n3. Starting service health monitor...');
      await serviceHealthMonitor(services);
      // Step 5: Send configuration updates
      console.log('\n4. Broadcasting configuration updates...');
      await configurationProducer();
      // Step 6: Demo consumption patterns
      await new Promise((resolve) => setTimeout(resolve, 3000));
      await demoConsumptionPatterns();
      // Step 7: Wait for all processing to complete
      console.log('\n5. Waiting for processing to complete...');
      await new Promise((resolve) => setTimeout(resolve, 35000));
    } finally {
      // Step 8: Always shut the services down, including after a failure
      console.log('\n6. Shutting down all services...');
      await Promise.all(services.map((service) => service.shutdown()));
    }
    console.log('\nBroadcast consumption example completed!');
    console.log('\nKey Points:');
    console.log('- All instances in the same consumer group received every message');
    console.log('- Each service instance applied configurations independently');
    console.log('- Useful for configuration updates, cache invalidation, and notifications');
    console.log('- Contrast with clustering which load balances messages');
  } catch (err) {
    console.error('Error in broadcast consumption example:', err);
  }
}
// Script entry point: run the example and print any error that escapes it.
main().catch((error) => console.error('Error:', error));
💻 Messages Transactionnels RocketMQ javascript
Implémenter des transactions distribuées en utilisant la fonction de messages transactionnels de RocketMQ pour la cohérence éventuelle
// RocketMQ Transactional Messages Example
// Demonstrates distributed transactions with local and remote operations
const { Producer, Consumer, MessageModel, TransactionMQProducer } = require('apache-rocketmq');
// Simulated database for orders
/**
 * In-memory stand-in for a database: three Maps (orders, payments,
 * inventories) with short artificial delays on the write operations.
 */
class OrderDatabase {
  constructor() {
    this.orders = new Map(); // orderId -> copy of the order plus `status`
    this.payments = new Map(); // paymentId -> copy of the payment plus `status`
    this.inventories = new Map(); // productId -> the update object as given
  }

  /**
   * Stores a copy of the order with status 'created' after a 100 ms pause.
   * @param {{orderId: string}} order
   * @returns {Promise<boolean>} Always true.
   */
  async createOrder(order) {
    console.log(`[DB] Creating order: ${order.orderId}`);
    await new Promise((done) => setTimeout(done, 100));
    const record = { ...order, status: 'created' };
    this.orders.set(order.orderId, record);
    return true;
  }

  /**
   * Simulates payment processing after a 200 ms pause. Fails (returns false,
   * stores nothing) when Math.random() yields a value below 0.2.
   * @param {{paymentId: string}} payment
   * @returns {Promise<boolean>} True when the payment was stored as 'completed'.
   */
  async processPayment(payment) {
    console.log(`[DB] Processing payment: ${payment.paymentId}`);
    await new Promise((done) => setTimeout(done, 200));
    const declined = Math.random() < 0.2;
    if (declined) {
      console.log(`[DB] Payment failed: ${payment.paymentId}`);
      return false;
    }
    const record = { ...payment, status: 'completed' };
    this.payments.set(payment.paymentId, record);
    return true;
  }

  /**
   * Stores the given update object under its productId after a 50 ms pause.
   * @param {{productId: string}} inventoryUpdate
   * @returns {Promise<boolean>} Always true.
   */
  async updateInventory(inventoryUpdate) {
    const { productId } = inventoryUpdate;
    console.log(`[DB] Updating inventory for: ${productId}`);
    await new Promise((done) => setTimeout(done, 50));
    this.inventories.set(productId, inventoryUpdate);
    return true;
  }

  /**
   * Marks a stored order as 'cancelled'; unknown order ids are ignored.
   * @param {string} orderId
   * @returns {Promise<void>}
   */
  async rollbackOrder(orderId) {
    console.log(`[DB] Rolling back order: ${orderId}`);
    const existing = this.orders.get(orderId);
    if (!existing) {
      return;
    }
    existing.status = 'cancelled';
    this.orders.set(orderId, existing);
  }

  /**
   * @param {string} orderId
   * @returns {string} The stored order's status, or 'not_found'.
   */
  getOrderStatus(orderId) {
    const existing = this.orders.get(orderId);
    if (!existing) {
      return 'not_found';
    }
    return existing.status;
  }
}
// Transactional producer for order processing
/**
 * Creates and starts the transactional producer. Its check listener resolves
 * a pending transaction from the order status reported by `global.orderDB`:
 * 'created' or 'processing' -> COMMIT, 'cancelled' -> ROLLBACK, anything else
 * (including a failure inside the listener) -> UNKNOWN.
 *
 * @returns {Promise<object>} The started producer.
 */
async function createTransactionalProducer() {
  // Transaction check listener, invoked with the message under check.
  const resolveTransaction = async (msg) => {
    try {
      const transactionId = msg.properties.transactionId;
      const orderData = JSON.parse(msg.body.toString());
      const { orderId } = orderData;
      console.log(`[TRANSACTION CHECK] Checking transaction ${transactionId} for order ${orderId}`);

      // Look up the local transaction outcome; a real application would query its database.
      const orderStatus = global.orderDB.getOrderStatus(orderId);
      console.log(`[TRANSACTION CHECK] Order ${orderId} status: ${orderStatus}`);

      switch (orderStatus) {
        case 'created':
        case 'processing':
          return { state: 'COMMIT' };
        case 'cancelled':
          return { state: 'ROLLBACK' };
        default:
          return { state: 'UNKNOWN' };
      }
    } catch (err) {
      console.error('Transaction check error:', err);
      return { state: 'UNKNOWN' };
    }
  };

  const producer = new TransactionMQProducer({
    endpoints: 'localhost:9876',
    producerGroup: 'order-transaction-producer',
    instanceName: 'order-transaction-producer',
    checkListener: resolveTransaction,
  });
  await producer.start();
  console.log('Transactional producer started');
  return producer;
}
/**
 * Order workflow built on transactional ("half") messages.
 *
 * For each order the service sends a half message, runs the local database
 * work, and then tells the producer to commit or roll back the message
 * depending on how the local work ended.
 */
class OrderService {
  /**
   * @param {object} producer - started producer exposing
   *   `sendTransactionMessage`, `commitTransaction` and `rollbackTransaction`.
   * @param {object} database - store exposing `createOrder`, `processPayment`,
   *   `updateInventory`, `rollbackOrder` and an `orders` Map.
   */
  constructor(producer, database) {
    this.producer = producer;
    this.db = database;
  }

  /**
   * Processes one order as a message transaction.
   *
   * @param {{orderId: string, amount: number, items: Array<{productId: string, quantity: number}>}} orderRequest
   * @returns {Promise<{success: boolean, transactionId: string}>} `success`
   *   is true when the local work succeeded and the message was committed,
   *   false when the local work failed and the message was rolled back.
   * @throws whatever the producer throws while sending, committing or
   *   rolling back, or whatever escapes the local transaction.
   */
  async processOrder(orderRequest) {
    // slice(2, 11) keeps up to nine base-36 digits after the leading "0.".
    const transactionId = `tx_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
    console.log(`\n🔄 Starting transaction ${transactionId} for order ${orderRequest.orderId}`);

    // Step 1: Send half message (prepared message)
    const halfMessage = {
      topic: 'order-events',
      tag: 'order.created',
      body: Buffer.from(JSON.stringify({
        ...orderRequest,
        transactionId,
        timestamp: new Date().toISOString(),
      })),
      keys: [orderRequest.orderId, transactionId],
      properties: {
        transactionId,
        orderId: orderRequest.orderId,
      },
    };
    console.log(`[1/4] Sending half message for order ${orderRequest.orderId}`);
    const sendResult = await this.producer.sendTransactionMessage(halfMessage);

    // Step 2: Execute local transaction
    let localTxSuccess;
    try {
      console.log(`[2/4] Executing local transaction for order ${orderRequest.orderId}`);
      localTxSuccess = await this.executeLocalTransaction(orderRequest, transactionId);
    } catch (err) {
      // The local work blew up without reporting an outcome: withdraw the
      // message, but never let a rollback failure hide the original error.
      console.error(`Local transaction failed for ${transactionId}:`, err);
      console.log(`[3/4] Rolling back transaction ${transactionId} due to error`);
      try {
        await this.producer.rollbackTransaction(sendResult.transactionId);
      } catch (rollbackErr) {
        console.error(`Failed to roll back transaction ${transactionId}:`, rollbackErr);
      }
      throw err;
    }

    if (localTxSuccess) {
      // Step 3: Commit transaction
      console.log(`[3/4] Committing transaction ${transactionId}`);
      try {
        await this.producer.commitTransaction(sendResult.transactionId);
      } catch (err) {
        // The order is already completed locally, so rolling the message back
        // here would contradict the database. Leave the half message pending
        // and surface the error to the caller.
        console.error(`Commit failed for ${transactionId}; message left pending:`, err);
        throw err;
      }
      console.log(`[4/4] ✅ Transaction ${transactionId} COMMITTED for order ${orderRequest.orderId}`);
      return { success: true, transactionId };
    }

    // Step 4: Rollback transaction
    console.log(`[3/4] Rolling back transaction ${transactionId}`);
    await this.producer.rollbackTransaction(sendResult.transactionId);
    console.log(`[4/4] ❌ Transaction ${transactionId} ROLLED BACK for order ${orderRequest.orderId}`);
    return { success: false, transactionId };
  }

  /**
   * Runs the local side of the transaction: create the order, take payment,
   * update inventory for every item, then mark the order 'completed'.
   * On any failed step or thrown error the order is rolled back through
   * `db.rollbackOrder` and false is returned.
   *
   * @param {object} orderRequest - same object passed to processOrder
   * @param {string} transactionId - id attached to every record written
   * @returns {Promise<boolean>} true only when every step succeeded
   */
  async executeLocalTransaction(orderRequest, transactionId) {
    const { orderId } = orderRequest;
    try {
      // Create order in local database
      const orderSuccess = await this.db.createOrder({
        ...orderRequest,
        status: 'processing',
        transactionId,
      });
      if (!orderSuccess) {
        await this.db.rollbackOrder(orderId);
        return false;
      }

      // Process payment
      const paymentSuccess = await this.db.processPayment({
        paymentId: `pay_${orderId}`,
        orderId,
        amount: orderRequest.amount,
        transactionId,
      });
      if (!paymentSuccess) {
        await this.db.rollbackOrder(orderId);
        return false;
      }

      // Update inventory, one item at a time so a failure stops the rest.
      for (const item of orderRequest.items) {
        const inventorySuccess = await this.db.updateInventory({
          productId: item.productId,
          quantity: -item.quantity, // Decrease inventory
          orderId,
          transactionId,
        });
        if (!inventorySuccess) {
          await this.db.rollbackOrder(orderId);
          return false;
        }
      }

      // Mark order as completed
      const order = this.db.orders.get(orderId);
      order.status = 'completed';
      this.db.orders.set(orderId, order);
      console.log(`[LOCAL TX] All operations completed for order ${orderId}`);
      return true;
    } catch (err) {
      console.error(`[LOCAL TX] Error executing transaction for order ${orderId}:`, err);
      await this.db.rollbackOrder(orderId);
      return false;
    }
  }
}
/**
 * Starts a consumer on the 'order-events' topic (all tags, broadcasting
 * model) and attaches a listener that logs each order event.
 *
 * The listener acknowledges every message, including ones whose body cannot
 * be parsed or processed; those are reported through console.error.
 *
 * @returns {Promise<object>} the started Consumer with its listener attached
 */
async function orderEventConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'order-event-consumer',
    topic: 'order-events',
    tag: '*',
    messageModel: MessageModel.BROADCASTING, // All consumers get all messages
    instanceName: 'order-event-consumer',
  });
  await consumer.start();
  console.log('Order event consumer started');

  // Logs one order event and acknowledges it.
  const handleOrderEvent = (msg, ack) => {
    try {
      const event = JSON.parse(msg.body.toString());
      const receivedAt = new Date();

      console.log('\n📦 ORDER EVENT RECEIVED');
      console.log(` Order ID: ${event.orderId}`);
      console.log(` Transaction ID: ${event.transactionId}`);
      console.log(` Amount: $${event.amount}`);

      const productIds = event.items.map((item) => item.productId);
      console.log(` Items: ${productIds.join(', ')}`);

      const createdAt = new Date(event.timestamp);
      console.log(` Created At: ${createdAt.toLocaleTimeString()}`);
      console.log(` Received At: ${receivedAt.toLocaleTimeString()}`);

      const delayMs = receivedAt.getTime() - createdAt.getTime();
      console.log(` Processing Delay: ${delayMs}ms`);

      // Placeholder for real downstream work (search indexes,
      // notifications, analytics, other services).
      console.log(' → Processing downstream services...');
      setTimeout(() => {
        console.log(` → Downstream processing completed for ${event.orderId}`);
      }, 100);

      console.log('---');
      ack();
    } catch (err) {
      console.error('Error processing order event:', err);
      ack();
    }
  };

  consumer.subscribe(handleOrderEvent);
  return consumer;
}
/**
 * Starts a clustering consumer on 'order-events' filtered to the
 * 'order.created' tag and attaches a listener that simulates payment
 * verification.
 *
 * The outcome is random: 'VERIFIED' when Math.random() > 0.1, otherwise
 * 'REQUIRES_REVIEW'. Every message is acknowledged, including ones that
 * fail to parse (those are reported through console.error).
 *
 * @returns {Promise<object>} the started Consumer with its listener attached
 */
async function paymentVerificationConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'payment-verification-consumer',
    topic: 'order-events',
    tag: 'order.created',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'payment-verification-consumer',
  });
  await consumer.start();
  console.log('Payment verification consumer started');

  // Simulates verifying the payment behind one order event.
  const verifyPayment = (msg, ack) => {
    try {
      const event = JSON.parse(msg.body.toString());
      console.log(`\n💳 Payment verification for order ${event.orderId}`);

      // Stand-in for a gateway check, fraud screening and customer
      // confirmation in a real system.
      const verified = Math.random() > 0.1;
      const paymentStatus = verified ? 'VERIFIED' : 'REQUIRES_REVIEW';
      console.log(` → Payment status: ${paymentStatus}`);
      if (!verified) {
        console.log(` → Order ${event.orderId} flagged for manual review`);
      }
      ack();
    } catch (err) {
      console.error('Error in payment verification:', err);
      ack();
    }
  };

  consumer.subscribe(verifyPayment);
  return consumer;
}
/**
 * Runs the transactional-messaging demo end to end: starts the producer and
 * both consumers, processes three sample orders with pauses in between,
 * waits for the consumers to drain, then shuts everything down.
 *
 * Errors are logged, not rethrown. Whatever was started is always shut down,
 * even when a later step fails.
 *
 * Side effect: sets `global.orderDB`, which the producer's transaction check
 * listener reads.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== RocketMQ Transactional Messages Example ===');
  // Initialize database
  global.orderDB = new OrderDatabase();

  // Clients started so far, in the order they are shut down.
  const startedClients = [];
  // Shuts down and forgets every started client. Safe to call twice: the
  // second call finds nothing left. One failing shutdown does not prevent
  // the remaining clients from being closed.
  const shutdownStartedClients = async () => {
    while (startedClients.length > 0) {
      const client = startedClients.shift();
      try {
        await client.shutdown();
      } catch (err) {
        console.error('Error during shutdown:', err);
      }
    }
  };

  try {
    // Step 1: Create transactional producer
    console.log('\n1. Setting up transactional producer...');
    const producer = await createTransactionalProducer();
    startedClients.push(producer);

    // Step 2: Start consumers. Each start is awaited so a failure surfaces
    // here instead of as an unhandled rejection later on.
    console.log('\n2. Starting consumers...');
    startedClients.push(await orderEventConsumer());
    startedClients.push(await paymentVerificationConsumer());

    // Step 3: Wait for consumers to be ready
    await new Promise((resolve) => setTimeout(resolve, 2000));

    // Step 4: Create order service
    console.log('\n3. Creating order service...');
    const orderService = new OrderService(producer, global.orderDB);

    // Step 5: Process multiple orders
    console.log('\n4. Processing orders...');
    const orders = [
      {
        orderId: 'TX-ORDER-001',
        userId: 'user-tx-001',
        amount: 299.99,
        items: [
          { productId: 'LAPTOP-001', quantity: 1, price: 199.99 },
          { productId: 'MOUSE-001', quantity: 1, price: 49.99 },
          { productId: 'KEYBOARD-001', quantity: 1, price: 50.01 },
        ],
      },
      {
        orderId: 'TX-ORDER-002',
        userId: 'user-tx-002',
        amount: 149.99,
        items: [
          { productId: 'MONITOR-001', quantity: 1, price: 149.99 },
        ],
      },
      {
        orderId: 'TX-ORDER-003',
        userId: 'user-tx-003',
        amount: 89.97,
        items: [
          { productId: 'HEADPHONE-001', quantity: 2, price: 44.99 },
        ],
      },
    ];

    // Process orders with delays
    for (const [index, order] of orders.entries()) {
      console.log(`\n--- Processing Order ${index + 1}/${orders.length} ---`);
      const result = await orderService.processOrder(order);
      if (result.success) {
        console.log(`✅ Order ${order.orderId} processed successfully`);
      } else {
        console.log(`❌ Order ${order.orderId} processing failed`);
      }
      // Wait between orders
      if (index < orders.length - 1) {
        await new Promise((resolve) => setTimeout(resolve, 3000));
      }
    }

    // Step 6: Wait for message processing
    console.log('\n5. Waiting for message processing...');
    await new Promise((resolve) => setTimeout(resolve, 10000));

    // Step 7: Shutdown
    console.log('\n6. Shutting down...');
    await shutdownStartedClients();
    console.log('\nTransactional messages example completed!');
    console.log('\nSummary:');
    console.log('- Demonstrated distributed transactions with local DB ops and remote messaging');
    console.log('- Transaction consistency maintained even with failures');
    console.log('- Downstream services only receive committed transactions');
  } catch (err) {
    console.error('Error in transactional messages example:', err);
  } finally {
    // Covers every failure path; a no-op after a normal run.
    await shutdownStartedClients();
  }
}
// Script entry point: run the demo and report any rejection that escapes main().
main().then(undefined, (failure) => {
  console.error('Error:', failure);
});