Apache Pulsar 示例

Apache Pulsar 示例,包括发布订阅、读取器、不同订阅类型的消费者、模式管理、多租户和分层存储,适用于企业级消息传递

💻 Pulsar Hello World - 基础生产者消费者 javascript

🟢 simple

简单的 Apache Pulsar 生产者和消费者示例,演示基础的发布-订阅模式

⏱️ 10 min 🏷️ pulsar, messaging, beginner
Prerequisites: Pulsar cluster running, Node.js
// Apache Pulsar Hello World - Basic Producer/Consumer Example
// Install: npm install pulsar-client

const Pulsar = require('pulsar-client');

/**
 * Minimal publish/subscribe round trip against a Pulsar broker at
 * pulsar://localhost:6650.
 *
 * Creates one producer and one Shared-subscription consumer on the same
 * topic, publishes four text messages, prints and acknowledges each message
 * as it arrives, then closes everything.
 *
 * @returns {Promise<void>} Resolves after all resources have been closed.
 * @throws Rejects with the first error raised while creating the producer or
 *   consumer, or while sending; the client is closed before the rejection
 *   propagates.
 */
async function helloWorld() {
  const topic = 'persistent://public/default/hello-world';
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  // Create Pulsar client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
    operationTimeoutSeconds: 30,
  });

  // NOTE(review): nothing has been awaited yet at this point, so this line
  // does not by itself prove the broker is reachable.
  console.log('Connected to Pulsar cluster');

  // Flipped right before shutdown so the receive loop can tell the expected
  // "consumer was closed" rejection apart from a genuine failure.
  let shuttingDown = false;

  try {
    // Create producer
    const producer = await client.createProducer({
      topic,
      sendTimeoutMs: 30000,
      batchingEnabled: true,
    });

    console.log('Producer created');

    // Create consumer
    const consumer = await client.subscribe({
      topic,
      subscription: 'hello-subscription',
      subscriptionType: 'Shared',
      ackTimeoutMs: 10000,
    });

    console.log('Consumer created');

    // Start consumer in background. The loop handles its own errors, so the
    // promise is intentionally not awaited.
    void (async () => {
      while (!shuttingDown) {
        try {
          const msg = await consumer.receive();
          console.log(`Received message: ${msg.getData().toString()}`);
          await consumer.acknowledge(msg);
        } catch (err) {
          if (!shuttingDown) {
            console.error('Error receiving message:', err);
          }
          break;
        }
      }
    })();

    // Wait a moment for consumer to be ready
    await pause(1000);

    // Send messages
    console.log('Sending messages...');
    const messages = [
      'Hello Pulsar!',
      'Apache Pulsar is awesome!',
      'Streaming made easy!',
      'Hello from producer!',
    ];

    for (const message of messages) {
      await producer.send({
        data: Buffer.from(message),
        properties: {
          source: 'hello-world-example',
          timestamp: new Date().toISOString(),
        },
      });
      console.log(`Sent: ${message}`);
      await pause(500);
    }

    // Wait for messages to be processed
    await pause(2000);

    // Close producer and consumer; the client is closed in `finally`.
    shuttingDown = true;
    await producer.close();
    await consumer.close();
  } finally {
    // Runs on success and on failure so the client is never left open.
    shuttingDown = true;
    await client.close();
  }

  console.log('Pulsar example completed');
}

// Entry point: run the example and report whatever error escapes it.
helloWorld().catch((failure) => {
  console.error('Error:', failure);
});

💻 Pulsar 订阅类型 javascript

🟡 intermediate ⭐⭐⭐

探索不同的订阅类型:独占、共享、故障转移和键共享

⏱️ 20 min 🏷️ pulsar, subscriptions, patterns
Prerequisites: Pulsar cluster running, Node.js, Pulsar basics
// Apache Pulsar Subscription Types Example
// Demonstrates: Exclusive, Shared, Failover, and Key_Shared

const Pulsar = require('pulsar-client');

// Consumer configuration for different subscription types
// Subscribe options for each subscription type, keyed by a short name.
// All four entries target the same topic and differ only in subscription
// name and type.
const subscriptionConfigs = (() => {
  const topic = 'persistent://public/default/subscription-demo';
  const describe = (subscription, subscriptionType) => ({
    topic,
    subscription,
    subscriptionType,
  });

  return {
    // Only one consumer can receive messages
    exclusive: describe('exclusive-sub', 'Exclusive'),
    // Load balance messages across multiple consumers
    shared: describe('shared-sub', 'Shared'),
    // One active consumer, others are standbys
    failover: describe('failover-sub', 'Failover'),
    // Load balance with message key ordering
    key_shared: describe('key-shared-sub', 'Key_Shared'),
  };
})();

// Create a consumer with specific configuration
/**
 * Subscribe with the given configuration and start a background loop that
 * prints and acknowledges every message received.
 *
 * The loop ends (after logging) on the first error from receive() or
 * acknowledge().
 *
 * @param {object} client - Object exposing `subscribe(config)`.
 * @param {object} config - Passed unchanged to `client.subscribe`;
 *   `config.subscriptionType` is also used in the log output.
 * @param {string} consumerId - Label used to prefix log lines.
 * @returns {Promise<object|null>} The consumer, or null when subscribing
 *   failed (the failure is logged, not thrown).
 */
async function createConsumer(client, config, consumerId) {
  let consumer;
  try {
    consumer = await client.subscribe(config);
  } catch (err) {
    console.error(`Failed to create consumer ${consumerId}:`, err);
    return null;
  }

  console.log(`Consumer ${consumerId} created with type: ${config.subscriptionType}`);

  // Start consuming messages. The loop handles its own errors, so it is
  // deliberately left running without being awaited.
  void (async () => {
    while (true) {
      try {
        const msg = await consumer.receive();
        const messageData = msg.getData().toString();
        const properties = msg.getProperties();

        console.log(`[${consumerId}] Received: ${messageData}`);
        console.log(`[${consumerId}] Properties: ${JSON.stringify(properties)}`);

        await consumer.acknowledge(msg);
      } catch (err) {
        console.error(`[${consumerId}] Error receiving message:`, err);
        break;
      }
    }
  })();

  return consumer;
}

