Exemples RocketMQ Message Queue

Exemples Apache RocketMQ incluant producteurs, consommateurs, messages ordonnés, messages retardés, consommation de diffusion et messages transactionnels pour messagerie haute performance

Key Facts

Category
Messaging
Items
5
Format Families
sample

Sample Overview

Exemples Apache RocketMQ incluant producteurs, consommateurs, messages ordonnés, messages retardés, consommation de diffusion et messages transactionnels pour messagerie haute performance This sample set belongs to Messaging and can be used to test related workflows inside Elysia Tools.

💻 RocketMQ Hello World - Producteur/Consommateur de Base javascript

🟢 simple

Exemple simple de producteur et consommateur Apache RocketMQ démontrant des opérations de base de file de messages

⏱️ 10 min 🏷️ rocketmq, messaging, beginner
Prerequisites: RocketMQ server running, Node.js
// RocketMQ Hello World - Basic Producer/Consumer Example
// Install: npm install apache-rocketmq

const { Producer, Consumer, MessageModel } = require('apache-rocketmq');

async function helloWorld() {
  // RocketMQ configuration
  const config = {
    endpoints: 'localhost:9876',
    accessKey: 'rocketmq',
    secretKey: '12345678',
    instanceName: 'hello-world-example',
  };

  console.log('=== RocketMQ Hello World Example ===');

  // Create consumer
  const consumer = new Consumer({
    ...config,
    consumerGroup: 'hello-consumer-group',
    topic: 'hello-topic',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
  });

  // Create producer
  const producer = new Producer({
    ...config,
    producerGroup: 'hello-producer-group',
  });

  // Start consumer
  console.log('Starting consumer...');
  await consumer.start();

  // Set up message listener
  consumer.subscribe((msg, ack) => {
    console.log(`Received message: ${msg.body.toString()}`);
    console.log(`Message ID: ${msg.id}`);
    console.log(`Topic: ${msg.topic}`);
    console.log(`Tag: ${msg.tag}`);
    console.log('---');

    // Acknowledge message
    ack();
  });

  // Start producer
  console.log('Starting producer...');
  await producer.start();

  // Wait a moment for consumer to be ready
  await new Promise(resolve => setTimeout(resolve, 2000));

  // Send messages
  console.log('Sending messages...');
  const messages = [
    'Hello RocketMQ!',
    'Apache RocketMQ is powerful!',
    'Message queue made easy!',
    'High performance messaging!',
  ];

  for (let i = 0; i < messages.length; i++) {
    const message = {
      topic: 'hello-topic',
      tag: 'hello',
      body: Buffer.from(messages[i]),
      keys: ['key-' + (i + 1)],
      properties: {
        source: 'hello-world-example',
        timestamp: new Date().toISOString(),
      },
    };

    const result = await producer.send(message);
    console.log(`Message sent: ${messages[i]}`);
    console.log(`Message ID: ${result.messageId}`);
    console.log('---');
  }

  // Wait for messages to be consumed
  console.log('Waiting for messages to be consumed...');
  await new Promise(resolve => setTimeout(resolve, 5000));

  // Shutdown
  console.log('Shutting down...');
  await producer.shutdown();
  await consumer.shutdown();

  console.log('RocketMQ Hello World example completed!');
}

helloWorld().catch(err => {
  console.error('Error:', err);
});

💻 Messages Ordonnés RocketMQ javascript

🟡 intermediate ⭐⭐⭐

Implémenter la livraison de messages ordonnés en utilisant des clés de partitionnement avec des files de messages

⏱️ 20 min 🏷️ rocketmq, ordered, sequencing
Prerequisites: RocketMQ server running, Node.js, RocketMQ basics
// RocketMQ Ordered Messages Example
// Ensures message order within the same sharding key

const { Producer, Consumer, MessageModel } = require('apache-rocketmq');

// Order event producer
async function orderEventProducer() {
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'order-producer-group',
    instanceName: 'order-event-producer',
  });

  await producer.start();
  console.log('Order event producer started');

  // Simulate order events for multiple users
  const orders = [
    // User 1 - Order lifecycle (should be processed in order)
    { userId: 'user001', orderId: 'ORD001', event: 'order.created', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'payment.processed', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'order.confirmed', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'order.shipped', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'order.delivered', amount: 99.99 },

    // User 2 - Order lifecycle (should be processed in order)
    { userId: 'user002', orderId: 'ORD002', event: 'order.created', amount: 149.99 },
    { userId: 'user002', orderId: 'ORD002', event: 'payment.processed', amount: 149.99 },
    { userId: 'user002', orderId: 'ORD002', event: 'order.confirmed', amount: 149.99 },

    // User 3 - Order lifecycle (should be processed in order)
    { userId: 'user003', orderId: 'ORD003', event: 'order.created', amount: 79.99 },
    { userId: 'user003', orderId: 'ORD003', event: 'payment.failed', amount: 79.99 },
    { userId: 'user003', orderId: 'ORD003', event: 'order.cancelled', amount: 79.99 },

    // More events for user 1 and 2
    { userId: 'user001', orderId: 'ORD004', event: 'order.created', amount: 199.99 },
    { userId: 'user002', orderId: 'ORD005', event: 'order.created', amount: 59.99 },
  ];

  console.log('Sending order events...');
  for (let i = 0; i < orders.length; i++) {
    const order = orders[i];

    const message = {
      topic: 'order-events',
      tag: order.event,
      body: Buffer.from(JSON.stringify({
        ...order,
        timestamp: new Date().toISOString(),
        sequence: i + 1,
      })),
      // Use userId as sharding key to ensure order for each user
      shardingKey: order.userId,
      keys: [order.orderId, order.userId],
      properties: {
        eventType: order.event,
        userId: order.userId,
        orderId: order.orderId,
      },
    };

    const result = await producer.send(message);
    console.log(`[${order.userId}] ${order.event}: ${order.orderId} (Message ID: ${result.messageId})`);

    // Small delay to simulate real-world scenario
    await new Promise(resolve => setTimeout(resolve, 100));
  }

  await producer.shutdown();
  console.log('Order event producer stopped');
}

