Exemples Apache Pulsar

Exemples Apache Pulsar incluant pub/sub, lecteurs, consommateurs avec différents types d'abonnement, gestion de schémas et stockage hiérarchique pour messagerie d'entreprise

Key Facts

Category
Messaging
Items
5
Format Families
json

Sample Overview

Exemples Apache Pulsar incluant pub/sub, lecteurs, consommateurs avec différents types d'abonnement, gestion de schémas et stockage hiérarchique pour messagerie d'entreprise This sample set belongs to Messaging and can be used to test related workflows inside Elysia Tools.

💻 Pulsar Hello World - Producteur/Consommateur de Base javascript

🟢 simple

Exemple simple de producteur et consommateur Apache Pulsar démontrant le modèle de base publication/abonnement

⏱️ 10 min 🏷️ pulsar, messaging, beginner
Prerequisites: Pulsar cluster running, Node.js
// Apache Pulsar Hello World - Basic Producer/Consumer Example
// Install: npm install pulsar-client

const Pulsar = require('pulsar-client');

async function helloWorld() {
  // Create Pulsar client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
    operationTimeoutSeconds: 30,
  });

  console.log('Connected to Pulsar cluster');

  // Create producer
  const producer = await client.createProducer({
    topic: 'persistent://public/default/hello-world',
    sendTimeoutMs: 30000,
    batchingEnabled: true,
  });

  console.log('Producer created');

  // Create consumer
  const consumer = await client.subscribe({
    topic: 'persistent://public/default/hello-world',
    subscription: 'hello-subscription',
    subscriptionType: 'Shared',
    ackTimeoutMs: 10000,
  });

  console.log('Consumer created');

  // Start consumer in background
  (async () => {
    while (true) {
      try {
        const msg = await consumer.receive();
        console.log(`Received message: ${msg.getData().toString()}`);
        await consumer.acknowledge(msg);
      } catch (err) {
        console.error('Error receiving message:', err);
        break;
      }
    }
  })();

  // Wait a moment for consumer to be ready
  await new Promise(resolve => setTimeout(resolve, 1000));

  // Send messages
  console.log('Sending messages...');
  const messages = [
    'Hello Pulsar!',
    'Apache Pulsar is awesome!',
    'Streaming made easy!',
    'Hello from producer!'
  ];

  for (const message of messages) {
    await producer.send({
      data: Buffer.from(message),
      properties: {
        source: 'hello-world-example',
        timestamp: new Date().toISOString(),
      },
    });
    console.log(`Sent: ${message}`);
    await new Promise(resolve => setTimeout(resolve, 500));
  }

  // Wait for messages to be processed
  await new Promise(resolve => setTimeout(resolve, 2000));

  // Close producer, consumer, and client
  await producer.close();
  await consumer.close();
  await client.close();

  console.log('Pulsar example completed');
}

helloWorld().catch(err => {
  console.error('Error:', err);
});

💻 Types d'Abonnement Pulsar javascript

🟡 intermediate ⭐⭐⭐

Explorer différents types d'abonnement: Exclusive, Shared, Failover et Key_Shared

⏱️ 20 min 🏷️ pulsar, subscriptions, patterns
Prerequisites: Pulsar cluster running, Node.js, Pulsar basics
// Apache Pulsar Subscription Types Example
// Demonstrates: Exclusive, Shared, Failover, and Key_Shared

const Pulsar = require('pulsar-client');

// Consumer configuration for different subscription types
const subscriptionConfigs = {
  // Only one consumer can receive messages
  exclusive: {
    topic: 'persistent://public/default/subscription-demo',
    subscription: 'exclusive-sub',
    subscriptionType: 'Exclusive',
  },
  // Load balance messages across multiple consumers
  shared: {
    topic: 'persistent://public/default/subscription-demo',
    subscription: 'shared-sub',
    subscriptionType: 'Shared',
  },
  // One active consumer, others are standbys
  failover: {
    topic: 'persistent://public/default/subscription-demo',
    subscription: 'failover-sub',
    subscriptionType: 'Failover',
  },
  // Load balance with message key ordering
  key_shared: {
    topic: 'persistent://public/default/subscription-demo',
    subscription: 'key-shared-sub',
    subscriptionType: 'Key_Shared',
  },
};

