Apache Pulsar 示例
Apache Pulsar 示例,包括发布订阅、读取器、不同订阅类型的消费者、模式管理和分层存储,适用于企业级消息传递
💻 Pulsar Hello World - 基础生产者消费者 javascript
🟢 simple
⭐
简单的 Apache Pulsar 生产者和消费者示例,演示基础的发布-订阅模式
⏱️ 10 min
🏷️ pulsar, messaging, beginner
Prerequisites:
Pulsar cluster running, Node.js
// Apache Pulsar Hello World - Basic Producer/Consumer Example
// Install: npm install pulsar-client
const Pulsar = require('pulsar-client');
async function helloWorld() {
// Create Pulsar client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
operationTimeoutSeconds: 30,
});
console.log('Connected to Pulsar cluster');
// Create producer
const producer = await client.createProducer({
topic: 'persistent://public/default/hello-world',
sendTimeoutMs: 30000,
batchingEnabled: true,
});
console.log('Producer created');
// Create consumer
const consumer = await client.subscribe({
topic: 'persistent://public/default/hello-world',
subscription: 'hello-subscription',
subscriptionType: 'Shared',
ackTimeoutMs: 10000,
});
console.log('Consumer created');
// Start consumer in background
(async () => {
while (true) {
try {
const msg = await consumer.receive();
console.log(`Received message: ${msg.getData().toString()}`);
await consumer.acknowledge(msg);
} catch (err) {
console.error('Error receiving message:', err);
break;
}
}
})();
// Wait a moment for consumer to be ready
await new Promise(resolve => setTimeout(resolve, 1000));
// Send messages
console.log('Sending messages...');
const messages = [
'Hello Pulsar!',
'Apache Pulsar is awesome!',
'Streaming made easy!',
'Hello from producer!'
];
for (const message of messages) {
await producer.send({
data: Buffer.from(message),
properties: {
source: 'hello-world-example',
timestamp: new Date().toISOString(),
},
});
console.log(`Sent: ${message}`);
await new Promise(resolve => setTimeout(resolve, 500));
}
// Wait for messages to be processed
await new Promise(resolve => setTimeout(resolve, 2000));
// Close producer, consumer, and client
await producer.close();
await consumer.close();
await client.close();
console.log('Pulsar example completed');
}
helloWorld().catch(err => {
console.error('Error:', err);
});
💻 Pulsar 订阅类型 javascript
🟡 intermediate
⭐⭐⭐
探索不同的订阅类型:独占、共享、故障转移和键共享
⏱️ 20 min
🏷️ pulsar, subscriptions, patterns
Prerequisites:
Pulsar cluster running, Node.js, Pulsar basics
// Apache Pulsar Subscription Types Example
// Demonstrates: Exclusive, Shared, Failover, and Key_Shared
const Pulsar = require('pulsar-client');
// Consumer configuration for different subscription types
const subscriptionConfigs = {
// Only one consumer can receive messages
exclusive: {
topic: 'persistent://public/default/subscription-demo',
subscription: 'exclusive-sub',
subscriptionType: 'Exclusive',
},
// Load balance messages across multiple consumers
shared: {
topic: 'persistent://public/default/subscription-demo',
subscription: 'shared-sub',
subscriptionType: 'Shared',
},
// One active consumer, others are standbys
failover: {
topic: 'persistent://public/default/subscription-demo',
subscription: 'failover-sub',
subscriptionType: 'Failover',
},
// Load balance with message key ordering
key_shared: {
topic: 'persistent://public/default/subscription-demo',
subscription: 'key-shared-sub',
subscriptionType: 'Key_Shared',
},
};
// Create a consumer with specific configuration
async function createConsumer(client, config, consumerId) {
try {
const consumer = await client.subscribe(config);
console.log(`Consumer ${consumerId} created with type: ${config.subscriptionType}`);
// Start consuming messages
(async () => {
while (true) {
try {
const msg = await consumer.receive();
const messageData = msg.getData().toString();
const messageKey = msg.getMessageId();
const properties = msg.getProperties();
console.log(`[${consumerId}] Received: ${messageData}`);
console.log(`[${consumerId}] Properties: ${JSON.stringify(properties)}`);
await consumer.acknowledge(msg);
} catch (err) {
console.error(`[${consumerId}] Error receiving message:`, err);
break;
}
}
})();
return consumer;
} catch (err) {
console.error(`Failed to create consumer ${consumerId}:`, err);
return null;
}
}
// Producer that sends messages with keys
async function messageProducer() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
const producer = await client.createProducer({
topic: 'persistent://public/default/subscription-demo',
batchingEnabled: true,
blockIfQueueFull: true,
});
console.log('Producer started');
// Send messages with different keys
const messages = [
{ key: 'user.1', content: 'User 1 login' },
{ key: 'user.2', content: 'User 2 login' },
{ key: 'order.1', content: 'Order created' },
{ key: 'user.1', content: 'User 1 logout' },
{ key: 'order.2', content: 'Order processed' },
{ key: 'user.2', content: 'User 2 update' },
{ key: 'order.1', content: 'Order shipped' },
{ key: 'user.1', content: 'User 1 login again' },
];
for (const msg of messages) {
await producer.send({
data: Buffer.from(msg.content),
partitionKey: msg.key,
properties: {
messageType: 'demo',
category: msg.key.split('.')[0],
id: msg.key.split('.')[1],
},
});
console.log(`Sent message with key '${msg.key}': ${msg.content}`);
await new Promise(resolve => setTimeout(resolve, 300));
}
await producer.close();
await client.close();
}
// Demo different subscription types
async function demoSubscriptionTypes() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
console.log('=== Pulsar Subscription Types Demo ===
');
// Demo 1: Exclusive subscription (only one consumer allowed)
console.log('1. EXCLUSIVE SUBSCRIPTION');
console.log(' - Only one consumer can subscribe');
console.log(' - All messages go to that single consumer
');
const exclusiveConsumer1 = await createConsumer(
client,
subscriptionConfigs.exclusive,
'EXCLUSIVE-1'
);
// Demo 2: Shared subscription (load balancing)
console.log('2. SHARED SUBSCRIPTION');
console.log(' - Multiple consumers can subscribe');
console.log(' - Messages are load balanced across consumers
');
const sharedConsumer1 = await createConsumer(
client,
subscriptionConfigs.shared,
'SHARED-1'
);
const sharedConsumer2 = await createConsumer(
client,
subscriptionConfigs.shared,
'SHARED-2'
);
// Demo 3: Failover subscription (active-standby)
console.log('3. FAILOVER SUBSCRIPTION');
console.log(' - Multiple consumers can subscribe');
console.log(' - Only one is active, others are standbys
');
const failoverConsumer1 = await createConsumer(
client,
subscriptionConfigs.failover,
'FAILOVER-1'
);
const failoverConsumer2 = await createConsumer(
client,
subscriptionConfigs.failover,
'FAILOVER-2'
);
// Demo 4: Key_Shared subscription (key-based ordering)
console.log('4. KEY_SHARED SUBSCRIPTION');
console.log(' - Multiple consumers can subscribe');
console.log(' - Messages with same key go to same consumer
');
const keySharedConsumer1 = await createConsumer(
client,
subscriptionConfigs.key_shared,
'KEY-SHARED-1'
);
const keySharedConsumer2 = await createConsumer(
client,
subscriptionConfigs.key_shared,
'KEY-SHARED-2'
);
// Wait for consumers to be ready
await new Promise(resolve => setTimeout(resolve, 1000));
// Start producer to send messages
await messageProducer();
// Wait for message processing
await new Promise(resolve => setTimeout(resolve, 5000));
// Clean up
console.log('
Cleaning up consumers...');
await exclusiveConsumer1.close();
await sharedConsumer1.close();
await sharedConsumer2.close();
await failoverConsumer1.close();
await failoverConsumer2.close();
await keySharedConsumer1.close();
await keySharedConsumer2.close();
await client.close();
}
demoSubscriptionTypes().catch(err => {
console.error('Error:', err);
});
💻 Pulsar 读取器 API javascript
🟡 intermediate
⭐⭐⭐
使用 Pulsar 读取器 API 从特定位置读取消息,具有游标控制功能
⏱️ 20 min
🏷️ pulsar, reader, cursor, api
Prerequisites:
Pulsar cluster running, Node.js, Pulsar basics
// Apache Pulsar Reader API Example
// Demonstrates reading messages with cursor control
const Pulsar = require('pulsar-client');
// Producer to populate topic with messages
async function populateTopic() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
const producer = await client.createProducer({
topic: 'persistent://public/default/reader-demo',
batchingEnabled: true,
});
console.log('Populating topic with messages...');
// Send messages with timestamps
const messages = [];
const startTime = Date.now();
for (let i = 1; i <= 20; i++) {
const message = {
id: `msg-${i}`,
content: `Message number ${i}`,
timestamp: startTime + (i * 1000),
batch: Math.floor((i - 1) / 5) + 1,
};
await producer.send({
data: Buffer.from(JSON.stringify(message)),
properties: {
messageId: message.id,
batch: message.batch.toString(),
},
});
messages.push(message);
console.log(`Sent: ${message.content}`);
// Small delay between messages
await new Promise(resolve => setTimeout(resolve, 100));
}
await producer.close();
await client.close();
return messages;
}
// Reader that starts from the beginning
async function readFromBeginning() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
console.log('\n=== Reading from beginning ===');
const reader = await client.createReader({
topic: 'persistent://public/default/reader-demo',
startMessageId: Pulsar.MessageId.earliest(),
});
let messageCount = 0;
console.log('Reading first 5 messages from beginning:');
while (messageCount < 5) {
try {
const msg = await reader.readNext();
const messageData = JSON.parse(msg.getData().toString());
console.log(` ${messageData.content} (ID: ${messageData.id})`);
messageCount++;
} catch (err) {
console.error('Error reading message:', err);
break;
}
}
await reader.close();
await client.close();
}
// Reader that starts from the latest message
async function readFromLatest() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
console.log('\n=== Reading from latest ===');
// Wait a moment to ensure we're at the latest
await new Promise(resolve => setTimeout(resolve, 500));
const reader = await client.createReader({
topic: 'persistent://public/default/reader-demo',
startMessageId: Pulsar.MessageId.latest(),
});
// Should not receive any messages since we're reading from latest
// and no new messages have been produced
console.log('Reading from latest position (should be empty)...');
try {
// Set a timeout for reading
const msg = await Promise.race([
reader.readNext(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), 1000)
)
]);
console.log('Unexpected message received');
} catch (err) {
if (err.message === 'Timeout') {
console.log('No messages available at latest position (expected)');
} else {
console.error('Error:', err);
}
}
await reader.close();
await client.close();
}
// Reader that starts from a specific message ID
async function readFromSpecificMessage(messages) {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
console.log('\n=== Reading from specific message ===');
// First, read all messages to find a specific message ID
const tempReader = await client.createReader({
topic: 'persistent://public/default/reader-demo',
startMessageId: Pulsar.MessageId.earliest(),
});
let targetMessageId = null;
let messageCount = 0;
// Skip first 10 messages
while (messageCount < 10) {
try {
await tempReader.readNext();
messageCount++;
} catch (err) {
break;
}
}
// Get the next message ID as our target
try {
const targetMsg = await tempReader.readNext();
const targetData = JSON.parse(targetMsg.getData().toString());
console.log(`Found target message: ${targetData.content}`);
targetMessageId = targetMsg.getMessageId();
} catch (err) {
console.error('Error finding target message:', err);
await tempReader.close();
await client.close();
return;
}
await tempReader.close();
// Now create a reader starting from that specific message
const reader = await client.createReader({
topic: 'persistent://public/default/reader-demo',
startMessageId: targetMessageId,
});
console.log('Reading messages from message 11 onwards:');
messageCount = 0;
while (messageCount < 5) {
try {
const msg = await reader.readNext();
const messageData = JSON.parse(msg.getData().toString());
console.log(` ${messageData.content} (ID: ${messageData.id})`);
messageCount++;
} catch (err) {
console.error('Error reading message:', err);
break;
}
}
await reader.close();
await client.close();
}
// Reader with timestamp-based start
async function readFromTimestamp() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
console.log('\n=== Reading from timestamp ===');
// Calculate timestamp for halfway through the messages
const halfTime = Date.now() - 10000; // 10 seconds ago
const reader = await client.createReader({
topic: 'persistent://public/default/reader-demo',
startMessageId: Pulsar.MessageId.timestamp(halfTime),
});
console.log(`Reading messages from timestamp: ${new Date(halfTime)}`);
let messageCount = 0;
while (messageCount < 5) {
try {
const msg = await reader.readNext();
const messageData = JSON.parse(msg.getData().toString());
console.log(` ${messageData.content} (Time: ${new Date(messageData.timestamp)})`);
messageCount++;
} catch (err) {
console.error('Error reading message:', err);
break;
}
}
await reader.close();
await client.close();
}
// Compare reader vs consumer
async function compareReaderConsumer() {
console.log('\n=== Reader vs Consumer Comparison ===');
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
// Create a reader
const reader = await client.createReader({
topic: 'persistent://public/default/reader-demo',
startMessageId: Pulsar.MessageId.earliest(),
});
// Create a consumer
const consumer = await client.subscribe({
topic: 'persistent://public/default/reader-demo',
subscription: 'comparison-sub',
subscriptionType: 'Exclusive',
});
console.log('Reader and Consumer both reading from beginning...');
// Reader reads without acknowledging
console.log('\nReader messages (no ack needed):');
for (let i = 0; i < 3; i++) {
try {
const msg = await reader.readNext();
const messageData = JSON.parse(msg.getData().toString());
console.log(` Reader: ${messageData.content}`);
} catch (err) {
break;
}
}
// Consumer reads and acknowledges
console.log('\nConsumer messages (requires ack):');
for (let i = 0; i < 3; i++) {
try {
const msg = await consumer.receive();
const messageData = JSON.parse(msg.getData().toString());
console.log(` Consumer: ${messageData.content}`);
await consumer.acknowledge(msg);
} catch (err) {
break;
}
}
console.log('\nKey differences:');
console.log('- Reader: No subscription, no ack needed, can start from any position');
console.log('- Consumer: Has subscription, requires ack, tracks cursor per subscription');
await reader.close();
await consumer.close();
await client.close();
}
// Main function
async function main() {
console.log('=== Pulsar Reader API Demo ===');
try {
// Step 1: Populate the topic
console.log('Step 1: Populating topic with messages...');
await populateTopic();
// Step 2: Demonstrate different reading patterns
await new Promise(resolve => setTimeout(resolve, 1000));
await readFromBeginning();
await new Promise(resolve => setTimeout(resolve, 1000));
await readFromLatest();
await new Promise(resolve => setTimeout(resolve, 1000));
await readFromSpecificMessage();
await new Promise(resolve => setTimeout(resolve, 1000));
await readFromTimestamp();
await new Promise(resolve => setTimeout(resolve, 1000));
await compareReaderConsumer();
console.log('\nReader API demo completed!');
} catch (err) {
console.error('Error in reader demo:', err);
}
}
main().catch(err => {
console.error('Error:', err);
});
💻 Pulsar 模式注册表 javascript
🔴 complex
⭐⭐⭐⭐
使用 Pulsar 模式注册表处理 JSON、Avro 和 Protobuf 模式的结构化数据
⏱️ 25 min
🏷️ pulsar, schema, registry, structured-data
Prerequisites:
Pulsar cluster with schema registry, Node.js, Advanced Pulsar knowledge
// Apache Pulsar Schema Registry Example
// Demonstrates using JSON, Avro, and Protobuf schemas
const Pulsar = require('pulsar-client');
// Define JSON schema for user events
const userEventSchema = {
type: 'JSON',
schema: `{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "userId", "type": "string"},
{"name": "eventType", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "data", "type": ["null", {
"type": "record",
"name": "EventData",
"fields": [
{"name": "email", "type": ["null", "string"]},
{"name": "name", "type": ["null", "string"]},
{"name": "action", "type": ["null", "string"]}
]
}], "default": null}
]
}`,
};
// Define Avro schema for order events
const orderEventSchema = {
type: 'AVRO',
schema: `{
"type": "record",
"name": "OrderEvent",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "eventType", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "items", "type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "productId", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}}
]
}`,
};
// Producer with JSON schema for user events
async function userEventProducer() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
console.log('Creating user event producer with JSON schema...');
const producer = await client.createProducer({
topic: 'persistent://public/default/user-events',
schema: userEventSchema,
});
// User events to publish
const userEvents = [
{
userId: 'user123',
eventType: 'user.created',
timestamp: Date.now(),
data: {
email: '[email protected]',
name: 'John Doe',
action: 'registration',
},
},
{
userId: 'user123',
eventType: 'user.updated',
timestamp: Date.now(),
data: {
email: '[email protected]',
name: 'John Smith',
action: 'profile_update',
},
},
{
userId: 'user456',
eventType: 'user.login',
timestamp: Date.now(),
data: {
action: 'authentication',
},
},
];
for (const event of userEvents) {
console.log(`Publishing user event: ${event.eventType}`);
await producer.send({
data: Buffer.from(JSON.stringify(event)),
});
}
await producer.close();
await client.close();
}
// Producer with Avro schema for order events
async function orderEventProducer() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
console.log('Creating order event producer with Avro schema...');
const producer = await client.createProducer({
topic: 'persistent://public/default/order-events',
schema: orderEventSchema,
});
// Order events to publish
const orderEvents = [
{
orderId: 'order789',
customerId: 'user123',
eventType: 'order.created',
amount: 99.99,
currency: 'USD',
timestamp: Date.now(),
items: [
{
productId: 'prod001',
quantity: 2,
price: 49.99,
},
],
},
{
orderId: 'order790',
customerId: 'user456',
eventType: 'order.created',
amount: 149.99,
currency: 'USD',
timestamp: Date.now(),
items: [
{
productId: 'prod002',
quantity: 1,
price: 149.99,
},
],
},
];
for (const event of orderEvents) {
console.log(`Publishing order event: ${event.eventType}`);
await producer.send({
data: Buffer.from(JSON.stringify(event)),
});
}
await producer.close();
await client.close();
}
// Consumer for user events with JSON schema
async function userEventConsumer() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
console.log('Creating user event consumer with JSON schema...');
const consumer = await client.subscribe({
topic: 'persistent://public/default/user-events',
subscription: 'user-events-sub',
subscriptionType: 'Shared',
schema: userEventSchema,
});
console.log('Consuming user events...');
let messageCount = 0;
while (messageCount < 3) {
try {
const msg = await consumer.receive();
const eventData = JSON.parse(msg.getData().toString());
console.log(`User Event Received:`);
console.log(` - User ID: ${eventData.userId}`);
console.log(` - Event Type: ${eventData.eventType}`);
console.log(` - Timestamp: ${new Date(eventData.timestamp)}`);
if (eventData.data) {
console.log(` - Data: ${JSON.stringify(eventData.data)}`);
}
console.log('');
await consumer.acknowledge(msg);
messageCount++;
} catch (err) {
console.error('Error consuming user event:', err);
break;
}
}
await consumer.close();
await client.close();
}
// Consumer for order events with Avro schema
async function orderEventConsumer() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
console.log('Creating order event consumer with Avro schema...');
const consumer = await client.subscribe({
topic: 'persistent://public/default/order-events',
subscription: 'order-events-sub',
subscriptionType: 'Shared',
schema: orderEventSchema,
});
console.log('Consuming order events...');
let messageCount = 0;
while (messageCount < 2) {
try {
const msg = await consumer.receive();
const eventData = JSON.parse(msg.getData().toString());
console.log(`Order Event Received:`);
console.log(` - Order ID: ${eventData.orderId}`);
console.log(` - Customer ID: ${eventData.customerId}`);
console.log(` - Event Type: ${eventData.eventType}`);
console.log(` - Amount: ${eventData.amount} ${eventData.currency}`);
console.log(` - Items Count: ${eventData.items.length}`);
console.log('');
await consumer.acknowledge(msg);
messageCount++;
} catch (err) {
console.error('Error consuming order event:', err);
break;
}
}
await consumer.close();
await client.close();
}
// Main function to run the schema demo
async function main() {
console.log('=== Pulsar Schema Registry Demo ===
');
try {
// Start consumers first
console.log('1. Starting consumers...
');
const userConsumerPromise = userEventConsumer();
const orderConsumerPromise = orderEventConsumer();
// Wait for consumers to be ready
await new Promise(resolve => setTimeout(resolve, 1000));
// Produce events
console.log('2. Producing events...
');
await userEventProducer();
await new Promise(resolve => setTimeout(resolve, 500));
await orderEventProducer();
// Wait for consumption to complete
await userConsumerPromise;
await orderConsumerPromise;
console.log('Schema registry demo completed successfully!');
} catch (err) {
console.error('Error in schema demo:', err);
}
}
main().catch(err => {
console.error('Error:', err);
});
💻 Pulsar 多租户 javascript
🔴 complex
⭐⭐⭐⭐⭐
使用命名空间、隔离和资源管理实现多租户架构
⏱️ 30 min
🏷️ pulsar, multi-tenancy, namespaces
Prerequisites:
Pulsar cluster with multi-tenancy enabled, Node.js, Advanced Pulsar knowledge
// Apache Pulsar Multi-Tenancy Example
// Demonstrates namespace isolation and tenant-specific configurations
const Pulsar = require('pulsar-client');
// Tenant configurations
const tenants = {
'company-a': {
name: 'Company A',
topics: [
'persistent://company-a/production/orders',
'persistent://company-a/production/users',
'persistent://company-a/staging/logs',
],
producers: 2,
consumers: 3,
},
'company-b': {
name: 'Company B',
topics: [
'persistent://company-b/analytics/events',
'persistent://company-b/analytics/metrics',
'persistent://company-b/notifications/emails',
],
producers: 1,
consumers: 2,
},
};
// Create producers for a specific tenant
async function createTenantProducers(tenantId, tenantConfig) {
const producers = [];
for (let i = 0; i < tenantConfig.producers; i++) {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
const producer = await client.createProducer({
topic: tenantConfig.topics[i % tenantConfig.topics.length],
producerName: `${tenantId}-producer-${i + 1}`,
batchingEnabled: true,
blockIfQueueFull: true,
sendTimeoutMs: 30000,
});
producers.push({ client, producer, topicIndex: i });
console.log(`Created ${tenantId} producer ${i + 1}`);
}
return producers;
}
// Create consumers for a specific tenant
async function createTenantConsumers(tenantId, tenantConfig) {
const consumers = [];
for (let i = 0; i < tenantConfig.consumers; i++) {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
const topic = tenantConfig.topics[i % tenantConfig.topics.length];
const consumer = await client.subscribe({
topic: topic,
subscription: `${tenantId}-subscription-${i + 1}`,
subscriptionType: 'Shared',
consumerName: `${tenantId}-consumer-${i + 1}`,
ackTimeoutMs: 10000,
});
// Start consuming messages
(async () => {
while (true) {
try {
const msg = await consumer.receive();
const messageData = JSON.parse(msg.getData().toString());
console.log(`[${tenantId.toUpperCase()}] ${consumer.getConsumerName()}: ${messageData.event}`);
await consumer.acknowledge(msg);
} catch (err) {
console.error(`[${tenantId.toUpperCase()}] Consumer error:`, err);
break;
}
}
})();
consumers.push({ client, consumer });
console.log(`Created ${tenantId} consumer ${i + 1}`);
}
return consumers;
}
// Produce messages for a tenant
async function produceTenantMessages(producerInfo, tenantId, tenantConfig) {
const { producer, topicIndex } = producerInfo;
const topic = tenantConfig.topics[topicIndex];
// Extract topic name from full topic path
const topicName = topic.split('/').pop();
const events = [];
for (let i = 1; i <= 10; i++) {
events.push({
tenant: tenantId,
event: `${topicName}-event-${i}`,
timestamp: Date.now(),
producerId: producer.getProducerName(),
data: {
message: `Event ${i} from ${tenantConfig.name}`,
metadata: {
tenant: tenantId,
topic: topicName,
batch: Math.ceil(i / 3),
},
},
});
}
for (const event of events) {
await producer.send({
data: Buffer.from(JSON.stringify(event)),
properties: {
tenant: tenantId,
topic: topicName,
producer: producer.getProducerName(),
},
});
console.log(`[${tenantId.toUpperCase()}] ${producer.getProducerName()}: Sent ${event.event}`);
await new Promise(resolve => setTimeout(resolve, 200));
}
}
// Monitor cross-tenant message flow
async function crossTenantMonitor() {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
console.log('\n=== Cross-Tenant Monitor ===');
console.log('Monitoring messages across all tenants...');
// Monitor Company A topics
const companyAReader = await client.createReader({
topic: 'persistent://company-a/production/orders',
startMessageId: Pulsar.MessageId.latest(),
});
// Monitor Company B topics
const companyBReader = await client.createReader({
topic: 'persistent://company-b/analytics/events',
startMessageId: Pulsar.MessageId.latest(),
});
// Monitor for new messages
const monitorMessages = async (reader, tenantId) => {
while (true) {
try {
const msg = await Promise.race([
reader.readNext(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), 2000)
)
]);
const messageData = JSON.parse(msg.getData().toString());
console.log(`[MONITOR] ${tenantId}: ${messageData.event}`);
} catch (err) {
if (err.message === 'Timeout') {
// Continue monitoring
continue;
} else {
console.error(`[MONITOR] ${tenantId} error:`, err);
break;
}
}
}
};
// Start monitoring in background
monitorMessages(companyAReader, 'Company A');
monitorMessages(companyBReader, 'Company B');
}
// Demo resource isolation with different message sizes
async function demoResourceIsolation() {
console.log('\n=== Resource Isolation Demo ===');
// Company A: Large messages
const clientA = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
const producerA = await clientA.createProducer({
topic: 'persistent://company-a/processing/heavy-tasks',
producerName: 'company-a-heavy-producer',
maxPendingMessages: 100,
batchingMaxMessages: 10,
});
// Company B: Small, frequent messages
const clientB = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
const producerB = await clientB.createProducer({
topic: 'persistent://company-b/notifications/light-events',
producerName: 'company-b-light-producer',
maxPendingMessages: 1000,
batchingMaxMessages: 100,
});
// Company A: Send large messages
console.log('Company A sending large messages...');
const largePayload = 'x'.repeat(1024 * 10); // 10KB payload
for (let i = 0; i < 5; i++) {
const message = {
taskId: `heavy-task-${i}`,
payload: largePayload,
metadata: {
tenant: 'company-a',
type: 'heavy-processing',
size: largePayload.length,
},
};
await producerA.send({
data: Buffer.from(JSON.stringify(message)),
});
console.log(`[Company A] Sent heavy task ${i} (${largePayload.length} bytes)`);
await new Promise(resolve => setTimeout(resolve, 500));
}
// Company B: Send small messages
console.log('\nCompany B sending small messages...');
for (let i = 0; i < 20; i++) {
const message = {
eventId: `light-event-${i}`,
payload: `Small event ${i}`,
metadata: {
tenant: 'company-b',
type: 'notification',
size: 20,
},
};
await producerB.send({
data: Buffer.from(JSON.stringify(message)),
});
if (i % 5 === 0) {
console.log(`[Company B] Sent light events ${i - 4}-${i}`);
}
}
await producerA.close();
await clientA.close();
await producerB.close();
await clientB.close();
console.log('\nResource isolation demo completed');
console.log('- Company A: Few large messages with dedicated resources');
console.log('- Company B: Many small messages with optimized batching');
}
// Main function to run multi-tenancy demo
async function main() {
console.log('=== Pulsar Multi-Tenancy Demo ===');
try {
// Step 1: Set up producers and consumers for each tenant
console.log('\n1. Setting up tenant infrastructure...');
const tenantA = tenants['company-a'];
const tenantB = tenants['company-b'];
const companyAProducers = await createTenantProducers('company-a', tenantA);
const companyBProducers = await createTenantProducers('company-b', tenantB);
const companyAConsumers = await createTenantConsumers('company-a', tenantA);
const companyBConsumers = await createTenantConsumers('company-b', tenantB);
// Wait for consumers to be ready
await new Promise(resolve => setTimeout(resolve, 1000));
// Step 2: Start cross-tenant monitor
console.log('\n2. Starting cross-tenant monitoring...');
crossTenantMonitor();
// Step 3: Produce messages for each tenant
console.log('\n3. Producing tenant-specific messages...');
const productionPromises = [];
// Company A production
for (const producerInfo of companyAProducers) {
productionPromises.push(produceTenantMessages(producerInfo, 'company-a', tenantA));
}
// Company B production
for (const producerInfo of companyBProducers) {
productionPromises.push(produceTenantMessages(producerInfo, 'company-b', tenantB));
}
// Wait for all production to complete
await Promise.all(productionPromises);
// Step 4: Demo resource isolation
await new Promise(resolve => setTimeout(resolve, 2000));
await demoResourceIsolation();
// Wait for message processing
await new Promise(resolve => setTimeout(resolve, 5000));
// Cleanup
console.log('\n5. Cleaning up tenant resources...');
for (const { client, producer } of companyAProducers) {
await producer.close();
await client.close();
}
for (const { client, producer } of companyBProducers) {
await producer.close();
await client.close();
}
for (const { client, consumer } of companyAConsumers) {
await consumer.close();
await client.close();
}
for (const { client, consumer } of companyBConsumers) {
await consumer.close();
await client.close();
}
console.log('\nMulti-tenancy demo completed successfully!');
} catch (err) {
console.error('Error in multi-tenancy demo:', err);
}
}
main().catch(err => {
console.error('Error:', err);
});