// Producer that sends messages with keys
/**
 * Publish the fixed list of keyed demo messages to the subscription-demo
 * topic, one every 300 ms, using its own client and producer.
 *
 * Each message carries its key as `partitionKey` and the two halves of the
 * key (split on '.') as the `category` and `id` properties.
 *
 * @returns {Promise<void>} Resolves after the producer and client are closed.
 */
async function messageProducer() {
  const pulsarClient = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  const keyedProducer = await pulsarClient.createProducer({
    topic: 'persistent://public/default/subscription-demo',
    batchingEnabled: true,
    blockIfQueueFull: true,
  });

  console.log('Producer started');

  // Send messages with different keys: [key, content] pairs.
  const outbox = [
    ['user.1', 'User 1 login'],
    ['user.2', 'User 2 login'],
    ['order.1', 'Order created'],
    ['user.1', 'User 1 logout'],
    ['order.2', 'Order processed'],
    ['user.2', 'User 2 update'],
    ['order.1', 'Order shipped'],
    ['user.1', 'User 1 login again'],
  ];

  for (const [key, content] of outbox) {
    const [category, id] = key.split('.');

    await keyedProducer.send({
      data: Buffer.from(content),
      partitionKey: key,
      properties: {
        messageType: 'demo',
        category,
        id,
      },
    });

    console.log(`Sent message with key '${key}': ${content}`);
    await new Promise((done) => setTimeout(done, 300));
  }

  await keyedProducer.close();
  await pulsarClient.close();
}

// Demo different subscription types
/**
 * Walk through the four subscription types: start consumers for each type on
 * one shared client, publish the demo messages via messageProducer(), wait
 * for them to be processed, then close everything.
 *
 * Consumers and the client are closed in a `finally` block, so cleanup also
 * runs when a step fails.
 *
 * @returns {Promise<void>}
 * @throws Rejects with any error from messageProducer() or client shutdown.
 */
async function demoSubscriptionTypes() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  // Only consumers that were actually created are tracked: createConsumer()
  // resolves to null when subscribing fails, and null cannot be closed.
  const openConsumers = [];
  const startConsumer = async (config, consumerId) => {
    const consumer = await createConsumer(client, config, consumerId);
    if (consumer) {
      openConsumers.push(consumer);
    }
    return consumer;
  };

  try {
    console.log('=== Pulsar Subscription Types Demo ===\n');

    // Demo 1: Exclusive subscription (only one consumer allowed)
    console.log('1. EXCLUSIVE SUBSCRIPTION');
    console.log('   - Only one consumer can subscribe');
    console.log('   - All messages go to that single consumer\n');

    await startConsumer(subscriptionConfigs.exclusive, 'EXCLUSIVE-1');

    // Demo 2: Shared subscription (load balancing)
    console.log('2. SHARED SUBSCRIPTION');
    console.log('   - Multiple consumers can subscribe');
    console.log('   - Messages are load balanced across consumers\n');

    await startConsumer(subscriptionConfigs.shared, 'SHARED-1');
    await startConsumer(subscriptionConfigs.shared, 'SHARED-2');

    // Demo 3: Failover subscription (active-standby)
    console.log('3. FAILOVER SUBSCRIPTION');
    console.log('   - Multiple consumers can subscribe');
    console.log('   - Only one is active, others are standbys\n');

    await startConsumer(subscriptionConfigs.failover, 'FAILOVER-1');
    await startConsumer(subscriptionConfigs.failover, 'FAILOVER-2');

    // Demo 4: Key_Shared subscription (key-based ordering)
    console.log('4. KEY_SHARED SUBSCRIPTION');
    console.log('   - Multiple consumers can subscribe');
    console.log('   - Messages with same key go to same consumer\n');

    await startConsumer(subscriptionConfigs.key_shared, 'KEY-SHARED-1');
    await startConsumer(subscriptionConfigs.key_shared, 'KEY-SHARED-2');

    // Wait for consumers to be ready
    await new Promise((resolve) => setTimeout(resolve, 1000));

    // Start producer to send messages
    await messageProducer();

    // Wait for message processing
    await new Promise((resolve) => setTimeout(resolve, 5000));
  } finally {
    // Clean up
    console.log('\nCleaning up consumers...');
    for (const consumer of openConsumers) {
      try {
        await consumer.close();
      } catch (err) {
        // Keep going so one bad consumer does not block the rest.
        console.error('Failed to close consumer:', err);
      }
    }
    await client.close();
  }
}

// Entry point: run the subscription demo and report any error that escapes it.
demoSubscriptionTypes().catch((failure) => {
  console.error('Error:', failure);
});

💻 Pulsar 读取器 API javascript

🟡 intermediate ⭐⭐⭐

使用 Pulsar 读取器 API 从特定位置读取消息,具有游标控制功能

⏱️ 20 min 🏷️ pulsar, reader, cursor, api
Prerequisites: Pulsar cluster running, Node.js, Pulsar basics
// Apache Pulsar Reader API Example
// Demonstrates reading messages with cursor control

const Pulsar = require('pulsar-client');

// Producer to populate topic with messages
/**
 * Publish 20 JSON messages to the reader-demo topic, 100 ms apart.
 *
 * Each message has an id (`msg-N`), a content string, a synthetic timestamp
 * (start time + N seconds) and a batch number (1-4, five messages each).
 *
 * @returns {Promise<object[]>} The message objects, in the order they were sent.
 */
async function populateTopic() {
  const pulsarClient = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  const topicProducer = await pulsarClient.createProducer({
    topic: 'persistent://public/default/reader-demo',
    batchingEnabled: true,
  });

  console.log('Populating topic with messages...');

  // Send messages with timestamps
  const published = [];
  const baseTime = Date.now();
  let seq = 1;

  while (seq <= 20) {
    const record = {
      id: `msg-${seq}`,
      content: `Message number ${seq}`,
      timestamp: baseTime + (seq * 1000),
      batch: Math.floor((seq - 1) / 5) + 1,
    };
    const payload = Buffer.from(JSON.stringify(record));

    await topicProducer.send({
      data: payload,
      properties: {
        messageId: record.id,
        batch: record.batch.toString(),
      },
    });

    published.push(record);
    console.log(`Sent: ${record.content}`);

    // Small delay between messages
    await new Promise((done) => setTimeout(done, 100));
    seq += 1;
  }

  await topicProducer.close();
  await pulsarClient.close();

  return published;
}