// Create a consumer with specific configuration
async function createConsumer(client, config, consumerId) {
  try {
    const consumer = await client.subscribe(config);
    console.log(`Consumer ${consumerId} created with type: ${config.subscriptionType}`);

    // Start consuming messages
    (async () => {
      while (true) {
        try {
          const msg = await consumer.receive();
          const messageData = msg.getData().toString();
          const messageKey = msg.getMessageId();
          const properties = msg.getProperties();

          console.log(`[${consumerId}] Received: ${messageData}`);
          console.log(`[${consumerId}] Properties: ${JSON.stringify(properties)}`);

          await consumer.acknowledge(msg);
        } catch (err) {
          console.error(`[${consumerId}] Error receiving message:`, err);
          break;
        }
      }
    })();

    return consumer;
  } catch (err) {
    console.error(`Failed to create consumer ${consumerId}:`, err);
    return null;
  }
}

// Producer that sends messages with keys
async function messageProducer() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  const producer = await client.createProducer({
    topic: 'persistent://public/default/subscription-demo',
    batchingEnabled: true,
    blockIfQueueFull: true,
  });

  console.log('Producer started');

  // Send messages with different keys
  const messages = [
    { key: 'user.1', content: 'User 1 login' },
    { key: 'user.2', content: 'User 2 login' },
    { key: 'order.1', content: 'Order created' },
    { key: 'user.1', content: 'User 1 logout' },
    { key: 'order.2', content: 'Order processed' },
    { key: 'user.2', content: 'User 2 update' },
    { key: 'order.1', content: 'Order shipped' },
    { key: 'user.1', content: 'User 1 login again' },
  ];

  for (const msg of messages) {
    await producer.send({
      data: Buffer.from(msg.content),
      partitionKey: msg.key,
      properties: {
        messageType: 'demo',
        category: msg.key.split('.')[0],
        id: msg.key.split('.')[1],
      },
    });

    console.log(`Sent message with key '${msg.key}': ${msg.content}`);
    await new Promise(resolve => setTimeout(resolve, 300));
  }

  await producer.close();
  await client.close();
}

// Demo different subscription types
async function demoSubscriptionTypes() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('=== Pulsar Subscription Types Demo ===\n');

  // Demo 1: Exclusive subscription (only one consumer allowed)
  console.log('1. EXCLUSIVE SUBSCRIPTION');
  console.log('   - Only one consumer can subscribe');
  console.log('   - All messages go to that single consumer\n');

  const exclusiveConsumer1 = await createConsumer(
    client,
    subscriptionConfigs.exclusive,
    'EXCLUSIVE-1'
  );

  // Demo 2: Shared subscription (load balancing)
  console.log('2. SHARED SUBSCRIPTION');
  console.log('   - Multiple consumers can subscribe');
  console.log('   - Messages are load balanced across consumers\n');

  const sharedConsumer1 = await createConsumer(
    client,
    subscriptionConfigs.shared,
    'SHARED-1'
  );

  const sharedConsumer2 = await createConsumer(
    client,
    subscriptionConfigs.shared,
    'SHARED-2'
  );

  // Demo 3: Failover subscription (active-standby)
  console.log('3. FAILOVER SUBSCRIPTION');
  console.log('   - Multiple consumers can subscribe');
  console.log('   - Only one is active, others are standbys\n');

  const failoverConsumer1 = await createConsumer(
    client,
    subscriptionConfigs.failover,
    'FAILOVER-1'
  );

  const failoverConsumer2 = await createConsumer(
    client,
    subscriptionConfigs.failover,
    'FAILOVER-2'
  );

  // Demo 4: Key_Shared subscription (key-based ordering)
  console.log('4. KEY_SHARED SUBSCRIPTION');
  console.log('   - Multiple consumers can subscribe');
  console.log('   - Messages with same key go to same consumer\n');

  const keySharedConsumer1 = await createConsumer(
    client,
    subscriptionConfigs.key_shared,
    'KEY-SHARED-1'
  );

  const keySharedConsumer2 = await createConsumer(
    client,
    subscriptionConfigs.key_shared,
    'KEY-SHARED-2'
  );

  // Wait for consumers to be ready
  await new Promise(resolve => setTimeout(resolve, 1000));

  // Start producer to send messages
  await messageProducer();

  // Wait for message processing
  await new Promise(resolve => setTimeout(resolve, 5000));

  // Clean up
  console.log('\nCleaning up consumers...');
  await exclusiveConsumer1.close();
  await sharedConsumer1.close();
  await sharedConsumer2.close();
  await failoverConsumer1.close();
  await failoverConsumer2.close();
  await keySharedConsumer1.close();
  await keySharedConsumer2.close();
  await client.close();
}