// Order event consumer with ordered processing
async function orderEventConsumer(consumerId) {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: `order-consumer-group-${consumerId}`,
    topic: 'order-events',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: `order-consumer-${consumerId}`,
    // Enable ordered message consumption
    orderly: true,
  });

  await consumer.start();
  console.log(`Order event consumer ${consumerId} started`);

  // Track order states for each user
  const orderStates = new Map();

  consumer.subscribe((msg, ack) => {
    try {
      const orderEvent = JSON.parse(msg.body.toString());

      console.log(`[CONSUMER ${consumerId}] Processing: ${orderEvent.userId} -> ${orderEvent.event}`);

      // Validate order sequence for each user
      const userState = orderStates.get(orderEvent.userId) || { lastEvent: null, expectedEvent: 'order.created' };

      // Simple order state validation
      const validTransitions = {
        'order.created': ['payment.processed', 'payment.failed'],
        'payment.processed': ['order.confirmed'],
        'payment.failed': ['order.cancelled'],
        'order.confirmed': ['order.shipped'],
        'order.shipped': ['order.delivered'],
        'order.cancelled': ['order.created'], // Can create new order after cancellation
        'order.delivered': ['order.created'], // Can create new order after delivery
      };

      if (userState.lastEvent) {
        const allowedNextEvents = validTransitions[userState.lastEvent] || [];
        if (!allowedNextEvents.includes(orderEvent.event)) {
          console.log(`[CONSUMER ${consumerId}] WARNING: Invalid order transition for ${orderEvent.userId}: ${userState.lastEvent} -> ${orderEvent.event}`);
        }
      }

      // Update order state
      orderStates.set(orderEvent.userId, {
        lastEvent: orderEvent.event,
        lastOrderId: orderEvent.orderId,
      });

      console.log(`[CONSUMER ${consumerId}] Processed: ${orderEvent.userId} -> ${orderEvent.event} -> ${orderEvent.orderId}`);
      console.log(`[CONSUMER ${consumerId}] Current state for ${orderEvent.userId}: ${orderStates.get(orderEvent.userId).lastEvent}`);
      console.log('---');

      // Acknowledge message
      ack();
    } catch (err) {
      console.error(`[CONSUMER ${consumerId}] Error processing message:`, err);
      ack(); // Acknowledge even on error to avoid blocking the queue
    }
  });

  return consumer;
}

// Demo ordered vs unordered messages
async function demoOrderingComparison() {
  console.log('\n=== Ordered vs Unordered Comparison ===');

  // Unordered producer (random order)
  const unorderedProducer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'unordered-producer-group',
    instanceName: 'unordered-producer',
  });

  await unorderedProducer.start();

  console.log('Sending unordered messages...');
  const unorderedMessages = ['A', 'B', 'C', 'D', 'E'];

  for (const msg of unorderedMessages) {
    await unorderedProducer.send({
      topic: 'unordered-test',
      body: Buffer.from(`Unordered message: ${msg}`),
    });
    console.log(`Sent unordered: ${msg}`);
  }

  await unorderedProducer.shutdown();
}