// Reader that starts from the beginning
/**
 * Open a reader at the earliest position of the reader-demo topic and print
 * the first five messages. Stops early (after logging) on the first read or
 * parse error.
 *
 * @returns {Promise<void>} Resolves after the reader and client are closed.
 */
async function readFromBeginning() {
  const pulsarClient = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('\n=== Reading from beginning ===');

  const topicReader = await pulsarClient.createReader({
    topic: 'persistent://public/default/reader-demo',
    startMessageId: Pulsar.MessageId.earliest(),
  });

  console.log('Reading first 5 messages from beginning:');

  for (let seen = 0; seen < 5; seen += 1) {
    try {
      const next = await topicReader.readNext();
      const parsed = JSON.parse(next.getData().toString());
      console.log(`  ${parsed.content} (ID: ${parsed.id})`);
    } catch (readError) {
      console.error('Error reading message:', readError);
      break;
    }
  }

  await topicReader.close();
  await pulsarClient.close();
}

// Reader that starts from the latest message
/**
 * Open a reader at the latest position of the reader-demo topic and confirm
 * that no message arrives within one second.
 *
 * Uses the reader's own receive timeout rather than racing against a timer,
 * so no read is left pending when the reader is closed.
 *
 * @returns {Promise<void>} Resolves after the reader and client are closed.
 * @throws Rejects if the reader cannot be created or shutdown fails; the
 *   client is closed first.
 */
async function readFromLatest() {
  const READ_TIMEOUT_MS = 1000;

  // NOTE(review): the exact timeout error text depends on the client version;
  // matching loosely on "timeout" / "timed out" — confirm against the version
  // in use.
  const isTimeout = (err) =>
    Boolean(err) && /tim(e|ed)\s*out/i.test(String(err.message));

  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  try {
    console.log('\n=== Reading from latest ===');

    // Wait a moment to ensure we're at the latest
    await new Promise((resolve) => setTimeout(resolve, 500));

    const reader = await client.createReader({
      topic: 'persistent://public/default/reader-demo',
      startMessageId: Pulsar.MessageId.latest(),
    });

    // Should not receive any messages since we're reading from latest
    // and no new messages have been produced
    console.log('Reading from latest position (should be empty)...');

    try {
      await reader.readNext(READ_TIMEOUT_MS);
      console.log('Unexpected message received');
    } catch (err) {
      if (isTimeout(err)) {
        console.log('No messages available at latest position (expected)');
      } else {
        console.error('Error:', err);
      }
    }

    await reader.close();
  } finally {
    await client.close();
  }
}

// Reader that starts from a specific message ID
/**
 * Find the 11th message of the reader-demo topic with a temporary reader,
 * then open a second reader positioned at that message's id and print up to
 * five messages from it.
 *
 * If the target message cannot be located (any read or parse error while
 * skipping or reading it), the error is logged and the function returns
 * without opening the second reader.
 *
 * @param {object[]} [messages] - Accepted for call-site compatibility; not
 *   read by this function.
 * @returns {Promise<void>} Resolves after all readers and the client are closed.
 */
async function readFromSpecificMessage(messages) {
  const topic = 'persistent://public/default/reader-demo';
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  try {
    console.log('\n=== Reading from specific message ===');

    // First, read from the start to find a specific message ID
    const tempReader = await client.createReader({
      topic,
      startMessageId: Pulsar.MessageId.earliest(),
    });

    let targetMessageId = null;
    try {
      // Skip first 10 messages. A failure here aborts the lookup instead of
      // silently treating an earlier message as the target.
      for (let skipped = 0; skipped < 10; skipped++) {
        await tempReader.readNext();
      }

      // Get the next message ID as our target
      const targetMsg = await tempReader.readNext();
      const targetData = JSON.parse(targetMsg.getData().toString());
      console.log(`Found target message: ${targetData.content}`);
      targetMessageId = targetMsg.getMessageId();
    } catch (err) {
      console.error('Error finding target message:', err);
      return;
    } finally {
      await tempReader.close();
    }

    // Now create a reader starting from that specific message.
    // NOTE(review): whether the first message returned is the target itself
    // or the one after it depends on the reader's start-position semantics —
    // confirm against the client documentation.
    const reader = await client.createReader({
      topic,
      startMessageId: targetMessageId,
    });

    try {
      console.log('Reading messages from message 11 onwards:');

      for (let messageCount = 0; messageCount < 5; messageCount++) {
        try {
          const msg = await reader.readNext();
          const messageData = JSON.parse(msg.getData().toString());
          console.log(`  ${messageData.content} (ID: ${messageData.id})`);
        } catch (err) {
          console.error('Error reading message:', err);
          break;
        }
      }
    } finally {
      await reader.close();
    }
  } finally {
    await client.close();
  }
}

// Reader with timestamp-based start
/**
 * Position a reader on the reader-demo topic by timestamp (10 seconds before
 * now) and print up to five messages from there.
 *
 * The reader is created at the earliest position and then moved with
 * `reader.seekTimestamp()`; the Node client's MessageId offers no
 * timestamp-based factory.
 *
 * @returns {Promise<void>} Resolves after the reader and client are closed.
 * @throws Rejects if the reader cannot be created or the seek fails; the
 *   client is closed first.
 */