demoSubscriptionTypes().catch(err => {
  console.error('Error:', err);
});

💻 API de Lecteur Pulsar javascript

🟡 intermediate ⭐⭐⭐

Utiliser l'API de Lecteur Pulsar pour lire des messages depuis des positions spécifiques avec contrôle de curseur

⏱️ 20 min 🏷️ pulsar, reader, cursor, api
Prerequisites: Pulsar cluster running, Node.js, Pulsar basics
// Apache Pulsar Reader API Example
// Demonstrates reading messages with cursor control

const Pulsar = require('pulsar-client');

// Producer to populate topic with messages
async function populateTopic() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  const producer = await client.createProducer({
    topic: 'persistent://public/default/reader-demo',
    batchingEnabled: true,
  });

  console.log('Populating topic with messages...');

  // Send messages with timestamps
  const messages = [];
  const startTime = Date.now();

  for (let i = 1; i <= 20; i++) {
    const message = {
      id: `msg-${i}`,
      content: `Message number ${i}`,
      timestamp: startTime + (i * 1000),
      batch: Math.floor((i - 1) / 5) + 1,
    };

    await producer.send({
      data: Buffer.from(JSON.stringify(message)),
      properties: {
        messageId: message.id,
        batch: message.batch.toString(),
      },
    });

    messages.push(message);
    console.log(`Sent: ${message.content}`);

    // Small delay between messages
    await new Promise(resolve => setTimeout(resolve, 100));
  }

  await producer.close();
  await client.close();

  return messages;
}

// Reader that starts from the beginning
async function readFromBeginning() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('\n=== Reading from beginning ===');

  const reader = await client.createReader({
    topic: 'persistent://public/default/reader-demo',
    startMessageId: Pulsar.MessageId.earliest(),
  });

  let messageCount = 0;
  console.log('Reading first 5 messages from beginning:');

  while (messageCount < 5) {
    try {
      const msg = await reader.readNext();
      const messageData = JSON.parse(msg.getData().toString());
      console.log(`  ${messageData.content} (ID: ${messageData.id})`);
      messageCount++;
    } catch (err) {
      console.error('Error reading message:', err);
      break;
    }
  }

  await reader.close();
  await client.close();
}

// Reader that starts from the latest message
async function readFromLatest() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('\n=== Reading from latest ===');

  // Wait a moment to ensure we're at the latest
  await new Promise(resolve => setTimeout(resolve, 500));

  const reader = await client.createReader({
    topic: 'persistent://public/default/reader-demo',
    startMessageId: Pulsar.MessageId.latest(),
  });

  // Should not receive any messages since we're reading from latest
  // and no new messages have been produced
  console.log('Reading from latest position (should be empty)...');

  try {
    // Set a timeout for reading
    const msg = await Promise.race([
      reader.readNext(),
      new Promise((_, reject) =>
        setTimeout(() => reject(new Error('Timeout')), 1000)
      )
    ]);
    console.log('Unexpected message received');
  } catch (err) {
    if (err.message === 'Timeout') {
      console.log('No messages available at latest position (expected)');
    } else {
      console.error('Error:', err);
    }
  }

  await reader.close();
  await client.close();
}

