🎯 Рекомендуемые коллекции
Балансированные коллекции примеров кода из различных категорий, которые вы можете исследовать
Протокол сообщений AMQP
Примеры протокола AMQP (Advanced Message Queuing Protocol) для корпоративной передачи сообщений с RabbitMQ, очередями, обменниками и расширенными паттернами маршрутизации
💻 Основы AMQP и RabbitMQ javascript
🟢 simple
⭐⭐
Введение в протокол AMQP и RabbitMQ с базовыми паттернами публикации/подписки
⏱️ 20 min
🏷️ amqp, rabbitmq, messaging, queue, exchange
Prerequisites:
Node.js, Message queue concepts, RabbitMQ basics
// AMQP Basic Examples using amqplib
// npm install amqplib
const amqp = require('amqplib/callback_api');
// RabbitMQ connection configuration
const RABBITMQ_URL = 'amqp://guest:guest@localhost:5672';
const EXCHANGE_NAME = 'demo_exchange';
const QUEUE_NAME = 'demo_queue';
const ROUTING_KEY = 'demo.routing.key';
// 1. Basic Producer (Publisher)
async function createProducer() {
console.log('Creating AMQP Producer...');
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
console.log('✅ Producer channel created');
// Declare a queue (idempotent - will only be created if it doesn't exist)
channel.assertQueue(QUEUE_NAME, {
durable: true // Queue will survive broker restart
}, (error2, q) => {
if (error2) {
reject(error2);
return;
}
console.log(`✅ Queue '${QUEUE_NAME}' declared`);
// Producer function
const producer = {
channel,
connection,
// Send message
sendMessage: (message, options = {}) => {
const messageBuffer = Buffer.from(JSON.stringify({
content: message,
timestamp: new Date().toISOString(),
id: Math.random().toString(36).substr(2, 9),
...options
}));
const defaultOptions = {
persistent: true, // Message will survive broker restart
timestamp: Date.now(),
messageId: Math.random().toString(36).substr(2, 9)
};
const sendOptions = { ...defaultOptions, ...options };
channel.sendToQueue(
QUEUE_NAME,
messageBuffer,
sendOptions,
(error3) => {
if (error3) {
console.error('Failed to send message:', error3);
} else {
console.log(`✅ Message sent to queue '${QUEUE_NAME}': ${message}`);
}
}
);
},
// Send multiple messages
sendBatch: (messages) => {
messages.forEach((msg, index) => {
setTimeout(() => {
producer.sendMessage(msg.content, msg.options);
}, index * 100); // 100ms delay between messages
});
},
// Close connection
close: () => {
channel.close(() => {
connection.close();
console.log('🔌 Producer connection closed');
});
}
};
resolve(producer);
});
});
});
});
}
// 2. Basic Consumer (Subscriber)
async function createConsumer(queueName = QUEUE_NAME) {
console.log(`Creating AMQP Consumer for queue '${queueName}'...`);
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
console.log('✅ Consumer channel created');
// Set prefetch to control how many messages are fetched at once
channel.prefetch(1);
// Declare queue (ensures it exists)
channel.assertQueue(queueName, {
durable: true
}, (error2, q) => {
if (error2) {
reject(error2);
return;
}
console.log(`✅ Queue '${queueName}' declared`);
console.log(`📋 Waiting for messages in queue '${queueName}'. To exit press CTRL+C`);
// Start consuming messages
channel.consume(queueName, (msg) => {
if (msg) {
try {
const messageContent = JSON.parse(msg.content.toString());
console.log(`📨 Received message: `, messageContent);
// Process the message (simulating processing time)
setTimeout(() => {
console.log(`✅ Processed message: ${messageContent.content}`);
// Acknowledge the message
channel.ack(msg);
}, 500);
} catch (error) {
console.error('❌ Error processing message:', error);
// Negative acknowledge (reject and requeue)
channel.nack(msg, false, true);
}
}
}, {
noAck: false // Manual acknowledgment mode
});
const consumer = {
channel,
connection,
queueName,
// Stop consuming
stop: () => {
channel.cancel(queueName, () => {
console.log(`🛑 Stopped consuming from '${queueName}'`);
});
},
// Close connection
close: () => {
channel.close(() => {
connection.close();
console.log('🔌 Consumer connection closed');
});
}
};
resolve(consumer);
});
});
});
});
}
// 3. Exchange-based Publisher/Subscriber
async function createExchangePublisher(exchangeName = EXCHANGE_NAME) {
console.log(`Creating Exchange Publisher for '${exchangeName}'...`);
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
// Declare a topic exchange
channel.assertExchange(exchangeName, 'topic', {
durable: true
}, (error2) => {
if (error2) {
reject(error2);
return;
}
console.log(`✅ Exchange '${exchangeName}' declared`);
const publisher = {
channel,
connection,
exchangeName,
// Publish message with routing key
publish: (routingKey, message, options = {}) => {
const messageBuffer = Buffer.from(JSON.stringify({
content: message,
timestamp: new Date().toISOString(),
routingKey,
...options
}));
channel.publish(exchangeName, routingKey, messageBuffer, {
persistent: true,
timestamp: Date.now(),
...options
});
console.log(`📤 Published to '${routingKey}': ${message}`);
},
// Close connection
close: () => {
channel.close(() => {
connection.close();
console.log('🔌 Publisher connection closed');
});
}
};
resolve(publisher);
});
});
});
});
}
// 4. Exchange-based Subscriber
async function createExchangeSubscriber(exchangeName = EXCHANGE_NAME, bindingPatterns = []) {
console.log(`Creating Exchange Subscriber for '${exchangeName}'...`);
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
// Declare exchange
channel.assertExchange(exchangeName, 'topic', {
durable: true
}, (error2) => {
if (error2) {
reject(error2);
return;
}
// Create an exclusive queue for this subscriber
channel.assertQueue('', {
exclusive: true,
durable: false
}, (error3, q) => {
if (error3) {
reject(error3);
return;
}
console.log(`✅ Created exclusive queue: ${q.queue}`);
// Bind queue to exchange with routing patterns
const bindings = [];
bindingPatterns.forEach(pattern => {
channel.bindQueue(q.queue, exchangeName, pattern, {}, (error4) => {
if (error4) {
console.error(`❌ Failed to bind '${pattern}':`, error4);
} else {
console.log(`✅ Bound queue to '${pattern}'`);
bindings.push(pattern);
}
});
});
// Start consuming
channel.consume(q.queue, (msg) => {
if (msg) {
try {
const messageContent = JSON.parse(msg.content.toString());
console.log(`📨 [${msg.fields.routingKey}] ${messageContent.content}`);
// Acknowledge message
channel.ack(msg);
} catch (error) {
console.error('❌ Error processing message:', error);
channel.nack(msg, false, false); // Don't requeue on error
}
}
}, {
noAck: false
});
const subscriber = {
channel,
connection,
queueName: q.queue,
bindings,
// Add new binding
addBinding: (pattern) => {
channel.bindQueue(q.queue, exchangeName, pattern, {}, (error) => {
if (!error) {
bindings.push(pattern);
console.log(`✅ Added binding for '${pattern}'`);
}
});
},
// Remove binding
removeBinding: (pattern) => {
channel.unbindQueue(q.queue, exchangeName, pattern, {}, (error) => {
if (!error) {
const index = bindings.indexOf(pattern);
if (index > -1) {
bindings.splice(index, 1);
}
console.log(`✅ Removed binding for '${pattern}'`);
}
});
},
// Close connection
close: () => {
channel.close(() => {
connection.close();
console.log('🔌 Subscriber connection closed');
});
}
};
resolve(subscriber);
});
});
});
});
});
}
// 5. RPC (Remote Procedure Call) Pattern
async function createRpcServer(serviceName = 'math_service') {
console.log(`Creating RPC Server for '${serviceName}'...`);
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
// Declare RPC queue
channel.assertQueue(serviceName, {
durable: false
}, (error2) => {
if (error2) {
reject(error2);
return;
}
console.log(`✅ RPC Server '${serviceName}' ready`);
// Set prefetch to 1 to process one request at a time
channel.prefetch(1);
// Process RPC requests
channel.consume(serviceName, (msg) => {
if (msg) {
try {
const request = JSON.parse(msg.content.toString());
console.log(`🔧 RPC Request: ${request.method}(${request.params?.join(', ')})`);
let result;
let error = null;
// Handle different RPC methods
switch (request.method) {
case 'add':
result = request.params.reduce((a, b) => a + b, 0);
break;
case 'multiply':
result = request.params.reduce((a, b) => a * b, 1);
break;
case 'factorial':
result = factorial(request.params[0]);
break;
case 'fibonacci':
result = fibonacci(request.params[0]);
break;
default:
error = `Unknown method: ${request.method}`;
}
const response = {
requestId: request.requestId,
result: result,
error: error,
timestamp: new Date().toISOString()
};
// Send response back to client
channel.sendToQueue(
msg.properties.replyTo,
Buffer.from(JSON.stringify(response)),
{
correlationId: msg.properties.correlationId
}
);
console.log(`✅ RPC Response: ${error || result}`);
channel.ack(msg);
} catch (error) {
console.error('❌ RPC Error:', error);
channel.nack(msg, false, false);
}
}
}, {
noAck: false
});
const rpcServer = {
channel,
connection,
serviceName,
close: () => {
channel.close(() => {
connection.close();
console.log('🔌 RPC Server connection closed');
});
}
};
resolve(rpcServer);
});
});
});
});
}
// 6. RPC Client
async function createRpcClient() {
console.log('Creating RPC Client...');
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
// Create callback queue for responses
channel.assertQueue('', {
exclusive: true
}, (error2, q) => {
if (error2) {
reject(error2);
return;
}
const pendingRequests = new Map();
// Consume responses
channel.consume(q.queue, (msg) => {
if (msg) {
try {
const response = JSON.parse(msg.content.toString());
const { requestId, result, error } = response;
if (pendingRequests.has(requestId)) {
const { resolve, reject, timeout } = pendingRequests.get(requestId);
clearTimeout(timeout);
pendingRequests.delete(requestId);
if (error) {
reject(new Error(error));
} else {
resolve(result);
}
}
channel.ack(msg);
} catch (error) {
console.error('❌ Error parsing RPC response:', error);
channel.nack(msg, false, false);
}
}
}, {
noAck: false
});
const rpcClient = {
channel,
connection,
callbackQueue: q.queue,
// Call RPC method
call: (serviceName, method, params = [], timeoutMs = 5000) => {
return new Promise((resolve, reject) => {
const requestId = Math.random().toString(36).substr(2, 9);
const request = {
requestId,
method,
params,
timestamp: new Date().toISOString()
};
// Set timeout
const timeout = setTimeout(() => {
pendingRequests.delete(requestId);
reject(new Error(`RPC timeout for ${method}`));
}, timeoutMs);
pendingRequests.set(requestId, { resolve, reject, timeout });
// Send request
channel.sendToQueue(
serviceName,
Buffer.from(JSON.stringify(request)),
{
replyTo: q.queue,
correlationId: requestId,
expiration: timeoutMs
}
);
console.log(`📞 RPC Call: ${method}(${params.join(', ')})`);
});
},
close: () => {
channel.close(() => {
connection.close();
console.log('🔌 RPC Client connection closed');
});
}
};
resolve(rpcClient);
});
});
});
});
}
// Helper functions
function factorial(n) {
if (n <= 1) return 1;
return n * factorial(n - 1);
}
function fibonacci(n) {
if (n <= 1) return n;
let a = 0, b = 1;
for (let i = 2; i <= n; i++) {
[a, b] = [b, a + b];
}
return b;
}
// 7. Dead Letter Queue Example
async function setupDeadLetterQueue() {
console.log('Setting up Dead Letter Queue...');
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
// Declare dead letter exchange
channel.assertExchange('dlx', 'direct', { durable: true });
// Declare dead letter queue
channel.assertQueue('dlq', { durable: true });
channel.bindQueue('dlq', 'dlx', 'dlq');
// Declare main queue with dead letter exchange
channel.assertQueue('main_queue', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'dlq',
'x-message-ttl': 60000 // Messages expire after 60 seconds
}
});
console.log('✅ Dead Letter Queue setup complete');
const dlqManager = {
channel,
connection,
publishToMain: (message) => {
channel.sendToQueue('main_queue', Buffer.from(message), {
persistent: true
});
console.log(`📤 Published to main queue: ${message}`);
},
consumeFromDLQ: (callback) => {
channel.consume('dlq', (msg) => {
if (msg) {
callback(msg.content.toString(), msg.properties);
channel.ack(msg);
}
}, { noAck: false });
console.log('📨 Consuming from Dead Letter Queue');
},
close: () => {
channel.close(() => {
connection.close();
console.log('🔌 DLQ Manager connection closed');
});
}
};
resolve(dlqManager);
});
});
});
}
// Main demonstration
async function runDemo() {
console.log('=== AMQP Demo ===\n');
try {
// 1. Basic producer/consumer
console.log('1. Basic Producer/Consumer Demo');
const producer = await createProducer();
const consumer = await createConsumer();
// Send messages
producer.sendMessage('Hello AMQP!');
producer.sendMessage('This is a test message');
producer.sendMessage('Message with options', { priority: 5 });
// Wait for processing
await new Promise(resolve => setTimeout(resolve, 2000));
// 2. Exchange-based pub/sub
console.log('\n2. Exchange-based Publisher/Subscriber Demo');
const exchangePublisher = await createExchangePublisher();
const exchangeSubscriber = await createExchangeSubscriber('demo_exchange', [
'sensor.temperature.*',
'alert.critical.*'
]);
// Publish to exchange with different routing keys
exchangePublisher.publish('sensor.temperature.living_room', '25°C');
exchangePublisher.publish('sensor.temperature.bedroom', '22°C');
exchangePublisher.publish('alert.critical.fire', 'Fire detected!');
exchangePublisher.publish('sensor.humidity.kitchen', '60%'); // Won't be received
await new Promise(resolve => setTimeout(resolve, 2000));
// 3. RPC Demo
console.log('\n3. RPC Demo');
const rpcServer = await createRpcServer();
const rpcClient = await createRpcClient();
// Make RPC calls
const addResult = await rpcClient.call('math_service', 'add', [5, 3, 2]);
console.log(`RPC Result (add): ${addResult}`);
const multiplyResult = await rpcClient.call('math_service', 'multiply', [4, 5]);
console.log(`RPC Result (multiply): ${multiplyResult}`);
const factorialResult = await rpcClient.call('math_service', 'factorial', [5]);
console.log(`RPC Result (factorial): ${factorialResult}`);
// 4. Dead Letter Queue Demo
console.log('\n4. Dead Letter Queue Demo');
const dlqManager = await setupDeadLetterQueue();
// Set up DLQ consumer
dlqManager.consumeFromDLQ((message, properties) => {
console.log(`💀 Dead Letter: ${message}`);
console.log(` Headers: ${JSON.stringify(properties.headers, null, 2)}`);
});
// Close connections
setTimeout(() => {
console.log('\n🔌 Closing connections...');
producer.close();
consumer.close();
exchangePublisher.close();
exchangeSubscriber.close();
rpcServer.close();
rpcClient.close();
dlqManager.close();
}, 3000);
} catch (error) {
console.error('❌ Demo error:', error);
}
}
// Run demo if called directly
if (require.main === module) {
runDemo();
}
module.exports = {
createProducer,
createConsumer,
createExchangePublisher,
createExchangeSubscriber,
createRpcServer,
createRpcClient,
setupDeadLetterQueue
};
💻 Корпоративные паттерны AMQP javascript
🟡 intermediate
⭐⭐⭐⭐
Расширенные паттерны корпоративной передачи сообщений, включая очереди работы, тематическую маршрутизацию, очереди приоритетов и планирование сообщений
⏱️ 35 min
🏷️ amqp, enterprise, patterns, rabbitmq, queue
Prerequisites:
AMQP basics, Enterprise patterns, Node.js async
// Enterprise AMQP Patterns
// Advanced patterns for production messaging systems
const amqp = require('amqplib/callback_api');
const EventEmitter = require('events');
const { v4: uuidv4 } = require('uuid');
// Configuration
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';
// 1. Work Queue Pattern (Task Distribution)
class WorkQueue {
constructor(queueName = 'work_queue', options = {}) {
this.queueName = queueName;
this.options = {
durable: true,
prefetch: options.prefetch || 1,
maxLength: options.maxLength || 10000,
messageTTL: options.messageTTL || 3600000, // 1 hour
...options
};
this.connection = null;
this.channel = null;
this.workers = new Map();
}
async connect() {
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
this.connection = connection;
this.channel = channel;
// Declare work queue with enterprise features
channel.assertQueue(this.queueName, {
durable: this.options.durable,
maxLength: this.options.maxLength,
messageTTL: this.options.messageTTL,
arguments: {
'x-queue-mode': 'lazy', // Save memory for large queues
'x-max-priority': 10 // Enable message priorities
}
});
// Set prefetch for fair dispatch
channel.prefetch(this.options.prefetch);
console.log(`✅ Work queue '${this.queueName}' ready`);
resolve();
});
});
});
}
// Add task to work queue
addTask(task, priority = 5, delay = 0) {
if (!this.channel) {
throw new Error('WorkQueue not connected');
}
const message = {
id: uuidv4(),
task: task,
createdAt: new Date().toISOString(),
priority,
delay
};
const headers = {};
if (delay > 0) {
headers['x-delay'] = delay;
}
this.channel.sendToQueue(
this.queueName,
Buffer.from(JSON.stringify(message)),
{
persistent: true,
priority: priority,
timestamp: Date.now(),
messageId: message.id,
headers
}
);
console.log(`📋 Task added: ${task} (priority: ${priority})`);
return message.id;
}
// Register worker
async registerWorker(workerId, handler) {
if (!this.channel) {
throw new Error('WorkQueue not connected');
}
this.channel.consume(this.queueName, async (msg) => {
if (msg) {
try {
const task = JSON.parse(msg.content.toString());
console.log(`🔧 Worker ${workerId} processing task: ${task.task}`);
// Process task
const startTime = Date.now();
await handler(task);
const duration = Date.now() - startTime;
console.log(`✅ Worker ${workerId} completed task in ${duration}ms`);
this.channel.ack(msg);
// Update worker stats
const stats = this.workers.get(workerId) || { tasks: 0, totalTime: 0 };
stats.tasks++;
stats.totalTime += duration;
this.workers.set(workerId, stats);
} catch (error) {
console.error(`❌ Worker ${workerId} error:`, error);
// Reject message without requeue (failed tasks go to DLQ if configured)
this.channel.nack(msg, false, false);
}
}
}, {
noAck: false,
consumerTag: `worker_${workerId}`
});
console.log(`👷 Worker '${workerId}' registered`);
}
// Get worker statistics
getStats() {
return Array.from(this.workers.entries()).map(([workerId, stats]) => ({
workerId,
tasks: stats.tasks,
totalTime: stats.totalTime,
avgTime: stats.tasks > 0 ? stats.totalTime / stats.tasks : 0
}));
}
// Close connections
close() {
if (this.channel && this.connection) {
this.channel.close(() => {
this.connection.close();
console.log(`🔌 Work queue '${this.queueName}' closed`);
});
}
}
}
// 2. Topic Exchange Pattern (Pub/Sub with routing)
class TopicPublisher extends EventEmitter {
constructor(exchangeName = 'topics') {
super();
this.exchangeName = exchangeName;
this.connection = null;
this.channel = null;
this.routingStats = new Map();
}
async connect() {
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
this.connection = connection;
this.channel = channel;
// Declare topic exchange
channel.assertExchange(this.exchangeName, 'topic', {
durable: true,
arguments: {
'alternate-exchange': `${this.exchangeName}_alt` // Alternate exchange for unroutable messages
}
});
// Declare alternate exchange
channel.assertExchange(`${this.exchangeName}_alt`, 'fanout', {
durable: true
});
console.log(`✅ Topic exchange '${this.exchangeName}' ready`);
resolve();
});
});
});
}
// Publish message with routing key
publish(routingKey, message, options = {}) {
if (!this.channel) {
throw new Error('TopicPublisher not connected');
}
const messageData = {
id: uuidv4(),
routingKey,
content: message,
timestamp: new Date().toISOString(),
...options
};
this.channel.publish(
this.exchangeName,
routingKey,
Buffer.from(JSON.stringify(messageData)),
{
persistent: true,
timestamp: Date.now(),
messageId: messageData.id,
headers: options.headers || {}
}
);
// Update routing statistics
const count = this.routingStats.get(routingKey) || 0;
this.routingStats.set(routingKey, count + 1);
console.log(`📤 Published to '${routingKey}': ${message}`);
this.emit('published', { routingKey, message: messageData });
return messageData.id;
}
// Get routing statistics
getRoutingStats() {
return Object.fromEntries(this.routingStats);
}
close() {
if (this.channel && this.connection) {
this.channel.close(() => {
this.connection.close();
console.log(`🔌 Topic publisher '${this.exchangeName}' closed`);
});
}
}
}
// 3. Topic Subscriber with pattern matching
class TopicSubscriber extends EventEmitter {
constructor(exchangeName = 'topics', subscriberId = null) {
super();
this.exchangeName = exchangeName;
this.subscriberId = subscriberId || `sub_${uuidv4().substr(0, 8)}`;
this.connection = null;
this.channel = null;
this.queueName = null;
this.bindings = new Set();
this.messageStats = new Map();
}
async connect() {
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
this.connection = connection;
this.channel = channel;
// Declare exchange
channel.assertExchange(this.exchangeName, 'topic', {
durable: true
});
// Create exclusive queue for this subscriber
channel.assertQueue('', {
exclusive: true,
durable: false,
arguments: {
'x-message-ttl': 300000, // 5 minutes TTL
'x-max-length': 1000 // Max 1000 messages
}
}, (error2, q) => {
if (error2) {
reject(error2);
return;
}
this.queueName = q.queue;
// Start consuming messages
channel.consume(this.queueName, (msg) => {
if (msg) {
try {
const messageData = JSON.parse(msg.content.toString());
this.handleMessage(messageData, msg);
this.channel.ack(msg);
} catch (error) {
console.error(`❌ Message processing error:`, error);
this.channel.nack(msg, false, false);
}
}
}, {
noAck: false,
consumerTag: `subscriber_${this.subscriberId}`
});
console.log(`✅ Topic subscriber '${this.subscriberId}' connected`);
resolve();
});
});
});
});
}
// Add binding pattern
bind(pattern) {
if (!this.channel) {
throw new Error('TopicSubscriber not connected');
}
this.channel.bindQueue(this.queueName, this.exchangeName, pattern, {}, (error) => {
if (!error) {
this.bindings.add(pattern);
console.log(`✅ Bound to pattern: ${pattern}`);
this.emit('bound', { pattern });
}
});
}
// Remove binding pattern
unbind(pattern) {
if (!this.channel) {
throw new Error('TopicSubscriber not connected');
}
this.channel.unbindQueue(this.queueName, this.exchangeName, pattern, {}, (error) => {
if (!error) {
this.bindings.delete(pattern);
console.log(`✅ Unbound from pattern: ${pattern}`);
this.emit('unbound', { pattern });
}
});
}
// Handle received message
handleMessage(messageData, msg) {
console.log(`📨 [${messageData.routingKey}] ${messageData.content}`);
// Update message statistics
const routingKey = messageData.routingKey;
const count = this.messageStats.get(routingKey) || 0;
this.messageStats.set(routingKey, count + 1);
// Emit message event
this.emit('message', messageData);
// Emit specific routing key event
this.emit(`message:${routingKey}`, messageData);
}
// Get message statistics
getMessageStats() {
return {
bindings: Array.from(this.bindings),
messageCounts: Object.fromEntries(this.messageStats),
totalMessages: Array.from(this.messageStats.values()).reduce((a, b) => a + b, 0)
};
}
close() {
if (this.channel && this.connection) {
this.channel.close(() => {
this.connection.close();
console.log(`🔌 Topic subscriber '${this.subscriberId}' closed`);
});
}
}
}
// 4. Priority Queue Manager
class PriorityQueueManager {
constructor(queueName = 'priority_queue') {
this.queueName = queueName;
this.connection = null;
this.channel = null;
}
async connect() {
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
this.connection = connection;
this.channel = channel;
// Declare priority queue
channel.assertQueue(this.queueName, {
durable: true,
arguments: {
'x-max-priority': 10 // Support priorities 0-10
}
});
console.log(`✅ Priority queue '${this.queueName}' ready`);
resolve();
});
});
});
}
// Add message with priority
addMessage(message, priority = 5) {
if (!this.channel) {
throw new Error('PriorityQueueManager not connected');
}
const messageData = {
id: uuidv4(),
content: message,
priority,
timestamp: new Date().toISOString()
};
this.channel.sendToQueue(
this.queueName,
Buffer.from(JSON.stringify(messageData)),
{
persistent: true,
priority: Math.max(0, Math.min(10, priority)),
timestamp: Date.now(),
messageId: messageData.id
}
);
console.log(`📋 Added message with priority ${priority}: ${message}`);
return messageData.id;
}
// Process messages in priority order
processMessages(handler) {
if (!this.channel) {
throw new Error('PriorityQueueManager not connected');
}
// Set prefetch to 1 to ensure priority order
this.channel.prefetch(1);
this.channel.consume(this.queueName, async (msg) => {
if (msg) {
try {
const messageData = JSON.parse(msg.content.toString());
console.log(`⚡ Processing priority ${messageData.priority}: ${messageData.content}`);
await handler(messageData);
this.channel.ack(msg);
} catch (error) {
console.error('❌ Processing error:', error);
this.channel.nack(msg, false, false);
}
}
}, {
noAck: false
});
console.log(`🔄 Processing priority queue messages`);
}
close() {
if (this.channel && this.connection) {
this.channel.close(() => {
this.connection.close();
console.log(`🔌 Priority queue '${this.queueName}' closed`);
});
}
}
}
// 5. Message Scheduler (Delayed Messages)
class MessageScheduler {
constructor(queueName = 'scheduled_messages') {
this.queueName = queueName;
this.exchangeName = 'delayed_exchange';
this.connection = null;
this.channel = null;
}
async connect() {
return new Promise((resolve, reject) => {
amqp.connect(RABBITMQ_URL, (error0, connection) => {
if (error0) {
reject(error0);
return;
}
connection.createChannel((error1, channel) => {
if (error1) {
reject(error1);
return;
}
this.connection = connection;
this.channel = channel;
// Declare delayed message exchange (requires rabbitmq_delayed_message_exchange plugin)
channel.assertExchange(this.exchangeName, 'x-delayed-message', {
durable: true,
arguments: {
'x-delayed-type': 'direct'
}
});
// Declare queue for scheduled messages
channel.assertQueue(this.queueName, {
durable: true
});
// Bind queue to exchange
channel.bindQueue(this.queueName, this.exchangeName, this.queueName);
console.log(`✅ Message scheduler ready`);
resolve();
});
});
});
}
// Schedule message for later delivery
schedule(message, delayMs, routingKey = null) {
if (!this.channel) {
throw new Error('MessageScheduler not connected');
}
const messageData = {
id: uuidv4(),
content: message,
scheduledFor: new Date(Date.now() + delayMs).toISOString(),
delay: delayMs,
timestamp: new Date().toISOString()
};
this.channel.publish(
this.exchangeName,
routingKey || this.queueName,
Buffer.from(JSON.stringify(messageData)),
{
persistent: true,
timestamp: Date.now(),
messageId: messageData.id,
headers: {
'x-delay': delayMs
}
}
);
console.log(`⏰ Scheduled message for ${delayMs}ms: ${message}`);
return messageData.id;
}
// Process scheduled messages
processMessages(handler) {
if (!this.channel) {
throw new Error('MessageScheduler not connected');
}
this.channel.consume(this.queueName, async (msg) => {
if (msg) {
try {
const messageData = JSON.parse(msg.content.toString());
console.log(`✅ Scheduled message delivered: ${messageData.content}`);
await handler(messageData);
this.channel.ack(msg);
} catch (error) {
console.error('❌ Processing error:', error);
this.channel.nack(msg, false, false);
}
}
}, {
noAck: false
});
console.log(`🔄 Processing scheduled messages`);
}
close() {
if (this.channel && this.connection) {
this.channel.close(() => {
this.connection.close();
console.log('🔌 Message scheduler closed');
});
}
}
}
// 6. Enterprise Integration Demo
async function enterpriseDemo() {
console.log('=== Enterprise AMQP Patterns Demo ===\n');
try {
// 1. Work Queue Demo
console.log('1. Work Queue Pattern');
const workQueue = new WorkQueue('enterprise_tasks', { prefetch: 2 });
await workQueue.connect();
// Register workers
await workQueue.registerWorker('worker_1', async (task) => {
await new Promise(resolve => setTimeout(resolve, 1000 + Math.random() * 1000));
});
await workQueue.registerWorker('worker_2', async (task) => {
await new Promise(resolve => setTimeout(resolve, 500 + Math.random() * 1000));
});
// Add tasks with different priorities
for (let i = 0; i < 10; i++) {
const priority = Math.random() > 0.7 ? 10 : Math.floor(Math.random() * 5) + 1;
workQueue.addTask(`Task ${i}`, priority);
}
// 2. Topic Exchange Demo
console.log('\n2. Topic Exchange Pattern');
const publisher = new TopicPublisher('enterprise_events');
await publisher.connect();
const subscriber1 = new TopicSubscriber('enterprise_events', 'sub_1');
await subscriber1.connect();
subscriber1.bind('order.*');
subscriber1.bind('payment.*');
const subscriber2 = new TopicSubscriber('enterprise_events', 'sub_2');
await subscriber2.connect();
subscriber2.bind('order.created');
subscriber2.bind('inventory.*');
// Publish events
publisher.publish('order.created', 'New order #12345');
publisher.publish('payment.processed', 'Payment received for order #12345');
publisher.publish('inventory.updated', 'Stock reduced for order #12345');
publisher.publish('shipment.dispatched', 'Order #12345 shipped');
// 3. Priority Queue Demo
console.log('\n3. Priority Queue Pattern');
const priorityQueue = new PriorityQueueManager('urgent_tasks');
await priorityQueue.connect();
priorityQueue.processMessages(async (msg) => {
console.log(`⚡ Processing urgent task: ${msg.content}`);
await new Promise(resolve => setTimeout(resolve, 500));
});
// Add messages with different priorities
priorityQueue.addMessage('Low priority task', 2);
priorityQueue.addMessage('High priority task', 9);
priorityQueue.addMessage('Medium priority task', 5);
priorityQueue.addMessage('Critical task', 10);
// 4. Message Scheduler Demo
console.log('\n4. Message Scheduling');
const scheduler = new MessageScheduler('scheduled_tasks');
await scheduler.connect();
scheduler.processMessages(async (msg) => {
console.log(`⏰ Scheduled task executed: ${msg.content}`);
});
// Schedule messages for different times
scheduler.schedule('Task in 2 seconds', 2000);
scheduler.schedule('Task in 5 seconds', 5000);
scheduler.schedule('Task in 10 seconds', 10000);
// Show statistics after some time
setTimeout(() => {
console.log('\n📊 Work Queue Stats:');
console.table(workQueue.getStats());
console.log('\n📊 Topic Publisher Stats:');
console.log(publisher.getRoutingStats());
console.log('\n📊 Subscriber 1 Stats:');
console.log(subscriber1.getMessageStats());
console.log('\n📊 Subscriber 2 Stats:');
console.log(subscriber2.getMessageStats());
}, 3000);
// Clean up
setTimeout(() => {
console.log('\n🔌 Closing connections...');
workQueue.close();
publisher.close();
subscriber1.close();
subscriber2.close();
priorityQueue.close();
scheduler.close();
}, 15000);
} catch (error) {
console.error('❌ Demo error:', error);
}
}
// Run demo
if (require.main === module) {
enterpriseDemo();
}
module.exports = {
WorkQueue,
TopicPublisher,
TopicSubscriber,
PriorityQueueManager,
MessageScheduler,
enterpriseDemo
};
💻 Реализация AMQP на Python python
🟡 intermediate
⭐⭐⭐⭐
Реализация AMQP на Python с использованием библиотеки pika с корпоративными паттернами и обработкой ошибок
⏱️ 40 min
🏷️ amqp, python, rabbitmq, enterprise, rpc
Prerequisites:
Python, AMQP concepts, Threading, JSON
# AMQP Python Examples
# pip install pika
import pika
import json
import time
import threading
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass
from enum import Enum
import uuid
import queue
import concurrent.futures
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuration
RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USER = 'guest'
RABBITMQ_PASS = 'guest'
RABBITMQ_VHOST = '/'
class ExchangeType(Enum):
DIRECT = 'direct'
TOPIC = 'topic'
FANOUT = 'fanout'
HEADERS = 'headers'
@dataclass
class AMQPMessage:
id: str
content: Any
timestamp: str
routing_key: str
headers: Dict[str, Any] = None
priority: int = 5
expiration: Optional[int] = None
correlation_id: Optional[str] = None
reply_to: Optional[str] = None
class AMQPConnection:
"""AMQP connection manager with automatic reconnection"""
def __init__(self, host=RABBITMQ_HOST, port=RABBITMQ_PORT,
username=RABBITMQ_USER, password=RABBITMQ_PASS,
virtual_host=RABBITMQ_VHOST):
self.host = host
self.port = port
self.username = username
self.password = password
self.virtual_host = virtual_host
self.connection = None
self.channel = None
self.is_connected = False
self.reconnect_delay = 5
self.max_reconnect_attempts = 10
self.reconnect_attempts = 0
self.connection_callbacks = []
def connect(self) -> bool:
"""Establish connection to RabbitMQ"""
try:
credentials = pika.PlainCredentials(self.username, self.password)
parameters = pika.ConnectionParameters(
host=self.host,
port=self.port,
virtual_host=self.virtual_host,
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300,
connection_attempts=3,
retry_delay=5
)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.is_connected = True
self.reconnect_attempts = 0
logger.info(f"Connected to RabbitMQ at {self.host}:{self.port}")
# Notify callbacks
for callback in self.connection_callbacks:
callback(True)
return True
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {e}")
self.is_connected = False
return False
def disconnect(self):
"""Close connection"""
if self.connection and not self.connection.is_closed:
self.connection.close()
self.is_connected = False
logger.info("Disconnected from RabbitMQ")
def ensure_connection(self) -> bool:
"""Ensure connection is active, reconnect if necessary"""
if not self.is_connected or (self.connection and self.connection.is_closed):
logger.info("Connection lost, attempting to reconnect...")
while self.reconnect_attempts < self.max_reconnect_attempts:
self.reconnect_attempts += 1
logger.info(f"Reconnection attempt {self.reconnect_attempts}")
if self.connect():
return True
time.sleep(self.reconnect_delay)
logger.error(f"Failed to reconnect after {self.max_reconnect_attempts} attempts")
return False
return True
def add_connection_callback(self, callback: Callable[[bool], None]):
"""Add callback for connection state changes"""
self.connection_callbacks.append(callback)
class AMQPProducer:
"""AMQP message producer with advanced features"""
def __init__(self, connection: AMQPConnection, exchange_name: str,
exchange_type: ExchangeType = ExchangeType.DIRECT):
self.connection = connection
self.exchange_name = exchange_name
self.exchange_type = exchange_type.value
self.confirms_enabled = True
self.publish_timeout = 30
self.message_count = 0
self.failed_messages = 0
def setup_exchange(self, durable: bool = True) -> bool:
"""Declare the exchange"""
if not self.connection.ensure_connection():
return False
try:
self.connection.channel.exchange_declare(
exchange=self.exchange_name,
exchange_type=self.exchange_type,
durable=durable,
arguments={
'alternate-exchange': f'{self.exchange_name}_alt'
}
)
logger.info(f"Exchange '{self.exchange_name}' declared")
return True
except Exception as e:
logger.error(f"Failed to declare exchange: {e}")
return False
def enable_confirms(self) -> bool:
"""Enable publisher confirms"""
try:
self.connection.channel.confirm_delivery()
self.confirms_enabled = True
logger.info("Publisher confirms enabled")
return True
except Exception as e:
logger.error(f"Failed to enable confirms: {e}")
return False
def publish(self, routing_key: str, message: Any,
headers: Dict[str, Any] = None,
priority: int = 5,
expiration: int = None,
correlation_id: str = None,
reply_to: str = None) -> bool:
"""Publish a message"""
if not self.connection.ensure_connection():
return False
try:
# Create AMQP message
amqp_message = AMQPMessage(
id=str(uuid.uuid4()),
content=message,
timestamp=datetime.now().isoformat(),
routing_key=routing_key,
headers=headers or {},
priority=priority,
expiration=expiration,
correlation_id=correlation_id,
reply_to=reply_to
)
# Prepare message properties
properties = pika.BasicProperties(
message_id=amqp_message.id,
timestamp=int(time.time()),
content_type='application/json',
delivery_mode=2, # Persistent
priority=priority,
headers=amqp_message.headers,
correlation_id=correlation_id,
reply_to=reply_to
)
# Add expiration if specified
if expiration:
properties.expiration = str(expiration)
# Serialize message
body = json.dumps({
'id': amqp_message.id,
'content': message,
'timestamp': amqp_message.timestamp,
'routing_key': routing_key
}, default=str)
# Publish message
self.connection.channel.basic_publish(
exchange=self.exchange_name,
routing_key=routing_key,
body=body,
properties=properties,
mandatory=True # Return message if unroutable
)
# Wait for confirmation if confirms are enabled
if self.confirms_enabled:
self.connection.channel.wait_for_publish_confirms()
self.message_count += 1
logger.info(f"Published message {amqp_message.id} to '{routing_key}'")
return True
except Exception as e:
self.failed_messages += 1
logger.error(f"Failed to publish message: {e}")
return False
def publish_batch(self, messages: List[Dict]) -> Dict[str, int]:
"""Publish multiple messages efficiently"""
success_count = 0
failure_count = 0
for msg_info in messages:
if self.publish(**msg_info):
success_count += 1
else:
failure_count += 1
return {'success': success_count, 'failure': failure_count}
def get_stats(self) -> Dict[str, Any]:
"""Get publisher statistics"""
return {
'messages_sent': self.message_count,
'messages_failed': self.failed_messages,
'success_rate': (self.message_count / max(1, self.message_count + self.failed_messages)) * 100,
'exchange': self.exchange_name,
'exchange_type': self.exchange_type,
'confirms_enabled': self.confirms_enabled
}
class AMQPConsumer:
"""AMQP message consumer with advanced features"""
def __init__(self, connection: AMQPConnection, queue_name: str):
self.connection = connection
self.queue_name = queue_name
self.consumer_tag = None
self.message_handler = None
self.processed_count = 0
self.failed_count = 0
self.is_consuming = False
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
self.auto_ack = False
def setup_queue(self, durable: bool = True,
max_length: int = None,
message_ttl: int = None,
dead_letter_exchange: str = None) -> bool:
"""Declare the queue with enterprise features"""
if not self.connection.ensure_connection():
return False
try:
arguments = {}
if max_length:
arguments['x-max-length'] = max_length
if message_ttl:
arguments['x-message-ttl'] = message_ttl
if dead_letter_exchange:
arguments['x-dead-letter-exchange'] = dead_letter_exchange
arguments['x-dead-letter-routing-key'] = self.queue_name
self.connection.channel.queue_declare(
queue=self.queue_name,
durable=durable,
arguments=arguments
)
logger.info(f"Queue '{self.queue_name}' declared")
return True
except Exception as e:
logger.error(f"Failed to declare queue: {e}")
return False
def bind_to_exchange(self, exchange_name: str, routing_key: str = None) -> bool:
"""Bind queue to exchange"""
if not self.connection.ensure_connection():
return False
try:
self.connection.channel.queue_bind(
queue=self.queue_name,
exchange=exchange_name,
routing_key=routing_key or self.queue_name
)
logger.info(f"Bound queue '{self.queue_name}' to exchange '{exchange_name}'")
return True
except Exception as e:
logger.error(f"Failed to bind queue: {e}")
return False
def set_prefetch_count(self, count: int) -> bool:
"""Set QoS prefetch count"""
try:
self.connection.channel.basic_qos(prefetch_count=count)
logger.info(f"Set prefetch count to {count}")
return True
except Exception as e:
logger.error(f"Failed to set prefetch count: {e}")
return False
def set_message_handler(self, handler: Callable[[AMQPMessage], None]):
"""Set the message handler function"""
self.message_handler = handler
def _process_message(self, ch, method, properties, body):
"""Process a single message"""
try:
# Parse message
data = json.loads(body.decode('utf-8'))
amqp_message = AMQPMessage(
id=data.get('id', str(uuid.uuid4())),
content=data.get('content'),
timestamp=data.get('timestamp', datetime.now().isoformat()),
routing_key=method.routing_key,
headers=properties.headers or {},
priority=getattr(properties, 'priority', 5),
correlation_id=properties.correlation_id,
reply_to=properties.reply_to
)
logger.info(f"Processing message {amqp_message.id} from '{method.routing_key}'")
# Handle message in thread pool
if self.message_handler:
future = self.executor.submit(self.message_handler, amqp_message)
# Wait for completion with timeout
try:
future.result(timeout=30)
ch.basic_ack(delivery_tag=method.delivery_tag)
self.processed_count += 1
except concurrent.futures.TimeoutError:
logger.error(f"Message processing timeout for {amqp_message.id}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
self.failed_count += 1
except Exception as e:
logger.error(f"Message processing error for {amqp_message.id}: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
self.failed_count += 1
else:
logger.warning("No message handler set, acknowledging message")
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.error(f"Error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
self.failed_count += 1
def start_consuming(self, auto_ack: bool = False) -> bool:
"""Start consuming messages"""
if not self.connection.ensure_connection():
return False
if not self.message_handler:
logger.warning("No message handler set")
try:
self.auto_ack = auto_ack
self.consumer_tag = self.connection.channel.basic_consume(
queue=self.queue_name,
on_message_callback=self._process_message if not auto_ack else None,
auto_ack=auto_ack
)
self.is_consuming = True
logger.info(f"Started consuming from queue '{self.queue_name}'")
return True
except Exception as e:
logger.error(f"Failed to start consuming: {e}")
return False
def stop_consuming(self):
"""Stop consuming messages"""
if self.consumer_tag and self.connection.channel:
try:
self.connection.channel.basic_cancel(self.consumer_tag)
self.is_consuming = False
logger.info("Stopped consuming messages")
except Exception as e:
logger.error(f"Failed to stop consuming: {e}")
def get_stats(self) -> Dict[str, Any]:
"""Get consumer statistics"""
return {
'queue': self.queue_name,
'messages_processed': self.processed_count,
'messages_failed': self.failed_count,
'is_consuming': self.is_consuming,
'auto_ack': self.auto_ack
}
class AMQPRPCServer:
"""AMQP RPC Server implementation"""
def __init__(self, connection: AMQPConnection, service_name: str):
self.connection = connection
self.service_name = service_name
self.methods = {}
self.consumer = AMQPConsumer(connection, service_name)
def register_method(self, method_name: str, handler: Callable):
"""Register an RPC method"""
self.methods[method_name] = handler
logger.info(f"Registered RPC method: {method_name}")
def _handle_request(self, message: AMQPMessage):
"""Handle RPC request"""
try:
request_data = json.loads(message.content)
method_name = request_data.get('method')
params = request_data.get('params', [])
request_id = request_data.get('request_id')
if method_name not in self.methods:
response = {
'request_id': request_id,
'error': f'Unknown method: {method_name}',
'timestamp': datetime.now().isoformat()
}
else:
try:
result = self.methods[method_name](*params)
response = {
'request_id': request_id,
'result': result,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
response = {
'request_id': request_id,
'error': str(e),
'timestamp': datetime.now().isoformat()
}
# Send response
if message.reply_to:
self._send_response(message.reply_to, response, message.correlation_id)
except Exception as e:
logger.error(f"Error handling RPC request: {e}")
def _send_response(self, reply_to: str, response: Dict, correlation_id: str):
"""Send RPC response"""
try:
properties = pika.BasicProperties(
correlation_id=correlation_id,
content_type='application/json'
)
self.connection.channel.basic_publish(
exchange='',
routing_key=reply_to,
body=json.dumps(response, default=str),
properties=properties
)
except Exception as e:
logger.error(f"Failed to send RPC response: {e}")
def start(self) -> bool:
"""Start RPC server"""
if not self.consumer.setup_queue(durable=False):
return False
self.consumer.set_message_handler(self._handle_request)
return self.consumer.start_consuming()
def stop(self):
"""Stop RPC server"""
self.consumer.stop_consuming()
class AMQPRPCClient:
"""AMQP RPC Client implementation"""
def __init__(self, connection: AMQPConnection):
self.connection = connection
self.callback_queue = None
self.response_queue = queue.Queue()
self.correlation_ids = set()
def setup(self) -> bool:
"""Setup RPC client"""
if not self.connection.ensure_connection():
return False
try:
# Declare callback queue
result = self.connection.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
# Start consuming responses
self.connection.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self._on_response,
auto_ack=True
)
return True
except Exception as e:
logger.error(f"Failed to setup RPC client: {e}")
return False
def _on_response(self, ch, method, properties, body):
"""Handle RPC response"""
if properties.correlation_id in self.correlation_ids:
self.correlation_ids.remove(properties.correlation_id)
response = json.loads(body.decode('utf-8'))
self.response_queue.put(response)
def call(self, service_name: str, method: str, params: List = None,
timeout: float = 30.0) -> Any:
"""Make RPC call"""
if not self.setup():
raise Exception("Failed to setup RPC client")
correlation_id = str(uuid.uuid4())
self.correlation_ids.add(correlation_id)
request = {
'method': method,
'params': params or [],
'request_id': correlation_id,
'timestamp': datetime.now().isoformat()
}
try:
# Send request
self.connection.channel.basic_publish(
exchange='',
routing_key=service_name,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=correlation_id,
content_type='application/json'
),
body=json.dumps(request, default=str)
)
# Wait for response
start_time = time.time()
while time.time() - start_time < timeout:
try:
response = self.response_queue.get(timeout=1)
if 'error' in response:
raise Exception(response['error'])
return response['result']
except queue.Empty:
continue
raise TimeoutError(f"RPC call timed out after {timeout} seconds")
finally:
if correlation_id in self.correlation_ids:
self.correlation_ids.remove(correlation_id)
# Example usage
def example_work_queue():
"""Example work queue implementation"""
print("=== Work Queue Example ===")
# Create connection
conn = AMQPConnection()
if not conn.connect():
return
# Create producer
producer = AMQPProducer(conn, 'work_exchange', ExchangeType.DIRECT)
producer.setup_exchange()
# Create consumer
consumer = AMQPConsumer(conn, 'work_queue')
consumer.setup_queue()
consumer.bind_to_exchange('work_exchange', 'work_tasks')
consumer.set_prefetch_count(5)
def handle_task(message: AMQPMessage):
print(f"Processing task: {message.content}")
time.sleep(1) # Simulate work
consumer.set_message_handler(handle_task)
consumer.start_consuming()
# Send tasks
for i in range(10):
producer.publish('work_tasks', f'Task {i}')
time.sleep(5)
consumer.stop_consuming()
conn.disconnect()
def example_pub_sub():
"""Example publish/subscribe pattern"""
print("=== Pub/Sub Example ===")
conn = AMQPConnection()
if not conn.connect():
return
# Create publisher
publisher = AMQPProducer(conn, 'events', ExchangeType.TOPIC)
publisher.setup_exchange()
# Create subscribers
def create_subscriber(name: str, bindings: List[str]):
consumer = AMQPConsumer(conn, f'events_sub_{name}')
consumer.setup_queue(durable=False)
for binding in bindings:
consumer.bind_to_exchange('events', binding)
def handle_event(message: AMQPMessage):
print(f"[{name}] {message.routing_key}: {message.content}")
consumer.set_message_handler(handle_event)
consumer.start_consuming()
return consumer
# Create multiple subscribers
sub1 = create_subscriber('payment', ['payment.*'])
sub2 = create_subscriber('order', ['order.*'])
sub3 = create_subscriber('all', ['*.*'])
# Publish events
events = [
('order.created', 'Order #123 created'),
('payment.processed', 'Payment for order #123 processed'),
('order.shipped', 'Order #123 shipped'),
('payment.refunded', 'Refund for order #124 processed')
]
for routing_key, content in events:
publisher.publish(routing_key, content)
time.sleep(0.5)
time.sleep(2)
# Cleanup
for sub in [sub1, sub2, sub3]:
sub.stop_consuming()
conn.disconnect()
def example_rpc():
"""Example RPC implementation"""
print("=== RPC Example ===")
conn = AMQPConnection()
if not conn.connect():
return
# Create RPC server
rpc_server = AMQPRPCServer(conn, 'math_service')
# Register methods
def add(a, b):
return a + b
def multiply(a, b):
return a * b
def factorial(n):
if n <= 1:
return 1
return n * factorial(n - 1)
rpc_server.register_method('add', add)
rpc_server.register_method('multiply', multiply)
rpc_server.register_method('factorial', factorial)
# Start server in separate thread
server_thread = threading.Thread(target=rpc_server.start)
server_thread.daemon = True
server_thread.start()
# Create RPC client
rpc_client = AMQPRPCClient(conn)
# Make RPC calls
try:
result1 = rpc_client.call('math_service', 'add', [5, 3])
print(f"5 + 3 = {result1}")
result2 = rpc_client.call('math_service', 'multiply', [4, 7])
print(f"4 * 7 = {result2}")
result3 = rpc_client.call('math_service', 'factorial', [5])
print(f"5! = {result3}")
except Exception as e:
print(f"RPC error: {e}")
time.sleep(1)
rpc_server.stop()
conn.disconnect()
if __name__ == "__main__":
example_work_queue()
print()
example_pub_sub()
print()
example_rpc()