Exemples RocketMQ Message Queue

Exemples Apache RocketMQ incluant producteurs, consommateurs, messages ordonnés, messages retardés, consommation de diffusion et messages transactionnels pour messagerie haute performance

💻 RocketMQ Hello World - Producteur/Consommateur de Base javascript

🟢 simple

Exemple simple de producteur et consommateur Apache RocketMQ démontrant des opérations de base de file de messages

⏱️ 10 min 🏷️ rocketmq, messaging, beginner
Prerequisites: RocketMQ server running, Node.js
// RocketMQ Hello World - Basic Producer/Consumer Example
// Install: npm install apache-rocketmq

const { Producer, Consumer, MessageModel } = require('apache-rocketmq');

async function helloWorld() {
  // RocketMQ configuration
  const config = {
    endpoints: 'localhost:9876',
    accessKey: 'rocketmq',
    secretKey: '12345678',
    instanceName: 'hello-world-example',
  };

  console.log('=== RocketMQ Hello World Example ===');

  // Create consumer
  const consumer = new Consumer({
    ...config,
    consumerGroup: 'hello-consumer-group',
    topic: 'hello-topic',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
  });

  // Create producer
  const producer = new Producer({
    ...config,
    producerGroup: 'hello-producer-group',
  });

  // Start consumer
  console.log('Starting consumer...');
  await consumer.start();

  // Set up message listener
  consumer.subscribe((msg, ack) => {
    console.log(`Received message: ${msg.body.toString()}`);
    console.log(`Message ID: ${msg.id}`);
    console.log(`Topic: ${msg.topic}`);
    console.log(`Tag: ${msg.tag}`);
    console.log('---');

    // Acknowledge message
    ack();
  });

  // Start producer
  console.log('Starting producer...');
  await producer.start();

  // Wait a moment for consumer to be ready
  await new Promise(resolve => setTimeout(resolve, 2000));

  // Send messages
  console.log('Sending messages...');
  const messages = [
    'Hello RocketMQ!',
    'Apache RocketMQ is powerful!',
    'Message queue made easy!',
    'High performance messaging!',
  ];

  for (let i = 0; i < messages.length; i++) {
    const message = {
      topic: 'hello-topic',
      tag: 'hello',
      body: Buffer.from(messages[i]),
      keys: ['key-' + (i + 1)],
      properties: {
        source: 'hello-world-example',
        timestamp: new Date().toISOString(),
      },
    };

    const result = await producer.send(message);
    console.log(`Message sent: ${messages[i]}`);
    console.log(`Message ID: ${result.messageId}`);
    console.log('---');
  }

  // Wait for messages to be consumed
  console.log('Waiting for messages to be consumed...');
  await new Promise(resolve => setTimeout(resolve, 5000));

  // Shutdown
  console.log('Shutting down...');
  await producer.shutdown();
  await consumer.shutdown();

  console.log('RocketMQ Hello World example completed!');
}

helloWorld().catch(err => {
  console.error('Error:', err);
});

💻 Messages Ordonnés RocketMQ javascript

🟡 intermediate ⭐⭐⭐

Implémenter la livraison de messages ordonnés en utilisant des clés de partitionnement avec des files de messages

⏱️ 20 min 🏷️ rocketmq, ordered, sequencing
Prerequisites: RocketMQ server running, Node.js, RocketMQ basics
// RocketMQ Ordered Messages Example
// Ensures message order within the same sharding key

const { Producer, Consumer, MessageModel } = require('apache-rocketmq');

// Order event producer
async function orderEventProducer() {
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'order-producer-group',
    instanceName: 'order-event-producer',
  });

  await producer.start();
  console.log('Order event producer started');

  // Simulate order events for multiple users
  const orders = [
    // User 1 - Order lifecycle (should be processed in order)
    { userId: 'user001', orderId: 'ORD001', event: 'order.created', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'payment.processed', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'order.confirmed', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'order.shipped', amount: 99.99 },
    { userId: 'user001', orderId: 'ORD001', event: 'order.delivered', amount: 99.99 },

    // User 2 - Order lifecycle (should be processed in order)
    { userId: 'user002', orderId: 'ORD002', event: 'order.created', amount: 149.99 },
    { userId: 'user002', orderId: 'ORD002', event: 'payment.processed', amount: 149.99 },
    { userId: 'user002', orderId: 'ORD002', event: 'order.confirmed', amount: 149.99 },

    // User 3 - Order lifecycle (should be processed in order)
    { userId: 'user003', orderId: 'ORD003', event: 'order.created', amount: 79.99 },
    { userId: 'user003', orderId: 'ORD003', event: 'payment.failed', amount: 79.99 },
    { userId: 'user003', orderId: 'ORD003', event: 'order.cancelled', amount: 79.99 },

    // More events for user 1 and 2
    { userId: 'user001', orderId: 'ORD004', event: 'order.created', amount: 199.99 },
    { userId: 'user002', orderId: 'ORD005', event: 'order.created', amount: 59.99 },
  ];

  console.log('Sending order events...');
  for (let i = 0; i < orders.length; i++) {
    const order = orders[i];

    const message = {
      topic: 'order-events',
      tag: order.event,
      body: Buffer.from(JSON.stringify({
        ...order,
        timestamp: new Date().toISOString(),
        sequence: i + 1,
      })),
      // Use userId as sharding key to ensure order for each user
      shardingKey: order.userId,
      keys: [order.orderId, order.userId],
      properties: {
        eventType: order.event,
        userId: order.userId,
        orderId: order.orderId,
      },
    };

    const result = await producer.send(message);
    console.log(`[${order.userId}] ${order.event}: ${order.orderId} (Message ID: ${result.messageId})`);

    // Small delay to simulate real-world scenario
    await new Promise(resolve => setTimeout(resolve, 100));
  }

  await producer.shutdown();
  console.log('Order event producer stopped');
}