// Reader that starts from a specific message ID
async function readFromSpecificMessage(messages) {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('\n=== Reading from specific message ===');

  // First, read all messages to find a specific message ID
  const tempReader = await client.createReader({
    topic: 'persistent://public/default/reader-demo',
    startMessageId: Pulsar.MessageId.earliest(),
  });

  let targetMessageId = null;
  let messageCount = 0;

  // Skip first 10 messages
  while (messageCount < 10) {
    try {
      await tempReader.readNext();
      messageCount++;
    } catch (err) {
      break;
    }
  }

  // Get the next message ID as our target
  try {
    const targetMsg = await tempReader.readNext();
    const targetData = JSON.parse(targetMsg.getData().toString());
    console.log(`Found target message: ${targetData.content}`);
    targetMessageId = targetMsg.getMessageId();
  } catch (err) {
    console.error('Error finding target message:', err);
    await tempReader.close();
    await client.close();
    return;
  }

  await tempReader.close();

  // Now create a reader starting from that specific message
  const reader = await client.createReader({
    topic: 'persistent://public/default/reader-demo',
    startMessageId: targetMessageId,
  });

  console.log('Reading messages from message 11 onwards:');
  messageCount = 0;

  while (messageCount < 5) {
    try {
      const msg = await reader.readNext();
      const messageData = JSON.parse(msg.getData().toString());
      console.log(`  ${messageData.content} (ID: ${messageData.id})`);
      messageCount++;
    } catch (err) {
      console.error('Error reading message:', err);
      break;
    }
  }

  await reader.close();
  await client.close();
}

// Reader with timestamp-based start
async function readFromTimestamp() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('\n=== Reading from timestamp ===');

  // Calculate timestamp for halfway through the messages
  const halfTime = Date.now() - 10000; // 10 seconds ago

  const reader = await client.createReader({
    topic: 'persistent://public/default/reader-demo',
    startMessageId: Pulsar.MessageId.timestamp(halfTime),
  });

  console.log(`Reading messages from timestamp: ${new Date(halfTime)}`);

  let messageCount = 0;
  while (messageCount < 5) {
    try {
      const msg = await reader.readNext();
      const messageData = JSON.parse(msg.getData().toString());
      console.log(`  ${messageData.content} (Time: ${new Date(messageData.timestamp)})`);
      messageCount++;
    } catch (err) {
      console.error('Error reading message:', err);
      break;
    }
  }

  await reader.close();
  await client.close();
}

// Compare reader vs consumer
async function compareReaderConsumer() {
  console.log('\n=== Reader vs Consumer Comparison ===');

  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  // Create a reader
  const reader = await client.createReader({
    topic: 'persistent://public/default/reader-demo',
    startMessageId: Pulsar.MessageId.earliest(),
  });

  // Create a consumer
  const consumer = await client.subscribe({
    topic: 'persistent://public/default/reader-demo',
    subscription: 'comparison-sub',
    subscriptionType: 'Exclusive',
  });

  console.log('Reader and Consumer both reading from beginning...');

  // Reader reads without acknowledging
  console.log('\nReader messages (no ack needed):');
  for (let i = 0; i < 3; i++) {
    try {
      const msg = await reader.readNext();
      const messageData = JSON.parse(msg.getData().toString());
      console.log(`  Reader: ${messageData.content}`);
    } catch (err) {
      break;
    }
  }

  // Consumer reads and acknowledges
  console.log('\nConsumer messages (requires ack):');
  for (let i = 0; i < 3; i++) {
    try {
      const msg = await consumer.receive();
      const messageData = JSON.parse(msg.getData().toString());
      console.log(`  Consumer: ${messageData.content}`);
      await consumer.acknowledge(msg);
    } catch (err) {
      break;
    }
  }

  console.log('\nKey differences:');
  console.log('- Reader: No subscription, no ack needed, can start from any position');
  console.log('- Consumer: Has subscription, requires ack, tracks cursor per subscription');

  await reader.close();
  await consumer.close();
  await client.close();
}