async function readFromTimestamp() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  try {
    console.log('\n=== Reading from timestamp ===');

    // Start position: 10 seconds ago (epoch milliseconds).
    const halfTime = Date.now() - 10000;

    const reader = await client.createReader({
      topic: 'persistent://public/default/reader-demo',
      startMessageId: Pulsar.MessageId.earliest(),
    });

    try {
      // NOTE(review): presumably the seek is based on broker publish time,
      // not the `timestamp` field inside the JSON payload — confirm.
      await reader.seekTimestamp(halfTime);

      console.log(`Reading messages from timestamp: ${new Date(halfTime)}`);

      for (let messageCount = 0; messageCount < 5; messageCount++) {
        try {
          const msg = await reader.readNext();
          const messageData = JSON.parse(msg.getData().toString());
          console.log(`  ${messageData.content} (Time: ${new Date(messageData.timestamp)})`);
        } catch (err) {
          console.error('Error reading message:', err);
          break;
        }
      }
    } finally {
      await reader.close();
    }
  } finally {
    await client.close();
  }
}

// Compare reader vs consumer
/**
 * Read three messages from the reader-demo topic with a reader and three
 * with an Exclusive consumer, then print a summary of the differences.
 *
 * Read/receive errors are logged and end the corresponding loop. The client
 * is closed even if a step throws.
 *
 * @returns {Promise<void>} Resolves after reader, consumer and client are closed.
 */
async function compareReaderConsumer() {
  console.log('\n=== Reader vs Consumer Comparison ===');

  const topic = 'persistent://public/default/reader-demo';
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  try {
    // Create a reader
    const reader = await client.createReader({
      topic,
      startMessageId: Pulsar.MessageId.earliest(),
    });

    // Create a consumer
    const consumer = await client.subscribe({
      topic,
      subscription: 'comparison-sub',
      subscriptionType: 'Exclusive',
    });

    console.log('Reader and Consumer both reading from beginning...');

    // Reader reads without acknowledging
    console.log('\nReader messages (no ack needed):');
    for (let i = 0; i < 3; i++) {
      try {
        const msg = await reader.readNext();
        const messageData = JSON.parse(msg.getData().toString());
        console.log(`  Reader: ${messageData.content}`);
      } catch (err) {
        console.error('Error reading message:', err);
        break;
      }
    }

    // Consumer reads and acknowledges
    console.log('\nConsumer messages (requires ack):');
    for (let i = 0; i < 3; i++) {
      try {
        const msg = await consumer.receive();
        const messageData = JSON.parse(msg.getData().toString());
        console.log(`  Consumer: ${messageData.content}`);
        await consumer.acknowledge(msg);
      } catch (err) {
        console.error('Error consuming message:', err);
        break;
      }
    }

    console.log('\nKey differences:');
    console.log('- Reader: No subscription, no ack needed, can start from any position');
    console.log('- Consumer: Has subscription, requires ack, tracks cursor per subscription');

    await reader.close();
    await consumer.close();
  } finally {
    await client.close();
  }
}

// Main function
/**
 * Run the Reader API demo end to end: populate the topic, then run each
 * reading pattern with a one-second pause before each.
 *
 * Any error is logged here; the returned promise does not reject.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== Pulsar Reader API Demo ===');

  const pause = () => new Promise((resolve) => setTimeout(resolve, 1000));

  try {
    // Step 1: Populate the topic and keep what was sent so it can be handed
    // to the step that accepts it.
    console.log('Step 1: Populating topic with messages...');
    const messages = await populateTopic();

    // Step 2: Demonstrate different reading patterns
    await pause();
    await readFromBeginning();

    await pause();
    await readFromLatest();

    await pause();
    await readFromSpecificMessage(messages);

    await pause();
    await readFromTimestamp();

    await pause();
    await compareReaderConsumer();

    console.log('\nReader API demo completed!');
  } catch (err) {
    console.error('Error in reader demo:', err);
  }
}

// Entry point: run the Reader API demo and report any error that escapes it.
main().catch((failure) => {
  console.error('Error:', failure);
});

💻 Pulsar 模式注册表 javascript

🔴 complex ⭐⭐⭐⭐

使用 Pulsar 模式注册表处理 JSON 和 Avro 模式的结构化数据

⏱️ 25 min 🏷️ pulsar, schema, registry, structured-data
Prerequisites: Pulsar cluster with schema registry, Node.js, Advanced Pulsar knowledge
// Apache Pulsar Schema Registry Example
// Demonstrates using JSON and Avro schemas

const Pulsar = require('pulsar-client');

// Define JSON schema for user events
// Schema descriptor for user events, passed as the `schema` option when
// creating producers and consumers.
//
// The Node.js client reads the schema kind from `schemaType` (value 'Json');
// a `type: 'JSON'` key is not recognised, so no schema would be attached.
// The definition is serialised with JSON.stringify so it is always valid JSON.
const userEventSchema = {
  schemaType: 'Json',
  schema: JSON.stringify({
    type: 'record',
    name: 'UserEvent',
    fields: [
      { name: 'userId', type: 'string' },
      { name: 'eventType', type: 'string' },
      { name: 'timestamp', type: 'long' },
      {
        name: 'data',
        // Optional nested record; every field inside is itself nullable.
        type: ['null', {
          type: 'record',
          name: 'EventData',
          fields: [
            { name: 'email', type: ['null', 'string'] },
            { name: 'name', type: ['null', 'string'] },
            { name: 'action', type: ['null', 'string'] },
          ],
        }],
        default: null,
      },
    ],
  }),
};

// Define Avro schema for order events
// Schema descriptor for order events, passed as the `schema` option when
// creating producers and consumers.
//
// The Node.js client reads the schema kind from `schemaType` (value 'Avro');
// a `type: 'AVRO'` key is not recognised, so no schema would be attached.
//
// NOTE(review): orderEventProducer in this file sends JSON.stringify output.
// A topic typed as Avro is presumably expected to carry Avro-encoded bytes —
// confirm, and encode the payload before sharing the topic with other Avro
// consumers.
const orderEventSchema = {
  schemaType: 'Avro',
  schema: JSON.stringify({
    type: 'record',
    name: 'OrderEvent',
    fields: [
      { name: 'orderId', type: 'string' },
      { name: 'customerId', type: 'string' },
      { name: 'eventType', type: 'string' },
      { name: 'amount', type: 'double' },
      { name: 'currency', type: 'string' },
      { name: 'timestamp', type: 'long' },
      {
        name: 'items',
        type: {
          type: 'array',
          items: {
            type: 'record',
            name: 'OrderItem',
            fields: [
              { name: 'productId', type: 'string' },
              { name: 'quantity', type: 'int' },
              { name: 'price', type: 'double' },
            ],
          },
        },
      },
    ],
  }),
};

// Producer with JSON schema for user events
/**
 * Publish three sample user events (created, updated, login) to the
 * user-events topic as JSON text, using a producer configured with
 * `userEventSchema`.
 *
 * @returns {Promise<void>} Resolves after the producer and client are closed.
 * @throws Rejects if the producer cannot be created or a send fails; the
 *   client is closed first.
 */