// Main function
async function main() {
  console.log('=== RocketMQ Ordered Messages Example ===');

  try {
    // Step 1: Start multiple consumers to show load balancing
    console.log('\n1. Starting consumers...');
    const consumers = [];
    for (let i = 1; i <= 3; i++) {
      consumers.push(await orderEventConsumer(i));
    }

    // Step 2: Wait for consumers to be ready
    await new Promise(resolve => setTimeout(resolve, 2000));

    // Step 3: Start producer to send ordered messages
    console.log('\n2. Sending ordered messages...');
    const producerPromise = orderEventProducer();

    // Step 4: Wait for message processing
    await producerPromise;
    console.log('\n3. Waiting for message processing...');
    await new Promise(resolve => setTimeout(resolve, 8000));

    // Step 5: Demo comparison
    await demoOrderingComparison();

    // Step 6: Shutdown
    console.log('\n4. Shutting down consumers...');
    for (const consumer of consumers) {
      await consumer.shutdown();
    }

    console.log('Ordered messages example completed!');
  } catch (err) {
    console.error('Error in ordered messages example:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});

💻 Messages Retardés RocketMQ javascript

🟡 intermediate ⭐⭐⭐

Implémenter la livraison de messages retardés et programmés en utilisant la fonction de retard intégrée de RocketMQ

⏱️ 25 min 🏷️ rocketmq, delayed, scheduled, timing
Prerequisites: RocketMQ server running, Node.js, RocketMQ basics
// RocketMQ Delayed Messages Example
// Demonstrates various types of delayed and scheduled message delivery

const { Producer, Consumer, MessageModel } = require('apache-rocketmq');

// Delay levels in RocketMQ (seconds)
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
const DELAY_LEVELS = {
  '1s': 1,
  '5s': 2,
  '10s': 3,
  '30s': 4,
  '1m': 5,
  '2m': 6,
  '3m': 7,
  '4m': 8,
  '5m': 9,
  '6m': 10,
  '7m': 11,
  '8m': 12,
  '9m': 13,
  '10m': 14,
  '20m': 15,
  '30m': 16,
  '1h': 17,
  '2h': 18,
};

// Reminder service producer
async function reminderProducer() {
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'reminder-producer-group',
    instanceName: 'reminder-producer',
  });

  await producer.start();
  console.log('Reminder producer started');

  // Different types of reminders with various delays
  const reminders = [
    {
      id: 'rem001',
      type: 'meeting',
      message: 'Team standup meeting starts now!',
      delay: '10s',
      recipient: '[email protected]',
    },
    {
      id: 'rem002',
      type: 'deadline',
      message: 'Project submission deadline approaching!',
      delay: '1m',
      recipient: '[email protected]',
    },
    {
      id: 'rem003',
      type: 'birthday',
      message: 'Happy birthday! Don't forget to wish them well!',
      delay: '2m',
      recipient: '[email protected]',
    },
    {
      id: 'rem004',
      type: 'maintenance',
      message: 'System maintenance scheduled for tonight',
      delay: '5m',
      recipient: '[email protected]',
    },
    {
      id: 'rem005',
      type: 'follow-up',
      message: 'Follow up with client about proposal',
      delay: '30s',
      recipient: '[email protected]',
    },
  ];

  console.log('Sending delayed reminders...');
  const sendTime = new Date();

  for (const reminder of reminders) {
    const delayLevel = DELAY_LEVELS[reminder.delay];
    if (!delayLevel) {
      console.log(`Warning: Unknown delay level '${reminder.delay}' for reminder ${reminder.id}`);
      continue;
    }

    const message = {
      topic: 'reminder-topic',
      tag: reminder.type,
      body: Buffer.from(JSON.stringify({
        ...reminder,
        scheduledAt: sendTime.toISOString(),
        delayLevel: delayLevel,
        expectedAt: new Date(sendTime.getTime() + (delayLevel * 1000)).toISOString(),
      })),
      keys: [reminder.id],
      properties: {
        reminderId: reminder.id,
        reminderType: reminder.type,
        delay: reminder.delay,
        delayLevel: delayLevel.toString(),
        scheduledAt: sendTime.toISOString(),
      },
      // Set delay level
      delayTimeLevel: delayLevel,
    };

    const result = await producer.send(message);
    console.log(`[${reminder.delay.padEnd(3)}] ${reminder.type.padEnd(12)} -> ${reminder.recipient.padEnd(25)} (ID: ${reminder.id})`);
    console.log(`      Expected at: ${new Date(sendTime.getTime() + (delayLevel * 1000)).toLocaleTimeString()}`);
    console.log(`      Message ID: ${result.messageId}`);
    console.log('---');
  }

  await producer.shutdown();
  console.log('Reminder producer stopped');
}

// Notification service for handling reminders
async function reminderConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'reminder-consumer-group',
    topic: 'reminder-topic',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'reminder-consumer',
  });

  await consumer.start();
  console.log('Reminder consumer started');

  consumer.subscribe((msg, ack) => {
    try {
      const reminder = JSON.parse(msg.body.toString());
      const receivedTime = new Date();
      const scheduledTime = new Date(reminder.scheduledAt);
      const expectedTime = new Date(reminder.expectedAt);
      const actualDelay = (receivedTime.getTime() - scheduledTime.getTime()) / 1000;
      const expectedDelay = (expectedTime.getTime() - scheduledTime.getTime()) / 1000;

      console.log('\n🔔 REMINDER RECEIVED!');
      console.log(`   ID: ${reminder.id}`);
      console.log(`   Type: ${reminder.type}`);
      console.log(`   Recipient: ${reminder.recipient}`);
      console.log(`   Message: ${reminder.message}`);
      console.log(`   Scheduled: ${scheduledTime.toLocaleTimeString()}`);
      console.log(`   Expected:   ${expectedTime.toLocaleTimeString()}`);
      console.log(`   Received:   ${receivedTime.toLocaleTimeString()}`);
      console.log(`   Delay: ${actualDelay.toFixed(2)}s (expected: ${expectedDelay}s)`);
      console.log(`   Accuracy: ${Math.abs(actualDelay - expectedDelay) < 1 ? '✓ Good' : '⚠ Delayed'}`);
      console.log('---');

      // Here you would send actual notification (email, SMS, push notification, etc.)
      // For demo purposes, we just log it

      ack();
    } catch (err) {
      console.error('Error processing reminder:', err);
      ack();
    }
  });

  return consumer;
}

// Order cancellation with grace period
async function orderCancellationDemo() {
  console.log('\n=== Order Cancellation with Grace Period ===');

  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'order-cancellation-producer',
    instanceName: 'order-cancellation-producer',
  });

  await producer.start();

  // Create an order
  const order = {
    orderId: 'ORD-CANCEL-001',
    userId: 'user-cancel-demo',
    amount: 299.99,
    items: ['Premium Widget', 'Extended Warranty'],
    createdAt: new Date().toISOString(),
  };

  console.log(`Creating order: ${order.orderId}`);

  // Send order created message
  await producer.send({
    topic: 'order-lifecycle',
    tag: 'order.created',
    body: Buffer.from(JSON.stringify(order)),
    keys: [order.orderId],
  });

  // Send delayed cancellation check
  const cancellationCheck = {
    orderId: order.orderId,
    userId: order.userId,
    type: 'auto-cancel-check',
    scheduledAt: new Date().toISOString(),
    gracePeriod: '30s',
  };

  await producer.send({
    topic: 'order-lifecycle',
    tag: 'cancellation.check',
    body: Buffer.from(JSON.stringify(cancellationCheck)),
    keys: [order.orderId, 'cancel-check'],
    delayTimeLevel: DELAY_LEVELS['30s'], // Check after 30 seconds
  });

  console.log(`Scheduled cancellation check for ${order.orderId} in 30s`);

  await producer.shutdown();
}