// Main function
async function main() {
  console.log('=== Pulsar Reader API Demo ===');

  try {
    // Step 1: Populate the topic
    console.log('Step 1: Populating topic with messages...');
    await populateTopic();

    // Step 2: Demonstrate different reading patterns
    await new Promise(resolve => setTimeout(resolve, 1000));
    await readFromBeginning();

    await new Promise(resolve => setTimeout(resolve, 1000));
    await readFromLatest();

    await new Promise(resolve => setTimeout(resolve, 1000));
    await readFromSpecificMessage();

    await new Promise(resolve => setTimeout(resolve, 1000));
    await readFromTimestamp();

    await new Promise(resolve => setTimeout(resolve, 1000));
    await compareReaderConsumer();

    console.log('\nReader API demo completed!');
  } catch (err) {
    console.error('Error in reader demo:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});

💻 Registre de Schémas Pulsar javascript

🔴 complex ⭐⭐⭐⭐

Utiliser le Registre de Schémas Pulsar avec des schémas JSON, Avro et Protobuf pour les données structurées

⏱️ 25 min 🏷️ pulsar, schema, registry, structured-data
Prerequisites: Pulsar cluster with schema registry, Node.js, Advanced Pulsar knowledge
// Apache Pulsar Schema Registry Example
// Demonstrates using JSON, Avro, and Protobuf schemas

const Pulsar = require('pulsar-client');

// Define JSON schema for user events
const userEventSchema = {
  type: 'JSON',
  schema: `{
    "type": "record",
    "name": "UserEvent",
    "fields": [
      {"name": "userId", "type": "string"},
      {"name": "eventType", "type": "string"},
      {"name": "timestamp", "type": "long"},
      {"name": "data", "type": ["null", {
        "type": "record",
        "name": "EventData",
        "fields": [
          {"name": "email", "type": ["null", "string"]},
          {"name": "name", "type": ["null", "string"]},
          {"name": "action", "type": ["null", "string"]}
        ]
      }], "default": null}
    ]
  }`,
};

// Define Avro schema for order events
const orderEventSchema = {
  type: 'AVRO',
  schema: `{
    "type": "record",
    "name": "OrderEvent",
    "fields": [
      {"name": "orderId", "type": "string"},
      {"name": "customerId", "type": "string"},
      {"name": "eventType", "type": "string"},
      {"name": "amount", "type": "double"},
      {"name": "currency", "type": "string"},
      {"name": "timestamp", "type": "long"},
      {"name": "items", "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "productId", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "price", "type": "double"}
          ]
        }
      }}
    ]
  }`,
};

// Producer with JSON schema for user events
async function userEventProducer() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('Creating user event producer with JSON schema...');

  const producer = await client.createProducer({
    topic: 'persistent://public/default/user-events',
    schema: userEventSchema,
  });

  // User events to publish
  const userEvents = [
    {
      userId: 'user123',
      eventType: 'user.created',
      timestamp: Date.now(),
      data: {
        email: '[email protected]',
        name: 'John Doe',
        action: 'registration',
      },
    },
    {
      userId: 'user123',
      eventType: 'user.updated',
      timestamp: Date.now(),
      data: {
        email: '[email protected]',
        name: 'John Smith',
        action: 'profile_update',
      },
    },
    {
      userId: 'user456',
      eventType: 'user.login',
      timestamp: Date.now(),
      data: {
        action: 'authentication',
      },
    },
  ];

  for (const event of userEvents) {
    console.log(`Publishing user event: ${event.eventType}`);
    await producer.send({
      data: Buffer.from(JSON.stringify(event)),
    });
  }

  await producer.close();
  await client.close();
}

// Producer with Avro schema for order events
async function orderEventProducer() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('Creating order event producer with Avro schema...');

  const producer = await client.createProducer({
    topic: 'persistent://public/default/order-events',
    schema: orderEventSchema,
  });

  // Order events to publish
  const orderEvents = [
    {
      orderId: 'order789',
      customerId: 'user123',
      eventType: 'order.created',
      amount: 99.99,
      currency: 'USD',
      timestamp: Date.now(),
      items: [
        {
          productId: 'prod001',
          quantity: 2,
          price: 49.99,
        },
      ],
    },
    {
      orderId: 'order790',
      customerId: 'user456',
      eventType: 'order.created',
      amount: 149.99,
      currency: 'USD',
      timestamp: Date.now(),
      items: [
        {
          productId: 'prod002',
          quantity: 1,
          price: 149.99,
        },
      ],
    },
  ];

  for (const event of orderEvents) {
    console.log(`Publishing order event: ${event.eventType}`);
    await producer.send({
      data: Buffer.from(JSON.stringify(event)),
    });
  }

  await producer.close();
  await client.close();
}

// Consumer for user events with JSON schema
async function userEventConsumer() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('Creating user event consumer with JSON schema...');

  const consumer = await client.subscribe({
    topic: 'persistent://public/default/user-events',
    subscription: 'user-events-sub',
    subscriptionType: 'Shared',
    schema: userEventSchema,
  });

  console.log('Consuming user events...');
  let messageCount = 0;

  while (messageCount < 3) {
    try {
      const msg = await consumer.receive();
      const eventData = JSON.parse(msg.getData().toString());

      console.log(`User Event Received:`);
      console.log(`  - User ID: ${eventData.userId}`);
      console.log(`  - Event Type: ${eventData.eventType}`);
      console.log(`  - Timestamp: ${new Date(eventData.timestamp)}`);
      if (eventData.data) {
        console.log(`  - Data: ${JSON.stringify(eventData.data)}`);
      }
      console.log('');

      await consumer.acknowledge(msg);
      messageCount++;
    } catch (err) {
      console.error('Error consuming user event:', err);
      break;
    }
  }

  await consumer.close();
  await client.close();
}