async function userEventProducer() {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  try {
    console.log('Creating user event producer with JSON schema...');

    const producer = await client.createProducer({
      topic: 'persistent://public/default/user-events',
      schema: userEventSchema,
    });

    // User events to publish. The e-mail addresses are placeholders on the
    // reserved example.com domain.
    const userEvents = [
      {
        userId: 'user123',
        eventType: 'user.created',
        timestamp: Date.now(),
        data: {
          email: 'john.doe@example.com',
          name: 'John Doe',
          action: 'registration',
        },
      },
      {
        userId: 'user123',
        eventType: 'user.updated',
        timestamp: Date.now(),
        data: {
          email: 'john.smith@example.com',
          name: 'John Smith',
          action: 'profile_update',
        },
      },
      {
        userId: 'user456',
        eventType: 'user.login',
        timestamp: Date.now(),
        data: {
          action: 'authentication',
        },
      },
    ];

    for (const event of userEvents) {
      console.log(`Publishing user event: ${event.eventType}`);
      await producer.send({
        data: Buffer.from(JSON.stringify(event)),
      });
    }

    await producer.close();
  } finally {
    await client.close();
  }
}

// Producer with Avro schema for order events
/**
 * Publish two sample `order.created` events to the order-events topic as
 * JSON text, using a producer configured with `orderEventSchema`.
 *
 * @returns {Promise<void>} Resolves after the producer and client are closed.
 */
async function orderEventProducer() {
  const pulsarClient = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('Creating order event producer with Avro schema...');

  const orderProducer = await pulsarClient.createProducer({
    topic: 'persistent://public/default/order-events',
    schema: orderEventSchema,
  });

  // Builds one single-item order; the field order matches the serialized payload.
  const buildOrder = (orderId, customerId, amount, productId, quantity, price) => ({
    orderId,
    customerId,
    eventType: 'order.created',
    amount,
    currency: 'USD',
    timestamp: Date.now(),
    items: [{ productId, quantity, price }],
  });

  // Order events to publish
  const orderEvents = [
    buildOrder('order789', 'user123', 99.99, 'prod001', 2, 49.99),
    buildOrder('order790', 'user456', 149.99, 'prod002', 1, 149.99),
  ];

  for (const orderEvent of orderEvents) {
    console.log(`Publishing order event: ${orderEvent.eventType}`);
    const payload = Buffer.from(JSON.stringify(orderEvent));
    await orderProducer.send({ data: payload });
  }

  await orderProducer.close();
  await pulsarClient.close();
}

// Consumer for user events with JSON schema
/**
 * Subscribe to the user-events topic (Shared subscription, `userEventSchema`)
 * and print three events, acknowledging each. Stops early (after logging) on
 * the first receive, parse or acknowledge error.
 *
 * @returns {Promise<void>} Resolves after the consumer and client are closed.
 */
async function userEventConsumer() {
  const pulsarClient = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('Creating user event consumer with JSON schema...');

  const subscriber = await pulsarClient.subscribe({
    topic: 'persistent://public/default/user-events',
    subscription: 'user-events-sub',
    subscriptionType: 'Shared',
    schema: userEventSchema,
  });

  console.log('Consuming user events...');

  for (let handled = 0; handled < 3; handled += 1) {
    try {
      const incoming = await subscriber.receive();
      const userEvent = JSON.parse(incoming.getData().toString());

      console.log(`User Event Received:`);
      console.log(`  - User ID: ${userEvent.userId}`);
      console.log(`  - Event Type: ${userEvent.eventType}`);
      console.log(`  - Timestamp: ${new Date(userEvent.timestamp)}`);
      if (userEvent.data) {
        console.log(`  - Data: ${JSON.stringify(userEvent.data)}`);
      }
      console.log('');

      await subscriber.acknowledge(incoming);
    } catch (consumeError) {
      console.error('Error consuming user event:', consumeError);
      break;
    }
  }

  await subscriber.close();
  await pulsarClient.close();
}

// Consumer for order events with Avro schema
/**
 * Subscribe to the order-events topic (Shared subscription,
 * `orderEventSchema`) and print two events, acknowledging each. Stops early
 * (after logging) on the first receive, parse or acknowledge error.
 *
 * @returns {Promise<void>} Resolves after the consumer and client are closed.
 */
async function orderEventConsumer() {
  const pulsarClient = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('Creating order event consumer with Avro schema...');

  const subscriber = await pulsarClient.subscribe({
    topic: 'persistent://public/default/order-events',
    subscription: 'order-events-sub',
    subscriptionType: 'Shared',
    schema: orderEventSchema,
  });

  console.log('Consuming order events...');

  for (let handled = 0; handled < 2; handled += 1) {
    try {
      const incoming = await subscriber.receive();
      const orderEvent = JSON.parse(incoming.getData().toString());

      console.log(`Order Event Received:`);
      console.log(`  - Order ID: ${orderEvent.orderId}`);
      console.log(`  - Customer ID: ${orderEvent.customerId}`);
      console.log(`  - Event Type: ${orderEvent.eventType}`);
      console.log(`  - Amount: ${orderEvent.amount} ${orderEvent.currency}`);
      console.log(`  - Items Count: ${orderEvent.items.length}`);
      console.log('');

      await subscriber.acknowledge(incoming);
    } catch (consumeError) {
      console.error('Error consuming order event:', consumeError);
      break;
    }
  }

  await subscriber.close();
  await pulsarClient.close();
}