// Lifecycle consumer for orders
async function orderLifecycleConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'order-lifecycle-consumer',
    topic: 'order-lifecycle',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'order-lifecycle-consumer',
  });

  await consumer.start();

  // Track order states
  const orderStates = new Map();

  consumer.subscribe((msg, ack) => {
    try {
      if (msg.tag === 'cancellation.check') {
        const check = JSON.parse(msg.body.toString());
        console.log(`\n⏰ CANCELLATION CHECK FOR ORDER: ${check.orderId}`);

        // In a real system, you would check if payment was received
        // For demo, we'll randomly decide to cancel or not
        const shouldCancel = Math.random() > 0.5;

        if (shouldCancel) {
          console.log(`   → Cancelling unpaid order ${check.orderId}`);
          // Here you would send a cancellation message
        } else {
          console.log(`   → Order ${check.orderId} is confirmed`);
          // Here you might schedule fulfillment
        }
      } else {
        const order = JSON.parse(msg.body.toString());
        console.log(`📦 Order Event: ${msg.tag} -> ${order.orderId}`);
        orderStates.set(order.orderId, msg.tag);
      }

      ack();
    } catch (err) {
      console.error('Error in lifecycle consumer:', err);
      ack();
    }
  });

  return consumer;
}

// Main function
async function main() {
  console.log('=== RocketMQ Delayed Messages Example ===');

  try {
    // Step 1: Start consumers
    console.log('\n1. Starting consumers...');
    const reminderConsumerPromise = reminderConsumer();
    const orderLifecyclePromise = orderLifecycleConsumer();

    // Step 2: Wait for consumers to be ready
    await new Promise(resolve => setTimeout(resolve, 2000));

    // Step 3: Send reminders
    console.log('\n2. Sending delayed reminders...');
    await reminderProducer();

    // Step 4: Demo order cancellation
    await new Promise(resolve => setTimeout(resolve, 1000));
    await orderCancellationDemo();

    // Step 5: Wait for all delayed messages to be processed
    console.log('\n3. Waiting for delayed messages to be delivered...');
    await new Promise(resolve => setTimeout(resolve, 35000)); // Wait for longest delay (2m + buffer)

    // Step 6: Shutdown consumers
    console.log('\n4. Shutting down consumers...');
    const reminderConsumer = await reminderConsumerPromise;
    const orderConsumer = await orderLifecyclePromise;
    await reminderConsumer.shutdown();
    await orderConsumer.shutdown();

    console.log('Delayed messages example completed!');
  } catch (err) {
    console.error('Error in delayed messages example:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});

💻 Consommation de Diffusion RocketMQ javascript

🟡 intermediate ⭐⭐⭐

Implémenter un modèle de consommation de diffusion où tous les consommateurs reçoivent le même message

⏱️ 20 min 🏷️ rocketmq, broadcast, fanout
Prerequisites: RocketMQ server running, Node.js, RocketMQ basics
// RocketMQ Broadcast Consumption Example
// Demonstrates broadcasting messages to all consumers in a consumer group

const { Producer, Consumer, MessageModel } = require('apache-rocketmq');

// Configuration source producer
async function configurationProducer() {
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'config-producer-group',
    instanceName: 'config-producer',
  });

  await producer.start();
  console.log('Configuration producer started');

  // Configuration updates to broadcast
  const configUpdates = [
    {
      id: 'cfg001',
      type: 'feature_flag',
      key: 'new_dashboard_enabled',
      value: 'true',
      version: '1.0.0',
      description: 'Enable new dashboard UI for all users',
    },
    {
      id: 'cfg002',
      type: 'maintenance',
      key: 'system_maintenance',
      value: JSON.stringify({
        scheduled: '2025-12-15T02:00:00Z',
        duration: 2, // hours
        affectedServices: ['api', 'web', 'mobile'],
      }),
      version: '1.0.0',
      description: 'Scheduled system maintenance window',
    },
    {
      id: 'cfg003',
      type: 'rate_limit',
      key: 'api_rate_limit',
      value: '1000',
      version: '1.1.0',
      description: 'Update API rate limit to 1000 requests per minute',
    },
    {
      id: 'cfg004',
      type: 'security',
      key: 'password_policy',
      value: JSON.stringify({
        minLength: 8,
        requireUppercase: true,
        requireLowercase: true,
        requireNumbers: true,
        requireSpecialChars: true,
      }),
      version: '2.0.0',
      description: 'Enhanced password security requirements',
    },
    {
      id: 'cfg005',
      type: 'feature_flag',
      key: 'beta_features_enabled',
      value: 'false',
      version: '1.0.1',
      description: 'Disable beta features temporarily',
    },
  ];

  console.log('Broadcasting configuration updates...');

  for (const config of configUpdates) {
    const message = {
      topic: 'configuration-updates',
      tag: config.type,
      body: Buffer.from(JSON.stringify({
        ...config,
        timestamp: new Date().toISOString(),
        publisher: 'config-service',
      })),
      keys: [config.id, config.key],
      properties: {
        configId: config.id,
        configType: config.type,
        configVersion: config.version,
        priority: config.type === 'security' ? 'high' : 'normal',
      },
    };

    const result = await producer.send(message);
    console.log(`📡 Broadcast: ${config.type.padEnd(15)} ${config.key.padEnd(25)} -> v${config.version}`);
    console.log(`     ${config.description}`);
    console.log(`     Message ID: ${result.messageId}`);
    console.log('---');
  }

  await producer.shutdown();
  console.log('Configuration producer stopped');
}

// Service instance that receives configuration broadcasts
class ServiceInstance {
  constructor(name, instanceId) {
    this.name = name;
    this.instanceId = instanceId;
    this.config = new Map();
    this.startTime = new Date();
  }

  async startConsumer() {
    this.consumer = new Consumer({
      endpoints: 'localhost:9876',
      consumerGroup: 'config-consumer-group', // Same group for broadcast
      topic: 'configuration-updates',
      tag: '*',
      messageModel: MessageModel.BROADCASTING, // Key for broadcast consumption
      instanceName: `${this.name}-${this.instanceId}`,
    });

    await this.consumer.start();
    console.log(`[${this.name}-${this.instanceId}] Consumer started`);

    this.consumer.subscribe((msg, ack) => {
      this.handleConfigurationUpdate(msg, ack);
    });

    return this.consumer;
  }

  handleConfigurationUpdate(msg, ack) {
    try {
      const configUpdate = JSON.parse(msg.body.toString());
      const receivedTime = new Date();

      console.log(`\n📥 [${this.name}-${this.instanceId}] Configuration Update Received`);
      console.log(`    Config ID: ${configUpdate.id}`);
      console.log(`    Type: ${configUpdate.type}`);
      console.log(`    Key: ${configUpdate.key}`);
      console.log(`    New Value: ${configUpdate.value}`);
      console.log(`    Version: ${configUpdate.version}`);
      console.log(`    Description: ${configUpdate.description}`);
      console.log(`    Published: ${new Date(configUpdate.timestamp).toLocaleTimeString()}`);
      console.log(`    Received: ${receivedTime.toLocaleTimeString()}`);
      console.log(`    Latency: ${(receivedTime.getTime() - new Date(configUpdate.timestamp).getTime())}ms`);

      // Apply configuration update
      const previousValue = this.config.get(configUpdate.key);
      this.config.set(configUpdate.key, {
        value: configUpdate.value,
        version: configUpdate.version,
        updatedAt: configUpdate.timestamp,
        updatedAt: receivedTime.toISOString(),
      });

      console.log(`    Previous value: ${previousValue ? previousValue.value : 'NOT_SET'}`);
      console.log(`    Status: ${this.applyConfiguration(configUpdate)}`);

      // Show current configuration count
      console.log(`    Total configs: ${this.config.size}`);
      console.log('---');

      ack();
    } catch (err) {
      console.error(`[${this.name}-${this.instanceId}] Error handling config update:`, err);
      ack();
    }
  }

  applyConfiguration(configUpdate) {
    try {
      switch (configUpdate.type) {
        case 'feature_flag':
          const flagEnabled = configUpdate.value === 'true';
          console.log(`    → Feature flag '${configUpdate.key}' ${flagEnabled ? 'ENABLED' : 'DISABLED'}`);
          this.updateFeatureFlag(configUpdate.key, flagEnabled);
          break;

        case 'rate_limit':
          console.log(`    → Rate limit updated to ${configUpdate.value} requests/min`);
          this.updateRateLimit(parseInt(configUpdate.value));
          break;

        case 'security':
          console.log(`    → Security policy updated for '${configUpdate.key}'`);
          this.updateSecurityPolicy(configUpdate.key, JSON.parse(configUpdate.value));
          break;

        case 'maintenance':
          const maintenance = JSON.parse(configUpdate.value);
          console.log(`    → Maintenance scheduled: ${maintenance.scheduled}`);
          this.scheduleMaintenance(maintenance);
          break;

        default:
          console.log(`    → Generic configuration applied`);
      }

      return 'SUCCESS';
    } catch (err) {
      console.error(`    → Failed to apply configuration: ${err.message}`);
      return 'ERROR';
    }
  }

  // Simulate applying configurations
  updateFeatureFlag(key, enabled) {
    // In real service, this would toggle features
    console.log(`      [${this.name}] Feature '${key}' is now ${enabled ? 'ACTIVE' : 'INACTIVE'}`);
  }

  updateRateLimit(limit) {
    // In real service, this would update rate limiting
    console.log(`      [${this.name}] Rate limit set to ${limit}/min`);
  }

  updateSecurityPolicy(key, policy) {
    // In real service, this would update security policies
    console.log(`      [${this.name}] Security policy '${key}' updated`);
  }

  scheduleMaintenance(maintenance) {
    // In real service, this would prepare for maintenance
    console.log(`      [${this.name}] Preparing for maintenance window`);
  }

  async shutdown() {
    if (this.consumer) {
      await this.consumer.shutdown();
      console.log(`[${this.name}-${this.instanceId}] Consumer stopped`);
    }
  }

  printStatus() {
    console.log(`\n📊 [${this.name}-${this.instanceId}] Status:`);
    console.log(`    Uptime: ${Math.round((new Date() - this.startTime) / 1000)}s`);
    console.log(`    Configurations: ${this.config.size}`);

    if (this.config.size > 0) {
      console.log('    Active configs:');
      for (const [key, config] of this.config.entries()) {
        console.log(`      - ${key}: v${config.version}`);
      }
    }
  }
}

// Service health monitor
async function serviceHealthMonitor(services) {
  console.log('\n=== Service Health Monitor ===');

  const monitorInterval = setInterval(() => {
    console.log('\n' + '='.repeat(50));
    console.log(`Health Check - ${new Date().toLocaleTimeString()}`);
    console.log('='.repeat(50));

    services.forEach(service => {
      service.printStatus();
    });
  }, 8000);

  // Stop monitoring after 30 seconds
  setTimeout(() => {
    clearInterval(monitorInterval);
    console.log('\nHealth monitoring stopped');
  }, 30000);
}

// Demo cluster consumption vs broadcast consumption
async function demoConsumptionPatterns() {
  console.log('\n=== Consumption Patterns Demo ===');

  // Cluster consumer (load balancing)
  const clusterConsumer1 = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'cluster-consumer-group',
    topic: 'pattern-demo',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'cluster-consumer-1',
  });

  const clusterConsumer2 = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'cluster-consumer-group',
    topic: 'pattern-demo',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'cluster-consumer-2',
  });

  // Broadcast consumers (all get all messages)
  const broadcastConsumer1 = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'broadcast-consumer-group',
    topic: 'pattern-demo',
    tag: '*',
    messageModel: MessageModel.BROADCASTING,
    instanceName: 'broadcast-consumer-1',
  });

  const broadcastConsumer2 = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'broadcast-consumer-group',
    topic: 'pattern-demo',
    tag: '*',
    messageModel: MessageModel.BROADCASTING,
    instanceName: 'broadcast-consumer-2',
  });

  await clusterConsumer1.start();
  await clusterConsumer2.start();
  await broadcastConsumer1.start();
  await broadcastConsumer2.start();

  // Set up message handlers
  let clusterMessageCount = 0;
  let broadcast1MessageCount = 0;
  let broadcast2MessageCount = 0;

  clusterConsumer1.subscribe((msg, ack) => {
    clusterMessageCount++;
    console.log(`[CLUSTER-1] Got message ${clusterMessageCount}: ${msg.body.toString()}`);
    ack();
  });

  clusterConsumer2.subscribe((msg, ack) => {
    console.log(`[CLUSTER-2] Got message: ${msg.body.toString()}`);
    ack();
  });

  broadcastConsumer1.subscribe((msg, ack) => {
    broadcast1MessageCount++;
    console.log(`[BROADCAST-1] Got message ${broadcast1MessageCount}: ${msg.body.toString()}`);
    ack();
  });

  broadcastConsumer2.subscribe((msg, ack) => {
    broadcast2MessageCount++;
    console.log(`[BROADCAST-2] Got message ${broadcast2MessageCount}: ${msg.body.toString()}`);
    ack();
  });

  // Producer for demo messages
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'pattern-demo-producer',
  });

  await producer.start();

  // Send messages
  for (let i = 1; i <= 5; i++) {
    await producer.send({
      topic: 'pattern-demo',
      body: Buffer.from(`Demo message ${i}`),
    });
    await new Promise(resolve => setTimeout(resolve, 500));
  }

  // Wait for processing
  await new Promise(resolve => setTimeout(resolve, 3000));

  console.log('\n📈 Consumption Pattern Results:');
  console.log(`Cluster consumers: ${clusterMessageCount} total messages (load balanced)`);
  console.log(`Broadcast consumer 1: ${broadcast1MessageCount} messages (all received)`);
  console.log(`Broadcast consumer 2: ${broadcast2MessageCount} messages (all received)`);

  // Cleanup
  await producer.shutdown();
  await clusterConsumer1.shutdown();
  await clusterConsumer2.shutdown();
  await broadcastConsumer1.shutdown();
  await broadcastConsumer2.shutdown();
}