// Order event consumer with ordered processing
async function orderEventConsumer(consumerId) {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: `order-consumer-group-${consumerId}`,
    topic: 'order-events',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: `order-consumer-${consumerId}`,
    // Enable ordered message consumption
    orderly: true,
  });

  await consumer.start();
  console.log(`Order event consumer ${consumerId} started`);

  // Track order states for each user
  const orderStates = new Map();

  consumer.subscribe((msg, ack) => {
    try {
      const orderEvent = JSON.parse(msg.body.toString());

      console.log(`[CONSUMER ${consumerId}] Processing: ${orderEvent.userId} -> ${orderEvent.event}`);

      // Validate order sequence for each user
      const userState = orderStates.get(orderEvent.userId) || { lastEvent: null, expectedEvent: 'order.created' };

      // Simple order state validation
      const validTransitions = {
        'order.created': ['payment.processed', 'payment.failed'],
        'payment.processed': ['order.confirmed'],
        'payment.failed': ['order.cancelled'],
        'order.confirmed': ['order.shipped'],
        'order.shipped': ['order.delivered'],
        'order.cancelled': ['order.created'], // Can create new order after cancellation
        'order.delivered': ['order.created'], // Can create new order after delivery
      };

      if (userState.lastEvent) {
        const allowedNextEvents = validTransitions[userState.lastEvent] || [];
        if (!allowedNextEvents.includes(orderEvent.event)) {
          console.log(`[CONSUMER ${consumerId}] WARNING: Invalid order transition for ${orderEvent.userId}: ${userState.lastEvent} -> ${orderEvent.event}`);
        }
      }

      // Update order state
      orderStates.set(orderEvent.userId, {
        lastEvent: orderEvent.event,
        lastOrderId: orderEvent.orderId,
      });

      console.log(`[CONSUMER ${consumerId}] Processed: ${orderEvent.userId} -> ${orderEvent.event} -> ${orderEvent.orderId}`);
      console.log(`[CONSUMER ${consumerId}] Current state for ${orderEvent.userId}: ${orderStates.get(orderEvent.userId).lastEvent}`);
      console.log('---');

      // Acknowledge message
      ack();
    } catch (err) {
      console.error(`[CONSUMER ${consumerId}] Error processing message:`, err);
      ack(); // Acknowledge even on error to avoid blocking the queue
    }
  });

  return consumer;
}

// Demo ordered vs unordered messages
async function demoOrderingComparison() {
  console.log('\n=== Ordered vs Unordered Comparison ===');

  // Unordered producer (random order)
  const unorderedProducer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'unordered-producer-group',
    instanceName: 'unordered-producer',
  });

  await unorderedProducer.start();

  console.log('Sending unordered messages...');
  const unorderedMessages = ['A', 'B', 'C', 'D', 'E'];

  for (const msg of unorderedMessages) {
    await unorderedProducer.send({
      topic: 'unordered-test',
      body: Buffer.from(`Unordered message: ${msg}`),
    });
    console.log(`Sent unordered: ${msg}`);
  }

  await unorderedProducer.shutdown();
}