// Main function to run the schema demo
/**
 * Run the schema demo: start both consumers, give them a second to
 * subscribe, publish the user and order events, then wait for the consumers
 * to finish.
 *
 * Consumers and the producing sequence are awaited together, so a consumer
 * that fails early is reported through the catch below rather than as an
 * unhandled rejection. Any error is logged here; the returned promise does
 * not reject.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== Pulsar Schema Registry Demo ===\n');

  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  // Producing side: wait for the consumers, then publish both event kinds.
  const produceEvents = async () => {
    // Wait for consumers to be ready
    await pause(1000);

    // Produce events
    console.log('2. Producing events...\n');
    await userEventProducer();
    await pause(500);
    await orderEventProducer();
  };

  try {
    // Start consumers first
    console.log('1. Starting consumers...\n');

    // Wait for production and consumption to complete
    await Promise.all([
      userEventConsumer(),
      orderEventConsumer(),
      produceEvents(),
    ]);

    console.log('Schema registry demo completed successfully!');
  } catch (err) {
    console.error('Error in schema demo:', err);
  }
}

// Entry point: run the schema demo and report any error that escapes it.
main().catch((failure) => {
  console.error('Error:', failure);
});

💻 Pulsar 多租户 javascript

🔴 complex ⭐⭐⭐⭐⭐

使用命名空间、隔离和资源管理实现多租户架构

⏱️ 30 min 🏷️ pulsar, multi-tenancy, namespaces
Prerequisites: Pulsar cluster with multi-tenancy enabled, Node.js, Advanced Pulsar knowledge
// Apache Pulsar Multi-Tenancy Example
// Demonstrates namespace isolation and tenant-specific configurations

const Pulsar = require('pulsar-client');

// Tenant configurations
// Per-tenant demo settings, keyed by tenant id: a display name, the fully
// qualified topics the tenant uses, and how many producers and consumers to
// start for it.
const tenants = (() => {
  // Expands "<namespace>/<topic>" paths into persistent topic URLs for a tenant.
  const topicsOf = (tenantId, paths) =>
    paths.map((path) => `persistent://${tenantId}/${path}`);

  return {
    'company-a': {
      name: 'Company A',
      topics: topicsOf('company-a', [
        'production/orders',
        'production/users',
        'staging/logs',
      ]),
      producers: 2,
      consumers: 3,
    },
    'company-b': {
      name: 'Company B',
      topics: topicsOf('company-b', [
        'analytics/events',
        'analytics/metrics',
        'notifications/emails',
      ]),
      producers: 1,
      consumers: 2,
    },
  };
})();

// Create producers for a specific tenant
/**
 * Create `tenantConfig.producers` producers for one tenant, each on its own
 * client, assigning topics round-robin from `tenantConfig.topics`.
 *
 * @param {string} tenantId - Tenant id; used in producer names and log lines.
 * @param {{topics: string[], producers: number}} tenantConfig - Tenant settings.
 * @returns {Promise<Array<{client: object, producer: object, topicIndex: number}>>}
 *   One entry per producer. `topicIndex` is the index into
 *   `tenantConfig.topics` of the topic the producer was created on (already
 *   wrapped, so it is always a valid index).
 * @throws Rejects with the creation error; every client and producer opened
 *   so far is closed first.
 */
async function createTenantProducers(tenantId, tenantConfig) {
  const producers = [];
  const topicCount = tenantConfig.topics.length;

  for (let i = 0; i < tenantConfig.producers; i++) {
    // Wrap here so the stored index stays valid when there are more
    // producers than topics.
    const topicIndex = i % topicCount;

    const client = new Pulsar.Client({
      serviceUrl: 'pulsar://localhost:6650',
    });

    let producer;
    try {
      producer = await client.createProducer({
        topic: tenantConfig.topics[topicIndex],
        producerName: `${tenantId}-producer-${i + 1}`,
        batchingEnabled: true,
        blockIfQueueFull: true,
        sendTimeoutMs: 30000,
      });
    } catch (err) {
      // The caller never receives the partial list, so release it here.
      const leftovers = [{ client }].concat(producers);
      for (const entry of leftovers) {
        try {
          if (entry.producer) {
            await entry.producer.close();
          }
          await entry.client.close();
        } catch (closeErr) {
          console.error(`Failed to release ${tenantId} producer resources:`, closeErr);
        }
      }
      throw err;
    }

    producers.push({ client, producer, topicIndex });
    console.log(`Created ${tenantId} producer ${i + 1}`);
  }

  return producers;
}

// Create consumers for a specific tenant
/**
 * Create `tenantConfig.consumers` Shared-subscription consumers for one
 * tenant, each on its own client, assigning topics round-robin from
 * `tenantConfig.topics`. Every consumer gets a background loop that prints
 * the `event` field of each JSON message and acknowledges it.
 *
 * @param {string} tenantId - Tenant id; used in subscription and consumer
 *   names and in log lines.
 * @param {{topics: string[], consumers: number}} tenantConfig - Tenant settings.
 * @returns {Promise<Array<{client: object, consumer: object}>>} One entry per consumer.
 * @throws Rejects with the subscribe error; every client and consumer opened
 *   so far is closed first.
 */