// Main function
async function main() {
  console.log('=== RocketMQ Broadcast Consumption Example ===');

  try {
    // Step 1: Create multiple service instances
    console.log('\n1. Creating service instances...');
    const services = [
      new ServiceInstance('api-service', 'instance-1'),
      new ServiceInstance('api-service', 'instance-2'),
      new ServiceInstance('api-service', 'instance-3'),
      new ServiceInstance('web-service', 'instance-1'),
      new ServiceInstance('web-service', 'instance-2'),
    ];

    // Step 2: Start consumers for all services
    console.log('\n2. Starting broadcast consumers...');
    const consumerPromises = services.map(service => service.startConsumer());
    await Promise.all(consumerPromises);

    // Step 3: Wait for consumers to be ready
    await new Promise(resolve => setTimeout(resolve, 2000));

    // Step 4: Start health monitor
    console.log('\n3. Starting service health monitor...');
    serviceHealthMonitor(services);

    // Step 5: Send configuration updates
    console.log('\n4. Broadcasting configuration updates...');
    await configurationProducer();

    // Step 6: Demo consumption patterns
    await new Promise(resolve => setTimeout(resolve, 3000));
    await demoConsumptionPatterns();

    // Step 7: Wait for all processing to complete
    console.log('\n5. Waiting for processing to complete...');
    await new Promise(resolve => setTimeout(resolve, 35000));

    // Step 8: Shutdown all services
    console.log('\n6. Shutting down all services...');
    const shutdownPromises = services.map(service => service.shutdown());
    await Promise.all(shutdownPromises);

    console.log('\nBroadcast consumption example completed!');
    console.log('\nKey Points:');
    console.log('- All instances in the same consumer group received every message');
    console.log('- Each service instance applied configurations independently');
    console.log('- Useful for configuration updates, cache invalidation, and notifications');
    console.log('- Contrast with clustering which load balances messages');
  } catch (err) {
    console.error('Error in broadcast consumption example:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});

💻 Messages Transactionnels RocketMQ javascript

🔴 complex ⭐⭐⭐⭐⭐

Implémenter des transactions distribuées en utilisant la fonction de messages transactionnels de RocketMQ pour la cohérence éventuelle

⏱️ 30 min 🏷️ rocketmq, transaction, distributed, acid
Prerequisites: RocketMQ server running, Node.js, Advanced RocketMQ knowledge, Understanding of distributed transactions
// RocketMQ Transactional Messages Example
// Demonstrates distributed transactions with local and remote operations

const { Producer, Consumer, MessageModel, TransactionMQProducer } = require('apache-rocketmq');

// Simulated database for orders
class OrderDatabase {
  constructor() {
    this.orders = new Map();
    this.payments = new Map();
    this.inventories = new Map();
  }

  async createOrder(order) {
    console.log(`[DB] Creating order: ${order.orderId}`);
    // Simulate database operation
    await new Promise(resolve => setTimeout(resolve, 100));
    this.orders.set(order.orderId, { ...order, status: 'created' });
    return true;
  }

  async processPayment(payment) {
    console.log(`[DB] Processing payment: ${payment.paymentId}`);
    // Simulate payment processing
    await new Promise(resolve => setTimeout(resolve, 200));

    // Simulate occasional payment failures
    if (Math.random() < 0.2) {
      console.log(`[DB] Payment failed: ${payment.paymentId}`);
      return false;
    }

    this.payments.set(payment.paymentId, { ...payment, status: 'completed' });
    return true;
  }

  async updateInventory(inventoryUpdate) {
    console.log(`[DB] Updating inventory for: ${inventoryUpdate.productId}`);
    // Simulate inventory update
    await new Promise(resolve => setTimeout(resolve, 50));
    this.inventories.set(inventoryUpdate.productId, inventoryUpdate);
    return true;
  }

  async rollbackOrder(orderId) {
    console.log(`[DB] Rolling back order: ${orderId}`);
    const order = this.orders.get(orderId);
    if (order) {
      order.status = 'cancelled';
      this.orders.set(orderId, order);
    }
  }

  getOrderStatus(orderId) {
    const order = this.orders.get(orderId);
    return order ? order.status : 'not_found';
  }
}

// Transactional producer for order processing
async function createTransactionalProducer() {
  const producer = new TransactionMQProducer({
    endpoints: 'localhost:9876',
    producerGroup: 'order-transaction-producer',
    instanceName: 'order-transaction-producer',
    // Transaction check listener
    checkListener: async (msg) => {
      try {
        const transactionId = msg.properties.transactionId;
        const orderData = JSON.parse(msg.body.toString());

        console.log(`[TRANSACTION CHECK] Checking transaction ${transactionId} for order ${orderData.orderId}`);

        // Check local transaction status
        // In real application, you would query your database
        const orderStatus = global.orderDB.getOrderStatus(orderData.orderId);

        console.log(`[TRANSACTION CHECK] Order ${orderData.orderId} status: ${orderStatus}`);

        // Return transaction status
        if (orderStatus === 'created' || orderStatus === 'processing') {
          return { state: 'COMMIT' };
        } else if (orderStatus === 'cancelled') {
          return { state: 'ROLLBACK' };
        } else {
          return { state: 'UNKNOWN' };
        }
      } catch (err) {
        console.error('Transaction check error:', err);
        return { state: 'UNKNOWN' };
      }
    },
  });

  await producer.start();
  console.log('Transactional producer started');
  return producer;
}

// Order service with transactional messaging
class OrderService {
  constructor(producer, database) {
    this.producer = producer;
    this.db = database;
  }

  async processOrder(orderRequest) {
    const transactionId = `tx_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
    console.log(`\n🔄 Starting transaction ${transactionId} for order ${orderRequest.orderId}`);

    // Step 1: Send half message (prepared message)
    const halfMessage = {
      topic: 'order-events',
      tag: 'order.created',
      body: Buffer.from(JSON.stringify({
        ...orderRequest,
        transactionId: transactionId,
        timestamp: new Date().toISOString(),
      })),
      keys: [orderRequest.orderId, transactionId],
      properties: {
        transactionId: transactionId,
        orderId: orderRequest.orderId,
      },
    };

    console.log(`[1/4] Sending half message for order ${orderRequest.orderId}`);
    const sendResult = await this.producer.sendTransactionMessage(halfMessage);

    try {
      // Step 2: Execute local transaction
      console.log(`[2/4] Executing local transaction for order ${orderRequest.orderId}`);

      const localTxSuccess = await this.executeLocalTransaction(orderRequest, transactionId);

      if (localTxSuccess) {
        // Step 3: Commit transaction
        console.log(`[3/4] Committing transaction ${transactionId}`);
        await this.producer.commitTransaction(sendResult.transactionId);
        console.log(`[4/4] ✅ Transaction ${transactionId} COMMITTED for order ${orderRequest.orderId}`);
        return { success: true, transactionId };
      } else {
        // Step 4: Rollback transaction
        console.log(`[3/4] Rolling back transaction ${transactionId}`);
        await this.producer.rollbackTransaction(sendResult.transactionId);
        console.log(`[4/4] ❌ Transaction ${transactionId} ROLLED BACK for order ${orderRequest.orderId}`);
        return { success: false, transactionId };
      }
    } catch (err) {
      console.error(`Local transaction failed for ${transactionId}:`, err);
      console.log(`[3/4] Rolling back transaction ${transactionId} due to error`);
      await this.producer.rollbackTransaction(sendResult.transactionId);
      throw err;
    }
  }

  async executeLocalTransaction(orderRequest, transactionId) {
    try {
      // Create order in local database
      const orderSuccess = await this.db.createOrder({
        ...orderRequest,
        status: 'processing',
        transactionId: transactionId,
      });

      if (!orderSuccess) {
        await this.db.rollbackOrder(orderRequest.orderId);
        return false;
      }

      // Process payment
      const paymentSuccess = await this.db.processPayment({
        paymentId: `pay_${orderRequest.orderId}`,
        orderId: orderRequest.orderId,
        amount: orderRequest.amount,
        transactionId: transactionId,
      });

      if (!paymentSuccess) {
        await this.db.rollbackOrder(orderRequest.orderId);
        return false;
      }

      // Update inventory
      for (const item of orderRequest.items) {
        const inventorySuccess = await this.db.updateInventory({
          productId: item.productId,
          quantity: -item.quantity, // Decrease inventory
          orderId: orderRequest.orderId,
          transactionId: transactionId,
        });

        if (!inventorySuccess) {
          await this.db.rollbackOrder(orderRequest.orderId);
          return false;
        }
      }

      // Mark order as completed
      const order = this.db.orders.get(orderRequest.orderId);
      order.status = 'completed';
      this.db.orders.set(orderRequest.orderId, order);

      console.log(`[LOCAL TX] All operations completed for order ${orderRequest.orderId}`);
      return true;

    } catch (err) {
      console.error(`[LOCAL TX] Error executing transaction for order ${orderRequest.orderId}:`, err);
      await this.db.rollbackOrder(orderRequest.orderId);
      return false;
    }
  }
}

// Order event consumer
async function orderEventConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'order-event-consumer',
    topic: 'order-events',
    tag: '*',
    messageModel: MessageModel.BROADCASTING, // All consumers get all messages
    instanceName: 'order-event-consumer',
  });

  await consumer.start();
  console.log('Order event consumer started');

  consumer.subscribe((msg, ack) => {
    try {
      const orderEvent = JSON.parse(msg.body.toString());
      const now = new Date();

      console.log('\n📦 ORDER EVENT RECEIVED');
      console.log(`   Order ID: ${orderEvent.orderId}`);
      console.log(`   Transaction ID: ${orderEvent.transactionId}`);
      console.log(`   Amount: $${orderEvent.amount}`);
      console.log(`   Items: ${orderEvent.items.map(i => i.productId).join(', ')}`);
      console.log(`   Created At: ${new Date(orderEvent.timestamp).toLocaleTimeString()}`);
      console.log(`   Received At: ${now.toLocaleTimeString()}`);
      console.log(`   Processing Delay: ${(now.getTime() - new Date(orderEvent.timestamp).getTime())}ms`);

      // Here you would:
      // 1. Update search indexes
      // 2. Send notifications
      // 3. Update analytics
      // 4. Trigger other downstream services
      console.log('   → Processing downstream services...');

      // Simulate downstream processing
      setTimeout(() => {
        console.log(`   → Downstream processing completed for ${orderEvent.orderId}`);
      }, 100);

      console.log('---');
      ack();
    } catch (err) {
      console.error('Error processing order event:', err);
      ack();
    }
  });

  return consumer;
}

// Payment verification service (consumer)
async function paymentVerificationConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'payment-verification-consumer',
    topic: 'order-events',
    tag: 'order.created',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'payment-verification-consumer',
  });

  await consumer.start();
  console.log('Payment verification consumer started');

  consumer.subscribe((msg, ack) => {
    try {
      const orderEvent = JSON.parse(msg.body.toString());

      console.log(`\n💳 Payment verification for order ${orderEvent.orderId}`);

      // In a real system, you might:
      // 1. Verify payment with external payment gateway
      // 2. Update fraud detection systems
      // 3. Send payment confirmation to customer

      const paymentStatus = Math.random() > 0.1 ? 'VERIFIED' : 'REQUIRES_REVIEW';
      console.log(`   → Payment status: ${paymentStatus}`);

      if (paymentStatus === 'REQUIRES_REVIEW') {
        console.log(`   → Order ${orderEvent.orderId} flagged for manual review`);
      }

      ack();
    } catch (err) {
      console.error('Error in payment verification:', err);
      ack();
    }
  });

  return consumer;
}

// Main function
async function main() {
  console.log('=== RocketMQ Transactional Messages Example ===');

  // Initialize database
  global.orderDB = new OrderDatabase();

  try {
    // Step 1: Create transactional producer
    console.log('\n1. Setting up transactional producer...');
    const producer = await createTransactionalProducer();

    // Step 2: Start consumers
    console.log('\n2. Starting consumers...');
    const orderConsumerPromise = orderEventConsumer();
    const paymentConsumerPromise = paymentVerificationConsumer();

    // Step 3: Wait for consumers to be ready
    await new Promise(resolve => setTimeout(resolve, 2000));

    // Step 4: Create order service
    console.log('\n3. Creating order service...');
    const orderService = new OrderService(producer, global.orderDB);

    // Step 5: Process multiple orders
    console.log('\n4. Processing orders...');
    const orders = [
      {
        orderId: 'TX-ORDER-001',
        userId: 'user-tx-001',
        amount: 299.99,
        items: [
          { productId: 'LAPTOP-001', quantity: 1, price: 199.99 },
          { productId: 'MOUSE-001', quantity: 1, price: 49.99 },
          { productId: 'KEYBOARD-001', quantity: 1, price: 50.01 },
        ],
      },
      {
        orderId: 'TX-ORDER-002',
        userId: 'user-tx-002',
        amount: 149.99,
        items: [
          { productId: 'MONITOR-001', quantity: 1, price: 149.99 },
        ],
      },
      {
        orderId: 'TX-ORDER-003',
        userId: 'user-tx-003',
        amount: 89.97,
        items: [
          { productId: 'HEADPHONE-001', quantity: 2, price: 44.99 },
        ],
      },
    ];

    // Process orders with delays
    for (let i = 0; i < orders.length; i++) {
      const order = orders[i];
      console.log(`\n--- Processing Order ${i + 1}/${orders.length} ---`);

      const result = await orderService.processOrder(order);

      if (result.success) {
        console.log(`✅ Order ${order.orderId} processed successfully`);
      } else {
        console.log(`❌ Order ${order.orderId} processing failed`);
      }

      // Wait between orders
      if (i < orders.length - 1) {
        await new Promise(resolve => setTimeout(resolve, 3000));
      }
    }

    // Step 6: Wait for message processing
    console.log('\n5. Waiting for message processing...');
    await new Promise(resolve => setTimeout(resolve, 10000));

    // Step 7: Shutdown
    console.log('\n6. Shutting down...');
    await producer.shutdown();

    const orderConsumer = await orderConsumerPromise;
    const paymentConsumer = await paymentConsumerPromise;
    await orderConsumer.shutdown();
    await paymentConsumer.shutdown();

    console.log('\nTransactional messages example completed!');
    console.log('\nSummary:');
    console.log('- Demonstrated distributed transactions with local DB ops and remote messaging');
    console.log('- Transaction consistency maintained even with failures');
    console.log('- Downstream services only receive committed transactions');
  } catch (err) {
    console.error('Error in transactional messages example:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});