// Main function
async function main() {
  console.log('=== RocketMQ Ordered Messages Example ===');

  try {
    // Step 1: Start multiple consumers to show load balancing
    console.log('\n1. Starting consumers...');
    const consumers = [];
    for (let i = 1; i <= 3; i++) {
      consumers.push(await orderEventConsumer(i));
    }

    // Step 2: Wait for consumers to be ready
    await new Promise(resolve => setTimeout(resolve, 2000));

    // Step 3: Start producer to send ordered messages
    console.log('\n2. Sending ordered messages...');
    const producerPromise = orderEventProducer();

    // Step 4: Wait for message processing
    await producerPromise;
    console.log('\n3. Waiting for message processing...');
    await new Promise(resolve => setTimeout(resolve, 8000));

    // Step 5: Demo comparison
    await demoOrderingComparison();

    // Step 6: Shutdown
    console.log('\n4. Shutting down consumers...');
    for (const consumer of consumers) {
      await consumer.shutdown();
    }

    console.log('Ordered messages example completed!');
  } catch (err) {
    console.error('Error in ordered messages example:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});

💻 Messages Retardés RocketMQ javascript

🟡 intermediate ⭐⭐⭐

Implémenter la livraison de messages retardés et programmés en utilisant la fonction de retard intégrée de RocketMQ

⏱️ 25 min 🏷️ rocketmq, delayed, scheduled, timing
Prerequisites: RocketMQ server running, Node.js, RocketMQ basics
// RocketMQ Delayed Messages Example
// Demonstrates various types of delayed and scheduled message delivery

const { Producer, Consumer, MessageModel } = require('apache-rocketmq');

// Delay levels in RocketMQ (seconds)
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
const DELAY_LEVELS = {
  '1s': 1,
  '5s': 2,
  '10s': 3,
  '30s': 4,
  '1m': 5,
  '2m': 6,
  '3m': 7,
  '4m': 8,
  '5m': 9,
  '6m': 10,
  '7m': 11,
  '8m': 12,
  '9m': 13,
  '10m': 14,
  '20m': 15,
  '30m': 16,
  '1h': 17,
  '2h': 18,
};

// Reminder service producer
async function reminderProducer() {
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'reminder-producer-group',
    instanceName: 'reminder-producer',
  });

  await producer.start();
  console.log('Reminder producer started');

  // Different types of reminders with various delays
  const reminders = [
    {
      id: 'rem001',
      type: 'meeting',
      message: 'Team standup meeting starts now!',
      delay: '10s',
      recipient: '[email protected]',
    },
    {
      id: 'rem002',
      type: 'deadline',
      message: 'Project submission deadline approaching!',
      delay: '1m',
      recipient: '[email protected]',
    },
    {
      id: 'rem003',
      type: 'birthday',
      message: 'Happy birthday! Don't forget to wish them well!',
      delay: '2m',
      recipient: '[email protected]',
    },
    {
      id: 'rem004',
      type: 'maintenance',
      message: 'System maintenance scheduled for tonight',
      delay: '5m',
      recipient: '[email protected]',
    },
    {
      id: 'rem005',
      type: 'follow-up',
      message: 'Follow up with client about proposal',
      delay: '30s',
      recipient: '[email protected]',
    },
  ];

  console.log('Sending delayed reminders...');
  const sendTime = new Date();

  for (const reminder of reminders) {
    const delayLevel = DELAY_LEVELS[reminder.delay];
    if (!delayLevel) {
      console.log(`Warning: Unknown delay level '${reminder.delay}' for reminder ${reminder.id}`);
      continue;
    }

    const message = {
      topic: 'reminder-topic',
      tag: reminder.type,
      body: Buffer.from(JSON.stringify({
        ...reminder,
        scheduledAt: sendTime.toISOString(),
        delayLevel: delayLevel,
        expectedAt: new Date(sendTime.getTime() + (delayLevel * 1000)).toISOString(),
      })),
      keys: [reminder.id],
      properties: {
        reminderId: reminder.id,
        reminderType: reminder.type,
        delay: reminder.delay,
        delayLevel: delayLevel.toString(),
        scheduledAt: sendTime.toISOString(),
      },
      // Set delay level
      delayTimeLevel: delayLevel,
    };

    const result = await producer.send(message);
    console.log(`[${reminder.delay.padEnd(3)}] ${reminder.type.padEnd(12)} -> ${reminder.recipient.padEnd(25)} (ID: ${reminder.id})`);
    console.log(`      Expected at: ${new Date(sendTime.getTime() + (delayLevel * 1000)).toLocaleTimeString()}`);
    console.log(`      Message ID: ${result.messageId}`);
    console.log('---');
  }

  await producer.shutdown();
  console.log('Reminder producer stopped');
}

// Notification service for handling reminders
async function reminderConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'reminder-consumer-group',
    topic: 'reminder-topic',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'reminder-consumer',
  });

  await consumer.start();
  console.log('Reminder consumer started');

  consumer.subscribe((msg, ack) => {
    try {
      const reminder = JSON.parse(msg.body.toString());
      const receivedTime = new Date();
      const scheduledTime = new Date(reminder.scheduledAt);
      const expectedTime = new Date(reminder.expectedAt);
      const actualDelay = (receivedTime.getTime() - scheduledTime.getTime()) / 1000;
      const expectedDelay = (expectedTime.getTime() - scheduledTime.getTime()) / 1000;

      console.log('\n🔔 REMINDER RECEIVED!');
      console.log(`   ID: ${reminder.id}`);
      console.log(`   Type: ${reminder.type}`);
      console.log(`   Recipient: ${reminder.recipient}`);
      console.log(`   Message: ${reminder.message}`);
      console.log(`   Scheduled: ${scheduledTime.toLocaleTimeString()}`);
      console.log(`   Expected:   ${expectedTime.toLocaleTimeString()}`);
      console.log(`   Received:   ${receivedTime.toLocaleTimeString()}`);
      console.log(`   Delay: ${actualDelay.toFixed(2)}s (expected: ${expectedDelay}s)`);
      console.log(`   Accuracy: ${Math.abs(actualDelay - expectedDelay) < 1 ? '✓ Good' : '⚠ Delayed'}`);
      console.log('---');

      // Here you would send actual notification (email, SMS, push notification, etc.)
      // For demo purposes, we just log it

      ack();
    } catch (err) {
      console.error('Error processing reminder:', err);
      ack();
    }
  });

  return consumer;
}

// Order cancellation with grace period
async function orderCancellationDemo() {
  console.log('\n=== Order Cancellation with Grace Period ===');

  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'order-cancellation-producer',
    instanceName: 'order-cancellation-producer',
  });

  await producer.start();

  // Create an order
  const order = {
    orderId: 'ORD-CANCEL-001',
    userId: 'user-cancel-demo',
    amount: 299.99,
    items: ['Premium Widget', 'Extended Warranty'],
    createdAt: new Date().toISOString(),
  };

  console.log(`Creating order: ${order.orderId}`);

  // Send order created message
  await producer.send({
    topic: 'order-lifecycle',
    tag: 'order.created',
    body: Buffer.from(JSON.stringify(order)),
    keys: [order.orderId],
  });

  // Send delayed cancellation check
  const cancellationCheck = {
    orderId: order.orderId,
    userId: order.userId,
    type: 'auto-cancel-check',
    scheduledAt: new Date().toISOString(),
    gracePeriod: '30s',
  };

  await producer.send({
    topic: 'order-lifecycle',
    tag: 'cancellation.check',
    body: Buffer.from(JSON.stringify(cancellationCheck)),
    keys: [order.orderId, 'cancel-check'],
    delayTimeLevel: DELAY_LEVELS['30s'], // Check after 30 seconds
  });

  console.log(`Scheduled cancellation check for ${order.orderId} in 30s`);

  await producer.shutdown();
}