async function createTenantConsumers(tenantId, tenantConfig) {
  const consumers = [];

  for (let i = 0; i < tenantConfig.consumers; i++) {
    const client = new Pulsar.Client({
      serviceUrl: 'pulsar://localhost:6650',
    });

    const topic = tenantConfig.topics[i % tenantConfig.topics.length];
    // Kept locally for logging instead of querying the consumer object.
    // NOTE(review): the Node.js consumer does not appear to expose a
    // getConsumerName() accessor — confirm against the client API.
    const consumerName = `${tenantId}-consumer-${i + 1}`;

    let consumer;
    try {
      consumer = await client.subscribe({
        topic,
        subscription: `${tenantId}-subscription-${i + 1}`,
        subscriptionType: 'Shared',
        consumerName,
        ackTimeoutMs: 10000,
      });
    } catch (err) {
      // The caller never receives the partial list, so release it here.
      const leftovers = [{ client }].concat(consumers);
      for (const entry of leftovers) {
        try {
          if (entry.consumer) {
            await entry.consumer.close();
          }
          await entry.client.close();
        } catch (closeErr) {
          console.error(`Failed to release ${tenantId} consumer resources:`, closeErr);
        }
      }
      throw err;
    }

    // Start consuming messages. The loop handles its own errors, so it is
    // deliberately left running without being awaited.
    void (async () => {
      while (true) {
        try {
          const msg = await consumer.receive();
          const messageData = JSON.parse(msg.getData().toString());
          console.log(`[${tenantId.toUpperCase()}] ${consumerName}: ${messageData.event}`);
          await consumer.acknowledge(msg);
        } catch (err) {
          console.error(`[${tenantId.toUpperCase()}] Consumer error:`, err);
          break;
        }
      }
    })();

    consumers.push({ client, consumer });
    console.log(`Created ${tenantId} consumer ${i + 1}`);
  }

  return consumers;
}

// Produce messages for a tenant
/**
 * Send ten JSON events for one tenant through an existing producer, 200 ms
 * apart.
 *
 * Events are named `<topic>-event-N`, where `<topic>` is the last path
 * segment of `tenantConfig.topics[producerInfo.topicIndex]`.
 *
 * @param {{producer: object, topicIndex: number}} producerInfo - Producer and
 *   the index of its topic in `tenantConfig.topics`.
 * @param {string} tenantId - Tenant id written into payloads, properties and logs.
 * @param {{name: string, topics: string[]}} tenantConfig - Tenant settings.
 * @returns {Promise<void>}
 */
async function produceTenantMessages(producerInfo, tenantId, tenantConfig) {
  const { producer, topicIndex } = producerInfo;

  // Extract topic name from full topic path
  const topicName = tenantConfig.topics[topicIndex].split('/').pop();
  const producerName = producer.getProducerName();

  // All ten payloads are built up front, before the first send.
  const outgoing = Array.from({ length: 10 }, (_, offset) => {
    const seq = offset + 1;
    return {
      tenant: tenantId,
      event: `${topicName}-event-${seq}`,
      timestamp: Date.now(),
      producerId: producerName,
      data: {
        message: `Event ${seq} from ${tenantConfig.name}`,
        metadata: {
          tenant: tenantId,
          topic: topicName,
          batch: Math.ceil(seq / 3),
        },
      },
    };
  });

  for (const payload of outgoing) {
    const data = Buffer.from(JSON.stringify(payload));
    const properties = {
      tenant: tenantId,
      topic: topicName,
      producer: producerName,
    };

    await producer.send({ data, properties });

    console.log(`[${tenantId.toUpperCase()}] ${producerName}: Sent ${payload.event}`);
    await new Promise((done) => setTimeout(done, 200));
  }
}

// Monitor cross-tenant message flow
/**
 * Start background monitors that print the `event` field of every new
 * message on one Company A topic and one Company B topic.
 *
 * Each monitor keeps at most one outstanding `readNext()` and re-checks the
 * stop flag every two seconds, so reads never overlap and the monitor can be
 * shut down.
 *
 * @returns {Promise<{stop: function(): Promise<void>}>} Handle whose `stop()`
 *   ends both monitors and closes the readers and the client. Callers that
 *   ignore the return value get the previous fire-and-forget behavior.
 * @throws Rejects if a reader cannot be created; the client is closed first.
 */
async function crossTenantMonitor() {
  const POLL_INTERVAL_MS = 2000;

  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  console.log('\n=== Cross-Tenant Monitor ===');
  console.log('Monitoring messages across all tenants...');

  const readers = [];
  let stopped = false;

  // Resolves with the message once `pendingRead` settles, or with null when
  // `timeoutMs` elapses first. The timer is cleared as soon as the read
  // settles.
  const nextOrTimeout = (pendingRead, timeoutMs) =>
    new Promise((resolve, reject) => {
      const timer = setTimeout(() => resolve(null), timeoutMs);
      pendingRead.then(
        (msg) => {
          clearTimeout(timer);
          resolve(msg);
        },
        (err) => {
          clearTimeout(timer);
          reject(err);
        }
      );
    });

  // Monitor for new messages
  const monitorMessages = async (reader, tenantId) => {
    // Reused across timeouts so a slow read is waited on again instead of
    // being abandoned and replaced by a second concurrent read.
    let pendingRead = null;

    while (!stopped) {
      try {
        if (!pendingRead) {
          pendingRead = reader.readNext();
        }
        const msg = await nextOrTimeout(pendingRead, POLL_INTERVAL_MS);
        if (msg === null) {
          // Nothing yet; loop around to re-check the stop flag.
          continue;
        }
        pendingRead = null;

        const messageData = JSON.parse(msg.getData().toString());
        console.log(`[MONITOR] ${tenantId}: ${messageData.event}`);
      } catch (err) {
        // Errors caused by shutting down are expected and not reported.
        if (!stopped) {
          console.error(`[MONITOR] ${tenantId} error:`, err);
        }
        break;
      }
    }
  };

  let loops;
  try {
    // Monitor Company A topics
    const companyAReader = await client.createReader({
      topic: 'persistent://company-a/production/orders',
      startMessageId: Pulsar.MessageId.latest(),
    });
    readers.push(companyAReader);

    // Monitor Company B topics
    const companyBReader = await client.createReader({
      topic: 'persistent://company-b/analytics/events',
      startMessageId: Pulsar.MessageId.latest(),
    });
    readers.push(companyBReader);

    // Start monitoring in background
    loops = [
      monitorMessages(companyAReader, 'Company A'),
      monitorMessages(companyBReader, 'Company B'),
    ];
  } catch (err) {
    await client.close();
    throw err;
  }

  return {
    async stop() {
      stopped = true;
      // Each loop notices the flag within one poll interval.
      await Promise.all(loops);
      for (const reader of readers) {
        await reader.close();
      }
      await client.close();
    },
  };
}