// Consumer for order events with Avro schema
async function orderEventConsumer() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('Creating order event consumer with Avro schema...');

  const consumer = await client.subscribe({
    topic: 'persistent://public/default/order-events',
    subscription: 'order-events-sub',
    subscriptionType: 'Shared',
    schema: orderEventSchema,
  });

  console.log('Consuming order events...');
  let messageCount = 0;

  while (messageCount < 2) {
    try {
      const msg = await consumer.receive();
      const eventData = JSON.parse(msg.getData().toString());

      console.log(`Order Event Received:`);
      console.log(`  - Order ID: ${eventData.orderId}`);
      console.log(`  - Customer ID: ${eventData.customerId}`);
      console.log(`  - Event Type: ${eventData.eventType}`);
      console.log(`  - Amount: ${eventData.amount} ${eventData.currency}`);
      console.log(`  - Items Count: ${eventData.items.length}`);
      console.log('');

      await consumer.acknowledge(msg);
      messageCount++;
    } catch (err) {
      console.error('Error consuming order event:', err);
      break;
    }
  }

  await consumer.close();
  await client.close();
}

// Main function to run the schema demo
async function main() {
  console.log('=== Pulsar Schema Registry Demo ===\n');

  try {
    // Start consumers first
    console.log('1. Starting consumers...\n');

    const userConsumerPromise = userEventConsumer();
    const orderConsumerPromise = orderEventConsumer();

    // Wait for consumers to be ready
    await new Promise(resolve => setTimeout(resolve, 1000));

    // Produce events
    console.log('2. Producing events...\n');
    await userEventProducer();
    await new Promise(resolve => setTimeout(resolve, 500));
    await orderEventProducer();

    // Wait for consumption to complete
    await userConsumerPromise;
    await orderConsumerPromise;

    console.log('Schema registry demo completed successfully!');
  } catch (err) {
    console.error('Error in schema demo:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});

💻 Multi-Tenancy Pulsar javascript

🔴 complex ⭐⭐⭐⭐⭐

Implémenter une architecture multi-tenant avec espaces de noms, isolation et gestion des ressources

⏱️ 30 min 🏷️ pulsar, multi-tenancy, namespaces
Prerequisites: Pulsar cluster with multi-tenancy enabled, Node.js, Advanced Pulsar knowledge
// Apache Pulsar Multi-Tenancy Example
// Demonstrates namespace isolation and tenant-specific configurations

const Pulsar = require('pulsar-client');

// Tenant configurations
const tenants = {
  'company-a': {
    name: 'Company A',
    topics: [
      'persistent://company-a/production/orders',
      'persistent://company-a/production/users',
      'persistent://company-a/staging/logs',
    ],
    producers: 2,
    consumers: 3,
  },
  'company-b': {
    name: 'Company B',
    topics: [
      'persistent://company-b/analytics/events',
      'persistent://company-b/analytics/metrics',
      'persistent://company-b/notifications/emails',
    ],
    producers: 1,
    consumers: 2,
  },
};

// Create producers for a specific tenant
async function createTenantProducers(tenantId, tenantConfig) {
  const producers = [];

  for (let i = 0; i < tenantConfig.producers; i++) {
    const client = new Pulsar.Client({
      serviceUrl: 'pulsar://localhost:6650',
    });

    const producer = await client.createProducer({
      topic: tenantConfig.topics[i % tenantConfig.topics.length],
      producerName: `${tenantId}-producer-${i + 1}`,
      batchingEnabled: true,
      blockIfQueueFull: true,
      sendTimeoutMs: 30000,
    });

    producers.push({ client, producer, topicIndex: i });
    console.log(`Created ${tenantId} producer ${i + 1}`);
  }

  return producers;
}

// Create consumers for a specific tenant
async function createTenantConsumers(tenantId, tenantConfig) {
  const consumers = [];

  for (let i = 0; i < tenantConfig.consumers; i++) {
    const client = new Pulsar.Client({
      serviceUrl: 'pulsar://localhost:6650',
    });

    const topic = tenantConfig.topics[i % tenantConfig.topics.length];
    const consumer = await client.subscribe({
      topic: topic,
      subscription: `${tenantId}-subscription-${i + 1}`,
      subscriptionType: 'Shared',
      consumerName: `${tenantId}-consumer-${i + 1}`,
      ackTimeoutMs: 10000,
    });

    // Start consuming messages
    (async () => {
      while (true) {
        try {
          const msg = await consumer.receive();
          const messageData = JSON.parse(msg.getData().toString());
          console.log(`[${tenantId.toUpperCase()}] ${consumer.getConsumerName()}: ${messageData.event}`);
          await consumer.acknowledge(msg);
        } catch (err) {
          console.error(`[${tenantId.toUpperCase()}] Consumer error:`, err);
          break;
        }
      }
    })();

    consumers.push({ client, consumer });
    console.log(`Created ${tenantId} consumer ${i + 1}`);
  }

  return consumers;
}

// Produce messages for a tenant
async function produceTenantMessages(producerInfo, tenantId, tenantConfig) {
  const { producer, topicIndex } = producerInfo;
  const topic = tenantConfig.topics[topicIndex];

  // Extract topic name from full topic path
  const topicName = topic.split('/').pop();

  const events = [];
  for (let i = 1; i <= 10; i++) {
    events.push({
      tenant: tenantId,
      event: `${topicName}-event-${i}`,
      timestamp: Date.now(),
      producerId: producer.getProducerName(),
      data: {
        message: `Event ${i} from ${tenantConfig.name}`,
        metadata: {
          tenant: tenantId,
          topic: topicName,
          batch: Math.ceil(i / 3),
        },
      },
    });
  }

  for (const event of events) {
    await producer.send({
      data: Buffer.from(JSON.stringify(event)),
      properties: {
        tenant: tenantId,
        topic: topicName,
        producer: producer.getProducerName(),
      },
    });

    console.log(`[${tenantId.toUpperCase()}] ${producer.getProducerName()}: Sent ${event.event}`);
    await new Promise(resolve => setTimeout(resolve, 200));
  }
}

// Monitor cross-tenant message flow
async function crossTenantMonitor() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('\n=== Cross-Tenant Monitor ===');
  console.log('Monitoring messages across all tenants...');

  // Monitor Company A topics
  const companyAReader = await client.createReader({
    topic: 'persistent://company-a/production/orders',
    startMessageId: Pulsar.MessageId.latest(),
  });

  // Monitor Company B topics
  const companyBReader = await client.createReader({
    topic: 'persistent://company-b/analytics/events',
    startMessageId: Pulsar.MessageId.latest(),
  });

  // Monitor for new messages
  const monitorMessages = async (reader, tenantId) => {
    while (true) {
      try {
        const msg = await Promise.race([
          reader.readNext(),
          new Promise((_, reject) =>
            setTimeout(() => reject(new Error('Timeout')), 2000)
          )
        ]);
        const messageData = JSON.parse(msg.getData().toString());
        console.log(`[MONITOR] ${tenantId}: ${messageData.event}`);
      } catch (err) {
        if (err.message === 'Timeout') {
          // Continue monitoring
          continue;
        } else {
          console.error(`[MONITOR] ${tenantId} error:`, err);
          break;
        }
      }
    }
  };

  // Start monitoring in background
  monitorMessages(companyAReader, 'Company A');
  monitorMessages(companyBReader, 'Company B');
}

// Demo resource isolation with different message sizes
async function demoResourceIsolation() {
  console.log('\n=== Resource Isolation Demo ===');

  // Company A: Large messages
  const clientA = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  const producerA = await clientA.createProducer({
    topic: 'persistent://company-a/processing/heavy-tasks',
    producerName: 'company-a-heavy-producer',
    maxPendingMessages: 100,
    batchingMaxMessages: 10,
  });

  // Company B: Small, frequent messages
  const clientB = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  const producerB = await clientB.createProducer({
    topic: 'persistent://company-b/notifications/light-events',
    producerName: 'company-b-light-producer',
    maxPendingMessages: 1000,
    batchingMaxMessages: 100,
  });

  // Company A: Send large messages
  console.log('Company A sending large messages...');
  const largePayload = 'x'.repeat(1024 * 10); // 10KB payload

  for (let i = 0; i < 5; i++) {
    const message = {
      taskId: `heavy-task-${i}`,
      payload: largePayload,
      metadata: {
        tenant: 'company-a',
        type: 'heavy-processing',
        size: largePayload.length,
      },
    };

    await producerA.send({
      data: Buffer.from(JSON.stringify(message)),
    });

    console.log(`[Company A] Sent heavy task ${i} (${largePayload.length} bytes)`);
    await new Promise(resolve => setTimeout(resolve, 500));
  }

  // Company B: Send small messages
  console.log('\nCompany B sending small messages...');

  for (let i = 0; i < 20; i++) {
    const message = {
      eventId: `light-event-${i}`,
      payload: `Small event ${i}`,
      metadata: {
        tenant: 'company-b',
        type: 'notification',
        size: 20,
      },
    };

    await producerB.send({
      data: Buffer.from(JSON.stringify(message)),
    });

    if (i % 5 === 0) {
      console.log(`[Company B] Sent light events ${i - 4}-${i}`);
    }
  }

  await producerA.close();
  await clientA.close();
  await producerB.close();
  await clientB.close();

  console.log('\nResource isolation demo completed');
  console.log('- Company A: Few large messages with dedicated resources');
  console.log('- Company B: Many small messages with optimized batching');
}

// Main function to run multi-tenancy demo
async function main() {
  console.log('=== Pulsar Multi-Tenancy Demo ===');

  try {
    // Step 1: Set up producers and consumers for each tenant
    console.log('\n1. Setting up tenant infrastructure...');

    const tenantA = tenants['company-a'];
    const tenantB = tenants['company-b'];

    const companyAProducers = await createTenantProducers('company-a', tenantA);
    const companyBProducers = await createTenantProducers('company-b', tenantB);

    const companyAConsumers = await createTenantConsumers('company-a', tenantA);
    const companyBConsumers = await createTenantConsumers('company-b', tenantB);

    // Wait for consumers to be ready
    await new Promise(resolve => setTimeout(resolve, 1000));

    // Step 2: Start cross-tenant monitor
    console.log('\n2. Starting cross-tenant monitoring...');
    crossTenantMonitor();

    // Step 3: Produce messages for each tenant
    console.log('\n3. Producing tenant-specific messages...');

    const productionPromises = [];

    // Company A production
    for (const producerInfo of companyAProducers) {
      productionPromises.push(produceTenantMessages(producerInfo, 'company-a', tenantA));
    }

    // Company B production
    for (const producerInfo of companyBProducers) {
      productionPromises.push(produceTenantMessages(producerInfo, 'company-b', tenantB));
    }

    // Wait for all production to complete
    await Promise.all(productionPromises);

    // Step 4: Demo resource isolation
    await new Promise(resolve => setTimeout(resolve, 2000));
    await demoResourceIsolation();

    // Wait for message processing
    await new Promise(resolve => setTimeout(resolve, 5000));

    // Cleanup
    console.log('\n5. Cleaning up tenant resources...');

    for (const { client, producer } of companyAProducers) {
      await producer.close();
      await client.close();
    }

    for (const { client, producer } of companyBProducers) {
      await producer.close();
      await client.close();
    }

    for (const { client, consumer } of companyAConsumers) {
      await consumer.close();
      await client.close();
    }

    for (const { client, consumer } of companyBConsumers) {
      await consumer.close();
      await client.close();
    }

    console.log('\nMulti-tenancy demo completed successfully!');
  } catch (err) {
    console.error('Error in multi-tenancy demo:', err);
  }
}

main().catch(err => {
  console.error('Error:', err);
});