// Lifecycle consumer for orders
async function orderLifecycleConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'order-lifecycle-consumer',
    topic: 'order-lifecycle',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'order-lifecycle-consumer',
  });

  await consumer.start();

  // Track order states
  const orderStates = new Map();

  consumer.subscribe((msg, ack) => {
    try {
      if (msg.tag === 'cancellation.check') {
        const check = JSON.parse(msg.body.toString());
        console.log(
⏰ CANCELLATION CHECK FOR ORDER: ${check.orderId}`);

        // In a real system, you would check if payment was received
        // For demo, we'll randomly decide to cancel or not
        const shouldCancel = Math.random() > 0.5;

        if (shouldCancel) {
          console.log(`   → Cancelling unpaid order ${check.orderId}`);
          // Here you would send a cancellation message
        } else {
          console.log(`   → Order ${check.orderId} is confirmed`);
          // Here you might schedule fulfillment
        }
      } else {
        const order = JSON.parse(msg.body.toString());
        console.log(`📦 Order Event: ${msg.tag} -> ${order.orderId}`);
        orderStates.set(order.orderId, msg.tag);
      }

      ack();
    } catch (err) {
      console.error('Error in lifecycle consumer:', err);
      ack();
    }
  });

  return consumer;
}

// Main function
async function main() {
  console.log('=== RocketMQ Delayed Messages Example ===');

  try {
    // Step 1: Start consumers
    console.log('\n1. Starting consumers...');
    const reminderConsumerPromise = reminderConsumer();
    const orderLifecyclePromise = orderLifecycleConsumer();

    // Step 2: Wait for consumers to be ready
    await new Promise(resolve => setTimeout(resolve, 2000));

    // Step 3: Send reminders
    console.log('\n2. Sending delayed reminders...');
    await reminderProducer();

    // Step 4: Demo order cancellation
    await new Promise(resolve => setTimeout(resolve, 1000));
    await orderCancellationDemo();

    // Step 5: Wait for all delayed messages to be processed
    console.log('\n3. Waiting for delayed messages to be delivered...');
    await new Promise(resolve => setTimeout(resolve, 35000)); // Wait for longest delay (2m + buffer)

    // Step 6: Shutdown consumers
    console.log('\n4. Shutting down consumers...');
    const reminderConsumer = await reminderConsumerPromise;
    const orderConsumer = await orderLifecyclePromise;
    await reminderConsumer.shutdown();
    await orderConsumer.shutdown();

    console.log('Delayed messages example completed!');
  } catch (err) {
    console.error('Error in delayed messages example:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});

💻 Consommation de Diffusion RocketMQ javascript

🟡 intermediate ⭐⭐⭐

Implémenter un modèle de consommation de diffusion où tous les consommateurs reçoivent le même message

⏱️ 20 min 🏷️ rocketmq, broadcast, fanout
Prerequisites: RocketMQ server running, Node.js, RocketMQ basics
// RocketMQ Broadcast Consumption Example
// Demonstrates broadcasting messages to all consumers in a consumer group

const { Producer, Consumer, MessageModel } = require('apache-rocketmq');

// Configuration source producer
async function configurationProducer() {
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'config-producer-group',
    instanceName: 'config-producer',
  });

  await producer.start();
  console.log('Configuration producer started');

  // Configuration updates to broadcast
  const configUpdates = [
    {
      id: 'cfg001',
      type: 'feature_flag',
      key: 'new_dashboard_enabled',
      value: 'true',
      version: '1.0.0',
      description: 'Enable new dashboard UI for all users',
    },
    {
      id: 'cfg002',
      type: 'maintenance',
      key: 'system_maintenance',
      value: JSON.stringify({
        scheduled: '2025-12-15T02:00:00Z',
        duration: 2, // hours
        affectedServices: ['api', 'web', 'mobile'],
      }),
      version: '1.0.0',
      description: 'Scheduled system maintenance window',
    },
    {
      id: 'cfg003',
      type: 'rate_limit',
      key: 'api_rate_limit',
      value: '1000',
      version: '1.1.0',
      description: 'Update API rate limit to 1000 requests per minute',
    },
    {
      id: 'cfg004',
      type: 'security',
      key: 'password_policy',
      value: JSON.stringify({
        minLength: 8,
        requireUppercase: true,
        requireLowercase: true,
        requireNumbers: true,
        requireSpecialChars: true,
      }),
      version: '2.0.0',
      description: 'Enhanced password security requirements',
    },
    {
      id: 'cfg005',
      type: 'feature_flag',
      key: 'beta_features_enabled',
      value: 'false',
      version: '1.0.1',
      description: 'Disable beta features temporarily',
    },
  ];

  console.log('Broadcasting configuration updates...');

  for (const config of configUpdates) {
    const message = {
      topic: 'configuration-updates',
      tag: config.type,
      body: Buffer.from(JSON.stringify({
        ...config,
        timestamp: new Date().toISOString(),
        publisher: 'config-service',
      })),
      keys: [config.id, config.key],
      properties: {
        configId: config.id,
        configType: config.type,
        configVersion: config.version,
        priority: config.type === 'security' ? 'high' : 'normal',
      },
    };

    const result = await producer.send(message);
    console.log(`📡 Broadcast: ${config.type.padEnd(15)} ${config.key.padEnd(25)} -> v${config.version}`);
    console.log(`     ${config.description}`);
    console.log(`     Message ID: ${result.messageId}`);
    console.log('---');
  }

  await producer.shutdown();
  console.log('Configuration producer stopped');
}

// Service instance that receives configuration broadcasts
class ServiceInstance {
  constructor(name, instanceId) {
    this.name = name;
    this.instanceId = instanceId;
    this.config = new Map();
    this.startTime = new Date();
  }

  async startConsumer() {
    this.consumer = new Consumer({
      endpoints: 'localhost:9876',
      consumerGroup: 'config-consumer-group', // Same group for broadcast
      topic: 'configuration-updates',
      tag: '*',
      messageModel: MessageModel.BROADCASTING, // Key for broadcast consumption
      instanceName: `${this.name}-${this.instanceId}`,
    });

    await this.consumer.start();
    console.log(`[${this.name}-${this.instanceId}] Consumer started`);

    this.consumer.subscribe((msg, ack) => {
      this.handleConfigurationUpdate(msg, ack);
    });

    return this.consumer;
  }

  handleConfigurationUpdate(msg, ack) {
    try {
      const configUpdate = JSON.parse(msg.body.toString());
      const receivedTime = new Date();

      console.log(`\n📥 [${this.name}-${this.instanceId}] Configuration Update Received`);
      console.log(`    Config ID: ${configUpdate.id}`);
      console.log(`    Type: ${configUpdate.type}`);
      console.log(`    Key: ${configUpdate.key}`);
      console.log(`    New Value: ${configUpdate.value}`);
      console.log(`    Version: ${configUpdate.version}`);
      console.log(`    Description: ${configUpdate.description}`);
      console.log(`    Published: ${new Date(configUpdate.timestamp).toLocaleTimeString()}`);
      console.log(`    Received: ${receivedTime.toLocaleTimeString()}`);
      console.log(`    Latency: ${(receivedTime.getTime() - new Date(configUpdate.timestamp).getTime())}ms`);

      // Apply configuration update
      const previousValue = this.config.get(configUpdate.key);
      this.config.set(configUpdate.key, {
        value: configUpdate.value,
        version: configUpdate.version,
        updatedAt: configUpdate.timestamp,
        updatedAt: receivedTime.toISOString(),
      });

      console.log(`    Previous value: ${previousValue ? previousValue.value : 'NOT_SET'}`);
      console.log(`    Status: ${this.applyConfiguration(configUpdate)}`);

      // Show current configuration count
      console.log(`    Total configs: ${this.config.size}`);
      console.log('---');

      ack();
    } catch (err) {
      console.error(`[${this.name}-${this.instanceId}] Error handling config update:`, err);
      ack();
    }
  }

  applyConfiguration(configUpdate) {
    try {
      switch (configUpdate.type) {
        case 'feature_flag':
          const flagEnabled = configUpdate.value === 'true';
          console.log(`    → Feature flag '${configUpdate.key}' ${flagEnabled ? 'ENABLED' : 'DISABLED'}`);
          this.updateFeatureFlag(configUpdate.key, flagEnabled);
          break;

        case 'rate_limit':
          console.log(`    → Rate limit updated to ${configUpdate.value} requests/min`);
          this.updateRateLimit(parseInt(configUpdate.value));
          break;

        case 'security':
          console.log(`    → Security policy updated for '${configUpdate.key}'`);
          this.updateSecurityPolicy(configUpdate.key, JSON.parse(configUpdate.value));
          break;

        case 'maintenance':
          const maintenance = JSON.parse(configUpdate.value);
          console.log(`    → Maintenance scheduled: ${maintenance.scheduled}`);
          this.scheduleMaintenance(maintenance);
          break;

        default:
          console.log(`    → Generic configuration applied`);
      }

      return 'SUCCESS';
    } catch (err) {
      console.error(`    → Failed to apply configuration: ${err.message}`);
      return 'ERROR';
    }
  }

  // Simulate applying configurations
  updateFeatureFlag(key, enabled) {
    // In real service, this would toggle features
    console.log(`      [${this.name}] Feature '${key}' is now ${enabled ? 'ACTIVE' : 'INACTIVE'}`);
  }

  updateRateLimit(limit) {
    // In real service, this would update rate limiting
    console.log(`      [${this.name}] Rate limit set to ${limit}/min`);
  }

  updateSecurityPolicy(key, policy) {
    // In real service, this would update security policies
    console.log(`      [${this.name}] Security policy '${key}' updated`);
  }

  scheduleMaintenance(maintenance) {
    // In real service, this would prepare for maintenance
    console.log(`      [${this.name}] Preparing for maintenance window`);
  }

  async shutdown() {
    if (this.consumer) {
      await this.consumer.shutdown();
      console.log(`[${this.name}-${this.instanceId}] Consumer stopped`);
    }
  }

  printStatus() {
    console.log(`\n📊 [${this.name}-${this.instanceId}] Status:`);
    console.log(`    Uptime: ${Math.round((new Date() - this.startTime) / 1000)}s`);
    console.log(`    Configurations: ${this.config.size}`);

    if (this.config.size > 0) {
      console.log('    Active configs:');
      for (const [key, config] of this.config.entries()) {
        console.log(`      - ${key}: v${config.version}`);
      }
    }
  }
}

// Service health monitor
async function serviceHealthMonitor(services) {
  console.log('\n=== Service Health Monitor ===');

  const monitorInterval = setInterval(() => {
    console.log('\n' + '='.repeat(50));
    console.log(`Health Check - ${new Date().toLocaleTimeString()}`);
    console.log('='.repeat(50));

    services.forEach(service => {
      service.printStatus();
    });
  }, 8000);

  // Stop monitoring after 30 seconds
  setTimeout(() => {
    clearInterval(monitorInterval);
    console.log('\nHealth monitoring stopped');
  }, 30000);
}

// Demo cluster consumption vs broadcast consumption
async function demoConsumptionPatterns() {
  console.log('\n=== Consumption Patterns Demo ===');

  // Cluster consumer (load balancing)
  const clusterConsumer1 = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'cluster-consumer-group',
    topic: 'pattern-demo',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'cluster-consumer-1',
  });

  const clusterConsumer2 = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'cluster-consumer-group',
    topic: 'pattern-demo',
    tag: '*',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'cluster-consumer-2',
  });

  // Broadcast consumers (all get all messages)
  const broadcastConsumer1 = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'broadcast-consumer-group',
    topic: 'pattern-demo',
    tag: '*',
    messageModel: MessageModel.BROADCASTING,
    instanceName: 'broadcast-consumer-1',
  });

  const broadcastConsumer2 = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'broadcast-consumer-group',
    topic: 'pattern-demo',
    tag: '*',
    messageModel: MessageModel.BROADCASTING,
    instanceName: 'broadcast-consumer-2',
  });

  await clusterConsumer1.start();
  await clusterConsumer2.start();
  await broadcastConsumer1.start();
  await broadcastConsumer2.start();

  // Set up message handlers
  let clusterMessageCount = 0;
  let broadcast1MessageCount = 0;
  let broadcast2MessageCount = 0;

  clusterConsumer1.subscribe((msg, ack) => {
    clusterMessageCount++;
    console.log(`[CLUSTER-1] Got message ${clusterMessageCount}: ${msg.body.toString()}`);
    ack();
  });

  clusterConsumer2.subscribe((msg, ack) => {
    console.log(`[CLUSTER-2] Got message: ${msg.body.toString()}`);
    ack();
  });

  broadcastConsumer1.subscribe((msg, ack) => {
    broadcast1MessageCount++;
    console.log(`[BROADCAST-1] Got message ${broadcast1MessageCount}: ${msg.body.toString()}`);
    ack();
  });

  broadcastConsumer2.subscribe((msg, ack) => {
    broadcast2MessageCount++;
    console.log(`[BROADCAST-2] Got message ${broadcast2MessageCount}: ${msg.body.toString()}`);
    ack();
  });

  // Producer for demo messages
  const producer = new Producer({
    endpoints: 'localhost:9876',
    producerGroup: 'pattern-demo-producer',
  });

  await producer.start();

  // Send messages
  for (let i = 1; i <= 5; i++) {
    await producer.send({
      topic: 'pattern-demo',
      body: Buffer.from(`Demo message ${i}`),
    });
    await new Promise(resolve => setTimeout(resolve, 500));
  }

  // Wait for processing
  await new Promise(resolve => setTimeout(resolve, 3000));

  console.log('\n📈 Consumption Pattern Results:');
  console.log(`Cluster consumers: ${clusterMessageCount} total messages (load balanced)`);
  console.log(`Broadcast consumer 1: ${broadcast1MessageCount} messages (all received)`);
  console.log(`Broadcast consumer 2: ${broadcast2MessageCount} messages (all received)`);

  // Cleanup
  await producer.shutdown();
  await clusterConsumer1.shutdown();
  await clusterConsumer2.shutdown();
  await broadcastConsumer1.shutdown();
  await broadcastConsumer2.shutdown();
}

// Main function
async function main() {
  console.log('=== RocketMQ Broadcast Consumption Example ===');

  try {
    // Step 1: Create multiple service instances
    console.log('\n1. Creating service instances...');
    const services = [
      new ServiceInstance('api-service', 'instance-1'),
      new ServiceInstance('api-service', 'instance-2'),
      new ServiceInstance('api-service', 'instance-3'),
      new ServiceInstance('web-service', 'instance-1'),
      new ServiceInstance('web-service', 'instance-2'),
    ];

    // Step 2: Start consumers for all services
    console.log('\n2. Starting broadcast consumers...');
    const consumerPromises = services.map(service => service.startConsumer());
    await Promise.all(consumerPromises);

    // Step 3: Wait for consumers to be ready
    await new Promise(resolve => setTimeout(resolve, 2000));

    // Step 4: Start health monitor
    console.log('\n3. Starting service health monitor...');
    serviceHealthMonitor(services);

    // Step 5: Send configuration updates
    console.log('\n4. Broadcasting configuration updates...');
    await configurationProducer();

    // Step 6: Demo consumption patterns
    await new Promise(resolve => setTimeout(resolve, 3000));
    await demoConsumptionPatterns();

    // Step 7: Wait for all processing to complete
    console.log('\n5. Waiting for processing to complete...');
    await new Promise(resolve => setTimeout(resolve, 35000));

    // Step 8: Shutdown all services
    console.log('\n6. Shutting down all services...');
    const shutdownPromises = services.map(service => service.shutdown());
    await Promise.all(shutdownPromises);

    console.log('\nBroadcast consumption example completed!');
    console.log('\nKey Points:');
    console.log('- All instances in the same consumer group received every message');
    console.log('- Each service instance applied configurations independently');
    console.log('- Useful for configuration updates, cache invalidation, and notifications');
    console.log('- Contrast with clustering which load balances messages');
  } catch (err) {
    console.error('Error in broadcast consumption example:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});

💻 Messages Transactionnels RocketMQ javascript

🔴 complex ⭐⭐⭐⭐⭐

Implémenter des transactions distribuées en utilisant la fonction de messages transactionnels de RocketMQ pour la cohérence éventuelle

⏱️ 30 min 🏷️ rocketmq, transaction, distributed, acid
Prerequisites: RocketMQ server running, Node.js, Advanced RocketMQ knowledge, Understanding of distributed transactions
// RocketMQ Transactional Messages Example
// Demonstrates distributed transactions with local and remote operations

const { Producer, Consumer, MessageModel, TransactionMQProducer } = require('apache-rocketmq');

// Simulated database for orders
class OrderDatabase {
  constructor() {
    this.orders = new Map();
    this.payments = new Map();
    this.inventories = new Map();
  }

  async createOrder(order) {
    console.log(`[DB] Creating order: ${order.orderId}`);
    // Simulate database operation
    await new Promise(resolve => setTimeout(resolve, 100));
    this.orders.set(order.orderId, { ...order, status: 'created' });
    return true;
  }

  async processPayment(payment) {
    console.log(`[DB] Processing payment: ${payment.paymentId}`);
    // Simulate payment processing
    await new Promise(resolve => setTimeout(resolve, 200));

    // Simulate occasional payment failures
    if (Math.random() < 0.2) {
      console.log(`[DB] Payment failed: ${payment.paymentId}`);
      return false;
    }

    this.payments.set(payment.paymentId, { ...payment, status: 'completed' });
    return true;
  }

  async updateInventory(inventoryUpdate) {
    console.log(`[DB] Updating inventory for: ${inventoryUpdate.productId}`);
    // Simulate inventory update
    await new Promise(resolve => setTimeout(resolve, 50));
    this.inventories.set(inventoryUpdate.productId, inventoryUpdate);
    return true;
  }

  async rollbackOrder(orderId) {
    console.log(`[DB] Rolling back order: ${orderId}`);
    const order = this.orders.get(orderId);
    if (order) {
      order.status = 'cancelled';
      this.orders.set(orderId, order);
    }
  }

  getOrderStatus(orderId) {
    const order = this.orders.get(orderId);
    return order ? order.status : 'not_found';
  }
}

// Transactional producer for order processing
async function createTransactionalProducer() {
  const producer = new TransactionMQProducer({
    endpoints: 'localhost:9876',
    producerGroup: 'order-transaction-producer',
    instanceName: 'order-transaction-producer',
    // Transaction check listener
    checkListener: async (msg) => {
      try {
        const transactionId = msg.properties.transactionId;
        const orderData = JSON.parse(msg.body.toString());

        console.log(`[TRANSACTION CHECK] Checking transaction ${transactionId} for order ${orderData.orderId}`);

        // Check local transaction status
        // In real application, you would query your database
        const orderStatus = global.orderDB.getOrderStatus(orderData.orderId);

        console.log(`[TRANSACTION CHECK] Order ${orderData.orderId} status: ${orderStatus}`);

        // Return transaction status
        if (orderStatus === 'created' || orderStatus === 'processing') {
          return { state: 'COMMIT' };
        } else if (orderStatus === 'cancelled') {
          return { state: 'ROLLBACK' };
        } else {
          return { state: 'UNKNOWN' };
        }
      } catch (err) {
        console.error('Transaction check error:', err);
        return { state: 'UNKNOWN' };
      }
    },
  });

  await producer.start();
  console.log('Transactional producer started');
  return producer;
}

// Order service with transactional messaging
class OrderService {
  constructor(producer, database) {
    this.producer = producer;
    this.db = database;
  }

  async processOrder(orderRequest) {
    const transactionId = `tx_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
    console.log(`\n🔄 Starting transaction ${transactionId} for order ${orderRequest.orderId}`);

    // Step 1: Send half message (prepared message)
    const halfMessage = {
      topic: 'order-events',
      tag: 'order.created',
      body: Buffer.from(JSON.stringify({
        ...orderRequest,
        transactionId: transactionId,
        timestamp: new Date().toISOString(),
      })),
      keys: [orderRequest.orderId, transactionId],
      properties: {
        transactionId: transactionId,
        orderId: orderRequest.orderId,
      },
    };

    console.log(`[1/4] Sending half message for order ${orderRequest.orderId}`);
    const sendResult = await this.producer.sendTransactionMessage(halfMessage);

    try {
      // Step 2: Execute local transaction
      console.log(`[2/4] Executing local transaction for order ${orderRequest.orderId}`);

      const localTxSuccess = await this.executeLocalTransaction(orderRequest, transactionId);

      if (localTxSuccess) {
        // Step 3: Commit transaction
        console.log(`[3/4] Committing transaction ${transactionId}`);
        await this.producer.commitTransaction(sendResult.transactionId);
        console.log(`[4/4] ✅ Transaction ${transactionId} COMMITTED for order ${orderRequest.orderId}`);
        return { success: true, transactionId };
      } else {
        // Step 4: Rollback transaction
        console.log(`[3/4] Rolling back transaction ${transactionId}`);
        await this.producer.rollbackTransaction(sendResult.transactionId);
        console.log(`[4/4] ❌ Transaction ${transactionId} ROLLED BACK for order ${orderRequest.orderId}`);
        return { success: false, transactionId };
      }
    } catch (err) {
      console.error(`Local transaction failed for ${transactionId}:`, err);
      console.log(`[3/4] Rolling back transaction ${transactionId} due to error`);
      await this.producer.rollbackTransaction(sendResult.transactionId);
      throw err;
    }
  }

  async executeLocalTransaction(orderRequest, transactionId) {
    try {
      // Create order in local database
      const orderSuccess = await this.db.createOrder({
        ...orderRequest,
        status: 'processing',
        transactionId: transactionId,
      });

      if (!orderSuccess) {
        await this.db.rollbackOrder(orderRequest.orderId);
        return false;
      }

      // Process payment
      const paymentSuccess = await this.db.processPayment({
        paymentId: `pay_${orderRequest.orderId}`,
        orderId: orderRequest.orderId,
        amount: orderRequest.amount,
        transactionId: transactionId,
      });

      if (!paymentSuccess) {
        await this.db.rollbackOrder(orderRequest.orderId);
        return false;
      }

      // Update inventory
      for (const item of orderRequest.items) {
        const inventorySuccess = await this.db.updateInventory({
          productId: item.productId,
          quantity: -item.quantity, // Decrease inventory
          orderId: orderRequest.orderId,
          transactionId: transactionId,
        });

        if (!inventorySuccess) {
          await this.db.rollbackOrder(orderRequest.orderId);
          return false;
        }
      }

      // Mark order as completed
      const order = this.db.orders.get(orderRequest.orderId);
      order.status = 'completed';
      this.db.orders.set(orderRequest.orderId, order);

      console.log(`[LOCAL TX] All operations completed for order ${orderRequest.orderId}`);
      return true;

    } catch (err) {
      console.error(`[LOCAL TX] Error executing transaction for order ${orderRequest.orderId}:`, err);
      await this.db.rollbackOrder(orderRequest.orderId);
      return false;
    }
  }
}

// Order event consumer
async function orderEventConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'order-event-consumer',
    topic: 'order-events',
    tag: '*',
    messageModel: MessageModel.BROADCASTING, // All consumers get all messages
    instanceName: 'order-event-consumer',
  });

  await consumer.start();
  console.log('Order event consumer started');

  consumer.subscribe((msg, ack) => {
    try {
      const orderEvent = JSON.parse(msg.body.toString());
      const now = new Date();

      console.log('\n📦 ORDER EVENT RECEIVED');
      console.log(`   Order ID: ${orderEvent.orderId}`);
      console.log(`   Transaction ID: ${orderEvent.transactionId}`);
      console.log(`   Amount: $${orderEvent.amount}`);
      console.log(`   Items: ${orderEvent.items.map(i => i.productId).join(', ')}`);
      console.log(`   Created At: ${new Date(orderEvent.timestamp).toLocaleTimeString()}`);
      console.log(`   Received At: ${now.toLocaleTimeString()}`);
      console.log(`   Processing Delay: ${(now.getTime() - new Date(orderEvent.timestamp).getTime())}ms`);

      // Here you would:
      // 1. Update search indexes
      // 2. Send notifications
      // 3. Update analytics
      // 4. Trigger other downstream services
      console.log('   → Processing downstream services...');

      // Simulate downstream processing
      setTimeout(() => {
        console.log(`   → Downstream processing completed for ${orderEvent.orderId}`);
      }, 100);

      console.log('---');
      ack();
    } catch (err) {
      console.error('Error processing order event:', err);
      ack();
    }
  });

  return consumer;
}