// Demo resource isolation with different message sizes
/**
 * Contrast two tenant workloads: Company A sends five 10 KB messages (one
 * every 500 ms) and Company B sends twenty small messages back to back, each
 * through its own client and producer with different queue and batch limits.
 *
 * Company B progress is logged once per five messages sent ("0-4", "5-9",
 * ...). Both clients are closed even if a step throws.
 *
 * @returns {Promise<void>}
 * @throws Rejects if a producer cannot be created or a send fails.
 */
async function demoResourceIsolation() {
  console.log('\n=== Resource Isolation Demo ===');

  // Company A: Large messages
  const clientA = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  // Company B: Small, frequent messages
  const clientB = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  try {
    const producerA = await clientA.createProducer({
      topic: 'persistent://company-a/processing/heavy-tasks',
      producerName: 'company-a-heavy-producer',
      maxPendingMessages: 100,
      batchingMaxMessages: 10,
    });

    const producerB = await clientB.createProducer({
      topic: 'persistent://company-b/notifications/light-events',
      producerName: 'company-b-light-producer',
      maxPendingMessages: 1000,
      batchingMaxMessages: 100,
    });

    // Company A: Send large messages
    console.log('Company A sending large messages...');
    const largePayload = 'x'.repeat(1024 * 10); // 10KB payload

    for (let i = 0; i < 5; i++) {
      const message = {
        taskId: `heavy-task-${i}`,
        payload: largePayload,
        metadata: {
          tenant: 'company-a',
          type: 'heavy-processing',
          size: largePayload.length,
        },
      };

      await producerA.send({
        data: Buffer.from(JSON.stringify(message)),
      });

      console.log(`[Company A] Sent heavy task ${i} (${largePayload.length} bytes)`);
      await new Promise((resolve) => setTimeout(resolve, 500));
    }

    // Company B: Send small messages
    console.log('\nCompany B sending small messages...');

    for (let i = 0; i < 20; i++) {
      const payload = `Small event ${i}`;
      const message = {
        eventId: `light-event-${i}`,
        payload,
        metadata: {
          tenant: 'company-b',
          type: 'notification',
          // Actual payload length, mirroring how Company A reports size.
          size: payload.length,
        },
      };

      await producerB.send({
        data: Buffer.from(JSON.stringify(message)),
      });

      // Report after each completed group of five, so the range printed
      // covers messages that have actually been sent.
      if ((i + 1) % 5 === 0) {
        console.log(`[Company B] Sent light events ${i - 4}-${i}`);
      }
    }

    await producerA.close();
    await producerB.close();
  } finally {
    // Nested so a failure closing A does not skip closing B.
    try {
      await clientA.close();
    } finally {
      await clientB.close();
    }
  }

  console.log('\nResource isolation demo completed');
  console.log('- Company A: Few large messages with dedicated resources');
  console.log('- Company B: Many small messages with optimized batching');
}

// Main function to run multi-tenancy demo
/**
 * Run the multi-tenancy demo: create producers and consumers for both
 * tenants, start the cross-tenant monitor, publish tenant messages in
 * parallel, run the resource-isolation demo, then release everything.
 *
 * Cleanup runs in `finally`, so resources opened before a failure are still
 * closed. Any error is logged here; the returned promise does not reject.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== Pulsar Multi-Tenancy Demo ===');

  // Every { client, producer } / { client, consumer } entry opened so far,
  // in creation order.
  const opened = [];
  let monitor = null;
  let failed = false;

  try {
    // Step 1: Set up producers and consumers for each tenant
    console.log('\n1. Setting up tenant infrastructure...');

    const tenantA = tenants['company-a'];
    const tenantB = tenants['company-b'];

    const companyAProducers = await createTenantProducers('company-a', tenantA);
    opened.push(...companyAProducers);
    const companyBProducers = await createTenantProducers('company-b', tenantB);
    opened.push(...companyBProducers);

    const companyAConsumers = await createTenantConsumers('company-a', tenantA);
    opened.push(...companyAConsumers);
    const companyBConsumers = await createTenantConsumers('company-b', tenantB);
    opened.push(...companyBConsumers);

    // Wait for consumers to be ready
    await new Promise((resolve) => setTimeout(resolve, 1000));

    // Step 2: Start cross-tenant monitor. Awaited so a startup failure is
    // reported here rather than as an unhandled rejection; the demo carries
    // on without monitoring in that case.
    console.log('\n2. Starting cross-tenant monitoring...');
    try {
      monitor = await crossTenantMonitor();
    } catch (err) {
      console.error('Cross-tenant monitor failed to start:', err);
    }

    // Step 3: Produce messages for each tenant
    console.log('\n3. Producing tenant-specific messages...');

    const productionPromises = [
      ...companyAProducers.map((producerInfo) =>
        produceTenantMessages(producerInfo, 'company-a', tenantA)),
      ...companyBProducers.map((producerInfo) =>
        produceTenantMessages(producerInfo, 'company-b', tenantB)),
    ];

    // Wait for all production to complete
    await Promise.all(productionPromises);

    // Step 4: Demo resource isolation
    await new Promise((resolve) => setTimeout(resolve, 2000));
    console.log('\n4. Demonstrating resource isolation...');
    await demoResourceIsolation();

    // Wait for message processing
    await new Promise((resolve) => setTimeout(resolve, 5000));
  } catch (err) {
    failed = true;
    console.error('Error in multi-tenancy demo:', err);
  } finally {
    // Cleanup
    console.log('\n5. Cleaning up tenant resources...');

    // The monitor only exposes stop() when it supports being shut down.
    if (monitor && typeof monitor.stop === 'function') {
      try {
        await monitor.stop();
      } catch (err) {
        console.error('Failed to stop cross-tenant monitor:', err);
      }
    }

    for (const { client, producer, consumer } of opened) {
      try {
        await (producer || consumer).close();
        await client.close();
      } catch (err) {
        // Keep going so one bad resource does not block the rest.
        console.error('Failed to close tenant resource:', err);
      }
    }
  }

  if (!failed) {
    console.log('\nMulti-tenancy demo completed successfully!');
  }
}

// Entry point: run the multi-tenancy demo and report any error that escapes it.
main().catch((failure) => {
  console.error('Error:', failure);
});