// Payment verification service (consumer)
async function paymentVerificationConsumer() {
  const consumer = new Consumer({
    endpoints: 'localhost:9876',
    consumerGroup: 'payment-verification-consumer',
    topic: 'order-events',
    tag: 'order.created',
    messageModel: MessageModel.CLUSTERING,
    instanceName: 'payment-verification-consumer',
  });

  await consumer.start();
  console.log('Payment verification consumer started');

  consumer.subscribe((msg, ack) => {
    try {
      const orderEvent = JSON.parse(msg.body.toString());

      console.log(`\n💳 Payment verification for order ${orderEvent.orderId}`);

      // In a real system, you might:
      // 1. Verify payment with external payment gateway
      // 2. Update fraud detection systems
      // 3. Send payment confirmation to customer

      const paymentStatus = Math.random() > 0.1 ? 'VERIFIED' : 'REQUIRES_REVIEW';
      console.log(`   → Payment status: ${paymentStatus}`);

      if (paymentStatus === 'REQUIRES_REVIEW') {
        console.log(`   → Order ${orderEvent.orderId} flagged for manual review`);
      }

      ack();
    } catch (err) {
      console.error('Error in payment verification:', err);
      ack();
    }
  });

  return consumer;
}

// Main function
async function main() {
  console.log('=== RocketMQ Transactional Messages Example ===');

  // Initialize database
  global.orderDB = new OrderDatabase();

  try {
    // Step 1: Create transactional producer
    console.log('\n1. Setting up transactional producer...');
    const producer = await createTransactionalProducer();

    // Step 2: Start consumers
    console.log('\n2. Starting consumers...');
    const orderConsumerPromise = orderEventConsumer();
    const paymentConsumerPromise = paymentVerificationConsumer();

    // Step 3: Wait for consumers to be ready
    await new Promise(resolve => setTimeout(resolve, 2000));

    // Step 4: Create order service
    console.log('\n3. Creating order service...');
    const orderService = new OrderService(producer, global.orderDB);

    // Step 5: Process multiple orders
    console.log('\n4. Processing orders...');
    const orders = [
      {
        orderId: 'TX-ORDER-001',
        userId: 'user-tx-001',
        amount: 299.99,
        items: [
          { productId: 'LAPTOP-001', quantity: 1, price: 199.99 },
          { productId: 'MOUSE-001', quantity: 1, price: 49.99 },
          { productId: 'KEYBOARD-001', quantity: 1, price: 50.01 },
        ],
      },
      {
        orderId: 'TX-ORDER-002',
        userId: 'user-tx-002',
        amount: 149.99,
        items: [
          { productId: 'MONITOR-001', quantity: 1, price: 149.99 },
        ],
      },
      {
        orderId: 'TX-ORDER-003',
        userId: 'user-tx-003',
        amount: 89.97,
        items: [
          { productId: 'HEADPHONE-001', quantity: 2, price: 44.99 },
        ],
      },
    ];

    // Process orders with delays
    for (let i = 0; i < orders.length; i++) {
      const order = orders[i];
      console.log(`\n--- Processing Order ${i + 1}/${orders.length} ---`);

      const result = await orderService.processOrder(order);

      if (result.success) {
        console.log(`✅ Order ${order.orderId} processed successfully`);
      } else {
        console.log(`❌ Order ${order.orderId} processing failed`);
      }

      // Wait between orders
      if (i < orders.length - 1) {
        await new Promise(resolve => setTimeout(resolve, 3000));
      }
    }

    // Step 6: Wait for message processing
    console.log('\n5. Waiting for message processing...');
    await new Promise(resolve => setTimeout(resolve, 10000));

    // Step 7: Shutdown
    console.log('\n6. Shutting down...');
    await producer.shutdown();

    const orderConsumer = await orderConsumerPromise;
    const paymentConsumer = await paymentConsumerPromise;
    await orderConsumer.shutdown();
    await paymentConsumer.shutdown();

    console.log('\nTransactional messages example completed!');
    console.log('\nSummary:');
    console.log('- Demonstrated distributed transactions with local DB ops and remote messaging');
    console.log('- Transaction consistency maintained even with failures');
    console.log('- Downstream services only receive committed transactions');
  } catch (err) {
    console.error('Error in transactional messages example:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});