🎯 Рекомендуемые коллекции

Балансированные коллекции примеров кода из различных категорий, которые вы можете исследовать

Протокол сообщений AMQP

Примеры протокола AMQP (Advanced Message Queuing Protocol) для корпоративной передачи сообщений с RabbitMQ, очередями, обменниками и расширенными паттернами маршрутизации

💻 Основы AMQP и RabbitMQ javascript

🟢 simple ⭐⭐

Введение в протокол AMQP и RabbitMQ с базовыми паттернами публикации/подписки

⏱️ 20 min 🏷️ amqp, rabbitmq, messaging, queue, exchange
Prerequisites: Node.js, Message queue concepts, RabbitMQ basics
// AMQP Basic Examples using amqplib
// npm install amqplib

const amqp = require('amqplib/callback_api');

// RabbitMQ connection configuration
const RABBITMQ_URL = 'amqp://guest:guest@localhost:5672';
const EXCHANGE_NAME = 'demo_exchange';
const QUEUE_NAME = 'demo_queue';
const ROUTING_KEY = 'demo.routing.key';

// 1. Basic Producer (Publisher)
async function createProducer() {
  console.log('Creating AMQP Producer...');

  return new Promise((resolve, reject) => {
    amqp.connect(RABBITMQ_URL, (error0, connection) => {
      if (error0) {
        reject(error0);
        return;
      }

      connection.createChannel((error1, channel) => {
        if (error1) {
          reject(error1);
          return;
        }

        console.log('✅ Producer channel created');

        // Declare a queue (idempotent - will only be created if it doesn't exist)
        channel.assertQueue(QUEUE_NAME, {
          durable: true // Queue will survive broker restart
        }, (error2, q) => {
          if (error2) {
            reject(error2);
            return;
          }

          console.log(`✅ Queue '${QUEUE_NAME}' declared`);

          // Producer function
          const producer = {
            channel,
            connection,

            // Send message
            sendMessage: (message, options = {}) => {
              const messageBuffer = Buffer.from(JSON.stringify({
                content: message,
                timestamp: new Date().toISOString(),
                id: Math.random().toString(36).substr(2, 9),
                ...options
              }));

              const defaultOptions = {
                persistent: true, // Message will survive broker restart
                timestamp: Date.now(),
                messageId: Math.random().toString(36).substr(2, 9)
              };

              const sendOptions = { ...defaultOptions, ...options };

              channel.sendToQueue(
                QUEUE_NAME,
                messageBuffer,
                sendOptions,
                (error3) => {
                  if (error3) {
                    console.error('Failed to send message:', error3);
                  } else {
                    console.log(`✅ Message sent to queue '${QUEUE_NAME}': ${message}`);
                  }
                }
              );
            },

            // Send multiple messages
            sendBatch: (messages) => {
              messages.forEach((msg, index) => {
                setTimeout(() => {
                  producer.sendMessage(msg.content, msg.options);
                }, index * 100); // 100ms delay between messages
              });
            },

            // Close connection
            close: () => {
              channel.close(() => {
                connection.close();
                console.log('🔌 Producer connection closed');
              });
            }
          };

          resolve(producer);
        });
      });
    });
  });
}

// 2. Basic Consumer (Subscriber)
async function createConsumer(queueName = QUEUE_NAME) {
  console.log(`Creating AMQP Consumer for queue '${queueName}'...`);

  return new Promise((resolve, reject) => {
    amqp.connect(RABBITMQ_URL, (error0, connection) => {
      if (error0) {
        reject(error0);
        return;
      }

      connection.createChannel((error1, channel) => {
        if (error1) {
          reject(error1);
          return;
        }

        console.log('✅ Consumer channel created');

        // Set prefetch to control how many messages are fetched at once
        channel.prefetch(1);

        // Declare queue (ensures it exists)
        channel.assertQueue(queueName, {
          durable: true
        }, (error2, q) => {
          if (error2) {
            reject(error2);
            return;
          }

          console.log(`✅ Queue '${queueName}' declared`);
          console.log(`📋 Waiting for messages in queue '${queueName}'. To exit press CTRL+C`);

          // Start consuming messages
          channel.consume(queueName, (msg) => {
            if (msg) {
              try {
                const messageContent = JSON.parse(msg.content.toString());
                console.log(`📨 Received message: `, messageContent);

                // Process the message (simulating processing time)
                setTimeout(() => {
                  console.log(`✅ Processed message: ${messageContent.content}`);

                  // Acknowledge the message
                  channel.ack(msg);
                }, 500);

              } catch (error) {
                console.error('❌ Error processing message:', error);

                // Negative acknowledge (reject and requeue)
                channel.nack(msg, false, true);
              }
            }
          }, {
            noAck: false // Manual acknowledgment mode
          });

          const consumer = {
            channel,
            connection,
            queueName,

            // Stop consuming
            stop: () => {
              channel.cancel(queueName, () => {
                console.log(`🛑 Stopped consuming from '${queueName}'`);
              });
            },

            // Close connection
            close: () => {
              channel.close(() => {
                connection.close();
                console.log('🔌 Consumer connection closed');
              });
            }
          };

          resolve(consumer);
        });
      });
    });
  });
}

// 3. Exchange-based Publisher/Subscriber
async function createExchangePublisher(exchangeName = EXCHANGE_NAME) {
  console.log(`Creating Exchange Publisher for '${exchangeName}'...`);

  return new Promise((resolve, reject) => {
    amqp.connect(RABBITMQ_URL, (error0, connection) => {
      if (error0) {
        reject(error0);
        return;
      }

      connection.createChannel((error1, channel) => {
        if (error1) {
          reject(error1);
          return;
        }

        // Declare a topic exchange
        channel.assertExchange(exchangeName, 'topic', {
          durable: true
        }, (error2) => {
          if (error2) {
            reject(error2);
            return;
          }

          console.log(`✅ Exchange '${exchangeName}' declared`);

          const publisher = {
            channel,
            connection,
            exchangeName,

            // Publish message with routing key
            publish: (routingKey, message, options = {}) => {
              const messageBuffer = Buffer.from(JSON.stringify({
                content: message,
                timestamp: new Date().toISOString(),
                routingKey,
                ...options
              }));

              channel.publish(exchangeName, routingKey, messageBuffer, {
                persistent: true,
                timestamp: Date.now(),
                ...options
              });

              console.log(`📤 Published to '${routingKey}': ${message}`);
            },

            // Close connection
            close: () => {
              channel.close(() => {
                connection.close();
                console.log('🔌 Publisher connection closed');
              });
            }
          };

          resolve(publisher);
        });
      });
    });
  });
}

// 4. Exchange-based Subscriber
async function createExchangeSubscriber(exchangeName = EXCHANGE_NAME, bindingPatterns = []) {
  console.log(`Creating Exchange Subscriber for '${exchangeName}'...`);

  return new Promise((resolve, reject) => {
    amqp.connect(RABBITMQ_URL, (error0, connection) => {
      if (error0) {
        reject(error0);
        return;
      }

      connection.createChannel((error1, channel) => {
        if (error1) {
          reject(error1);
          return;
        }

        // Declare exchange
        channel.assertExchange(exchangeName, 'topic', {
          durable: true
        }, (error2) => {
          if (error2) {
            reject(error2);
            return;
          }

          // Create an exclusive queue for this subscriber
          channel.assertQueue('', {
            exclusive: true,
            durable: false
          }, (error3, q) => {
            if (error3) {
              reject(error3);
              return;
            }

            console.log(`✅ Created exclusive queue: ${q.queue}`);

            // Bind queue to exchange with routing patterns
            const bindings = [];
            bindingPatterns.forEach(pattern => {
              channel.bindQueue(q.queue, exchangeName, pattern, {}, (error4) => {
                if (error4) {
                  console.error(`❌ Failed to bind '${pattern}':`, error4);
                } else {
                  console.log(`✅ Bound queue to '${pattern}'`);
                  bindings.push(pattern);
                }
              });
            });

            // Start consuming
            channel.consume(q.queue, (msg) => {
              if (msg) {
                try {
                  const messageContent = JSON.parse(msg.content.toString());
                  console.log(`📨 [${msg.fields.routingKey}] ${messageContent.content}`);

                  // Acknowledge message
                  channel.ack(msg);
                } catch (error) {
                  console.error('❌ Error processing message:', error);
                  channel.nack(msg, false, false); // Don't requeue on error
                }
              }
            }, {
              noAck: false
            });

            const subscriber = {
              channel,
              connection,
              queueName: q.queue,
              bindings,

              // Add new binding
              addBinding: (pattern) => {
                channel.bindQueue(q.queue, exchangeName, pattern, {}, (error) => {
                  if (!error) {
                    bindings.push(pattern);
                    console.log(`✅ Added binding for '${pattern}'`);
                  }
                });
              },

              // Remove binding
              removeBinding: (pattern) => {
                channel.unbindQueue(q.queue, exchangeName, pattern, {}, (error) => {
                  if (!error) {
                    const index = bindings.indexOf(pattern);
                    if (index > -1) {
                      bindings.splice(index, 1);
                    }
                    console.log(`✅ Removed binding for '${pattern}'`);
                  }
                });
              },

              // Close connection
              close: () => {
                channel.close(() => {
                  connection.close();
                  console.log('🔌 Subscriber connection closed');
                });
              }
            };

            resolve(subscriber);
          });
        });
      });
    });
  });
}

// 5. RPC (Remote Procedure Call) Pattern
async function createRpcServer(serviceName = 'math_service') {
  console.log(`Creating RPC Server for '${serviceName}'...`);

  return new Promise((resolve, reject) => {
    amqp.connect(RABBITMQ_URL, (error0, connection) => {
      if (error0) {
        reject(error0);
        return;
      }

      connection.createChannel((error1, channel) => {
        if (error1) {
          reject(error1);
          return;
        }

        // Declare RPC queue
        channel.assertQueue(serviceName, {
          durable: false
        }, (error2) => {
          if (error2) {
            reject(error2);
            return;
          }

          console.log(`✅ RPC Server '${serviceName}' ready`);

          // Set prefetch to 1 to process one request at a time
          channel.prefetch(1);

          // Process RPC requests
          channel.consume(serviceName, (msg) => {
            if (msg) {
              try {
                const request = JSON.parse(msg.content.toString());
                console.log(`🔧 RPC Request: ${request.method}(${request.params?.join(', ')})`);

                let result;
                let error = null;

                // Handle different RPC methods
                switch (request.method) {
                  case 'add':
                    result = request.params.reduce((a, b) => a + b, 0);
                    break;
                  case 'multiply':
                    result = request.params.reduce((a, b) => a * b, 1);
                    break;
                  case 'factorial':
                    result = factorial(request.params[0]);
                    break;
                  case 'fibonacci':
                    result = fibonacci(request.params[0]);
                    break;
                  default:
                    error = `Unknown method: ${request.method}`;
                }

                const response = {
                  requestId: request.requestId,
                  result: result,
                  error: error,
                  timestamp: new Date().toISOString()
                };

                // Send response back to client
                channel.sendToQueue(
                  msg.properties.replyTo,
                  Buffer.from(JSON.stringify(response)),
                  {
                    correlationId: msg.properties.correlationId
                  }
                );

                console.log(`✅ RPC Response: ${error || result}`);
                channel.ack(msg);

              } catch (error) {
                console.error('❌ RPC Error:', error);
                channel.nack(msg, false, false);
              }
            }
          }, {
            noAck: false
          });

          const rpcServer = {
            channel,
            connection,
            serviceName,

            close: () => {
              channel.close(() => {
                connection.close();
                console.log('🔌 RPC Server connection closed');
              });
            }
          };

          resolve(rpcServer);
        });
      });
    });
  });
}

// 6. RPC Client
async function createRpcClient() {
  console.log('Creating RPC Client...');

  return new Promise((resolve, reject) => {
    amqp.connect(RABBITMQ_URL, (error0, connection) => {
      if (error0) {
        reject(error0);
        return;
      }

      connection.createChannel((error1, channel) => {
        if (error1) {
          reject(error1);
          return;
        }

        // Create callback queue for responses
        channel.assertQueue('', {
          exclusive: true
        }, (error2, q) => {
          if (error2) {
            reject(error2);
            return;
          }

          const pendingRequests = new Map();

          // Consume responses
          channel.consume(q.queue, (msg) => {
            if (msg) {
              try {
                const response = JSON.parse(msg.content.toString());
                const { requestId, result, error } = response;

                if (pendingRequests.has(requestId)) {
                  const { resolve, reject, timeout } = pendingRequests.get(requestId);
                  clearTimeout(timeout);
                  pendingRequests.delete(requestId);

                  if (error) {
                    reject(new Error(error));
                  } else {
                    resolve(result);
                  }
                }

                channel.ack(msg);
              } catch (error) {
                console.error('❌ Error parsing RPC response:', error);
                channel.nack(msg, false, false);
              }
            }
          }, {
            noAck: false
          });

          const rpcClient = {
            channel,
            connection,
            callbackQueue: q.queue,

            // Call RPC method
            call: (serviceName, method, params = [], timeoutMs = 5000) => {
              return new Promise((resolve, reject) => {
                const requestId = Math.random().toString(36).substr(2, 9);
                const request = {
                  requestId,
                  method,
                  params,
                  timestamp: new Date().toISOString()
                };

                // Set timeout
                const timeout = setTimeout(() => {
                  pendingRequests.delete(requestId);
                  reject(new Error(`RPC timeout for ${method}`));
                }, timeoutMs);

                pendingRequests.set(requestId, { resolve, reject, timeout });

                // Send request
                channel.sendToQueue(
                  serviceName,
                  Buffer.from(JSON.stringify(request)),
                  {
                    replyTo: q.queue,
                    correlationId: requestId,
                    expiration: timeoutMs
                  }
                );

                console.log(`📞 RPC Call: ${method}(${params.join(', ')})`);
              });
            },

            close: () => {
              channel.close(() => {
                connection.close();
                console.log('🔌 RPC Client connection closed');
              });
            }
          };

          resolve(rpcClient);
        });
      });
    });
  });
}

// Helper functions
function factorial(n) {
  if (n <= 1) return 1;
  return n * factorial(n - 1);
}

function fibonacci(n) {
  if (n <= 1) return n;
  let a = 0, b = 1;
  for (let i = 2; i <= n; i++) {
    [a, b] = [b, a + b];
  }
  return b;
}

// 7. Dead Letter Queue Example
async function setupDeadLetterQueue() {
  console.log('Setting up Dead Letter Queue...');

  return new Promise((resolve, reject) => {
    amqp.connect(RABBITMQ_URL, (error0, connection) => {
      if (error0) {
        reject(error0);
        return;
      }

      connection.createChannel((error1, channel) => {
        if (error1) {
          reject(error1);
          return;
        }

        // Declare dead letter exchange
        channel.assertExchange('dlx', 'direct', { durable: true });

        // Declare dead letter queue
        channel.assertQueue('dlq', { durable: true });
        channel.bindQueue('dlq', 'dlx', 'dlq');

        // Declare main queue with dead letter exchange
        channel.assertQueue('main_queue', {
          durable: true,
          arguments: {
            'x-dead-letter-exchange': 'dlx',
            'x-dead-letter-routing-key': 'dlq',
            'x-message-ttl': 60000 // Messages expire after 60 seconds
          }
        });

        console.log('✅ Dead Letter Queue setup complete');

        const dlqManager = {
          channel,
          connection,

          publishToMain: (message) => {
            channel.sendToQueue('main_queue', Buffer.from(message), {
              persistent: true
            });
            console.log(`📤 Published to main queue: ${message}`);
          },

          consumeFromDLQ: (callback) => {
            channel.consume('dlq', (msg) => {
              if (msg) {
                callback(msg.content.toString(), msg.properties);
                channel.ack(msg);
              }
            }, { noAck: false });
            console.log('📨 Consuming from Dead Letter Queue');
          },

          close: () => {
            channel.close(() => {
              connection.close();
              console.log('🔌 DLQ Manager connection closed');
            });
          }
        };

        resolve(dlqManager);
      });
    });
  });
}

// Main demonstration
async function runDemo() {
  console.log('=== AMQP Demo ===\n');

  try {
    // 1. Basic producer/consumer
    console.log('1. Basic Producer/Consumer Demo');
    const producer = await createProducer();
    const consumer = await createConsumer();

    // Send messages
    producer.sendMessage('Hello AMQP!');
    producer.sendMessage('This is a test message');
    producer.sendMessage('Message with options', { priority: 5 });

    // Wait for processing
    await new Promise(resolve => setTimeout(resolve, 2000));

    // 2. Exchange-based pub/sub
    console.log('\n2. Exchange-based Publisher/Subscriber Demo');
    const exchangePublisher = await createExchangePublisher();
    const exchangeSubscriber = await createExchangeSubscriber('demo_exchange', [
      'sensor.temperature.*',
      'alert.critical.*'
    ]);

    // Publish to exchange with different routing keys
    exchangePublisher.publish('sensor.temperature.living_room', '25°C');
    exchangePublisher.publish('sensor.temperature.bedroom', '22°C');
    exchangePublisher.publish('alert.critical.fire', 'Fire detected!');
    exchangePublisher.publish('sensor.humidity.kitchen', '60%'); // Won't be received

    await new Promise(resolve => setTimeout(resolve, 2000));

    // 3. RPC Demo
    console.log('\n3. RPC Demo');
    const rpcServer = await createRpcServer();
    const rpcClient = await createRpcClient();

    // Make RPC calls
    const addResult = await rpcClient.call('math_service', 'add', [5, 3, 2]);
    console.log(`RPC Result (add): ${addResult}`);

    const multiplyResult = await rpcClient.call('math_service', 'multiply', [4, 5]);
    console.log(`RPC Result (multiply): ${multiplyResult}`);

    const factorialResult = await rpcClient.call('math_service', 'factorial', [5]);
    console.log(`RPC Result (factorial): ${factorialResult}`);

    // 4. Dead Letter Queue Demo
    console.log('\n4. Dead Letter Queue Demo');
    const dlqManager = await setupDeadLetterQueue();

    // Set up DLQ consumer
    dlqManager.consumeFromDLQ((message, properties) => {
      console.log(`💀 Dead Letter: ${message}`);
      console.log(`   Headers: ${JSON.stringify(properties.headers, null, 2)}`);
    });

    // Close connections
    setTimeout(() => {
      console.log('\n🔌 Closing connections...');
      producer.close();
      consumer.close();
      exchangePublisher.close();
      exchangeSubscriber.close();
      rpcServer.close();
      rpcClient.close();
      dlqManager.close();
    }, 3000);

  } catch (error) {
    console.error('❌ Demo error:', error);
  }
}

// Run demo if called directly
if (require.main === module) {
  runDemo();
}

module.exports = {
  createProducer,
  createConsumer,
  createExchangePublisher,
  createExchangeSubscriber,
  createRpcServer,
  createRpcClient,
  setupDeadLetterQueue
};

💻 Корпоративные паттерны AMQP javascript

🟡 intermediate ⭐⭐⭐⭐

Расширенные паттерны корпоративной передачи сообщений, включая очереди работы, тематическую маршрутизацию, очереди приоритетов и планирование сообщений

⏱️ 35 min 🏷️ amqp, enterprise, patterns, rabbitmq, queue
Prerequisites: AMQP basics, Enterprise patterns, Node.js async
// Enterprise AMQP Patterns
// Advanced patterns for production messaging systems

const amqp = require('amqplib/callback_api');
const EventEmitter = require('events');
const { v4: uuidv4 } = require('uuid');

// Configuration
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';

// 1. Work Queue Pattern (Task Distribution)
class WorkQueue {
  constructor(queueName = 'work_queue', options = {}) {
    this.queueName = queueName;
    this.options = {
      durable: true,
      prefetch: options.prefetch || 1,
      maxLength: options.maxLength || 10000,
      messageTTL: options.messageTTL || 3600000, // 1 hour
      ...options
    };
    this.connection = null;
    this.channel = null;
    this.workers = new Map();
  }

  async connect() {
    return new Promise((resolve, reject) => {
      amqp.connect(RABBITMQ_URL, (error0, connection) => {
        if (error0) {
          reject(error0);
          return;
        }

        connection.createChannel((error1, channel) => {
          if (error1) {
            reject(error1);
            return;
          }

          this.connection = connection;
          this.channel = channel;

          // Declare work queue with enterprise features
          channel.assertQueue(this.queueName, {
            durable: this.options.durable,
            maxLength: this.options.maxLength,
            messageTTL: this.options.messageTTL,
            arguments: {
              'x-queue-mode': 'lazy', // Save memory for large queues
              'x-max-priority': 10 // Enable message priorities
            }
          });

          // Set prefetch for fair dispatch
          channel.prefetch(this.options.prefetch);

          console.log(`✅ Work queue '${this.queueName}' ready`);
          resolve();
        });
      });
    });
  }

  // Add task to work queue
  addTask(task, priority = 5, delay = 0) {
    if (!this.channel) {
      throw new Error('WorkQueue not connected');
    }

    const message = {
      id: uuidv4(),
      task: task,
      createdAt: new Date().toISOString(),
      priority,
      delay
    };

    const headers = {};
    if (delay > 0) {
      headers['x-delay'] = delay;
    }

    this.channel.sendToQueue(
      this.queueName,
      Buffer.from(JSON.stringify(message)),
      {
        persistent: true,
        priority: priority,
        timestamp: Date.now(),
        messageId: message.id,
        headers
      }
    );

    console.log(`📋 Task added: ${task} (priority: ${priority})`);
    return message.id;
  }

  // Register worker
  async registerWorker(workerId, handler) {
    if (!this.channel) {
      throw new Error('WorkQueue not connected');
    }

    this.channel.consume(this.queueName, async (msg) => {
      if (msg) {
        try {
          const task = JSON.parse(msg.content.toString());
          console.log(`🔧 Worker ${workerId} processing task: ${task.task}`);

          // Process task
          const startTime = Date.now();
          await handler(task);
          const duration = Date.now() - startTime;

          console.log(`✅ Worker ${workerId} completed task in ${duration}ms`);
          this.channel.ack(msg);

          // Update worker stats
          const stats = this.workers.get(workerId) || { tasks: 0, totalTime: 0 };
          stats.tasks++;
          stats.totalTime += duration;
          this.workers.set(workerId, stats);

        } catch (error) {
          console.error(`❌ Worker ${workerId} error:`, error);

          // Reject message without requeue (failed tasks go to DLQ if configured)
          this.channel.nack(msg, false, false);
        }
      }
    }, {
      noAck: false,
      consumerTag: `worker_${workerId}`
    });

    console.log(`👷 Worker '${workerId}' registered`);
  }

  // Get worker statistics
  getStats() {
    return Array.from(this.workers.entries()).map(([workerId, stats]) => ({
      workerId,
      tasks: stats.tasks,
      totalTime: stats.totalTime,
      avgTime: stats.tasks > 0 ? stats.totalTime / stats.tasks : 0
    }));
  }

  // Close connections
  close() {
    if (this.channel && this.connection) {
      this.channel.close(() => {
        this.connection.close();
        console.log(`🔌 Work queue '${this.queueName}' closed`);
      });
    }
  }
}

// 2. Topic Exchange Pattern (Pub/Sub with routing)
class TopicPublisher extends EventEmitter {
  constructor(exchangeName = 'topics') {
    super();
    this.exchangeName = exchangeName;
    this.connection = null;
    this.channel = null;
    this.routingStats = new Map();
  }

  async connect() {
    return new Promise((resolve, reject) => {
      amqp.connect(RABBITMQ_URL, (error0, connection) => {
        if (error0) {
          reject(error0);
          return;
        }

        connection.createChannel((error1, channel) => {
          if (error1) {
            reject(error1);
            return;
          }

          this.connection = connection;
          this.channel = channel;

          // Declare topic exchange
          channel.assertExchange(this.exchangeName, 'topic', {
            durable: true,
            arguments: {
              'alternate-exchange': `${this.exchangeName}_alt` // Alternate exchange for unroutable messages
            }
          });

          // Declare alternate exchange
          channel.assertExchange(`${this.exchangeName}_alt`, 'fanout', {
            durable: true
          });

          console.log(`✅ Topic exchange '${this.exchangeName}' ready`);
          resolve();
        });
      });
    });
  }

  // Publish message with routing key
  publish(routingKey, message, options = {}) {
    if (!this.channel) {
      throw new Error('TopicPublisher not connected');
    }

    const messageData = {
      id: uuidv4(),
      routingKey,
      content: message,
      timestamp: new Date().toISOString(),
      ...options
    };

    this.channel.publish(
      this.exchangeName,
      routingKey,
      Buffer.from(JSON.stringify(messageData)),
      {
        persistent: true,
        timestamp: Date.now(),
        messageId: messageData.id,
        headers: options.headers || {}
      }
    );

    // Update routing statistics
    const count = this.routingStats.get(routingKey) || 0;
    this.routingStats.set(routingKey, count + 1);

    console.log(`📤 Published to '${routingKey}': ${message}`);
    this.emit('published', { routingKey, message: messageData });

    return messageData.id;
  }

  // Get routing statistics
  getRoutingStats() {
    return Object.fromEntries(this.routingStats);
  }

  close() {
    if (this.channel && this.connection) {
      this.channel.close(() => {
        this.connection.close();
        console.log(`🔌 Topic publisher '${this.exchangeName}' closed`);
      });
    }
  }
}

// 3. Topic Subscriber with pattern matching
class TopicSubscriber extends EventEmitter {
  constructor(exchangeName = 'topics', subscriberId = null) {
    super();
    this.exchangeName = exchangeName;
    this.subscriberId = subscriberId || `sub_${uuidv4().substr(0, 8)}`;
    this.connection = null;
    this.channel = null;
    this.queueName = null;
    this.bindings = new Set();
    this.messageStats = new Map();
  }

  async connect() {
    return new Promise((resolve, reject) => {
      amqp.connect(RABBITMQ_URL, (error0, connection) => {
        if (error0) {
          reject(error0);
          return;
        }

        connection.createChannel((error1, channel) => {
          if (error1) {
            reject(error1);
            return;
          }

          this.connection = connection;
          this.channel = channel;

          // Declare exchange
          channel.assertExchange(this.exchangeName, 'topic', {
            durable: true
          });

          // Create exclusive queue for this subscriber
          channel.assertQueue('', {
            exclusive: true,
            durable: false,
            arguments: {
              'x-message-ttl': 300000, // 5 minutes TTL
              'x-max-length': 1000 // Max 1000 messages
            }
          }, (error2, q) => {
            if (error2) {
              reject(error2);
              return;
            }

            this.queueName = q.queue;

            // Start consuming messages
            channel.consume(this.queueName, (msg) => {
              if (msg) {
                try {
                  const messageData = JSON.parse(msg.content.toString());
                  this.handleMessage(messageData, msg);
                  this.channel.ack(msg);
                } catch (error) {
                  console.error(`❌ Message processing error:`, error);
                  this.channel.nack(msg, false, false);
                }
              }
            }, {
              noAck: false,
              consumerTag: `subscriber_${this.subscriberId}`
            });

            console.log(`✅ Topic subscriber '${this.subscriberId}' connected`);
            resolve();
          });
        });
      });
    });
  }

  // Add binding pattern
  bind(pattern) {
    if (!this.channel) {
      throw new Error('TopicSubscriber not connected');
    }

    this.channel.bindQueue(this.queueName, this.exchangeName, pattern, {}, (error) => {
      if (!error) {
        this.bindings.add(pattern);
        console.log(`✅ Bound to pattern: ${pattern}`);
        this.emit('bound', { pattern });
      }
    });
  }

  // Remove binding pattern
  unbind(pattern) {
    if (!this.channel) {
      throw new Error('TopicSubscriber not connected');
    }

    this.channel.unbindQueue(this.queueName, this.exchangeName, pattern, {}, (error) => {
      if (!error) {
        this.bindings.delete(pattern);
        console.log(`✅ Unbound from pattern: ${pattern}`);
        this.emit('unbound', { pattern });
      }
    });
  }

  // Handle received message
  handleMessage(messageData, msg) {
    console.log(`📨 [${messageData.routingKey}] ${messageData.content}`);

    // Update message statistics
    const routingKey = messageData.routingKey;
    const count = this.messageStats.get(routingKey) || 0;
    this.messageStats.set(routingKey, count + 1);

    // Emit message event
    this.emit('message', messageData);

    // Emit specific routing key event
    this.emit(`message:${routingKey}`, messageData);
  }

  // Get message statistics
  getMessageStats() {
    return {
      bindings: Array.from(this.bindings),
      messageCounts: Object.fromEntries(this.messageStats),
      totalMessages: Array.from(this.messageStats.values()).reduce((a, b) => a + b, 0)
    };
  }

  close() {
    if (this.channel && this.connection) {
      this.channel.close(() => {
        this.connection.close();
        console.log(`🔌 Topic subscriber '${this.subscriberId}' closed`);
      });
    }
  }
}

// 4. Priority Queue Manager
class PriorityQueueManager {
  constructor(queueName = 'priority_queue') {
    this.queueName = queueName;
    this.connection = null;
    this.channel = null;
  }

  async connect() {
    return new Promise((resolve, reject) => {
      amqp.connect(RABBITMQ_URL, (error0, connection) => {
        if (error0) {
          reject(error0);
          return;
        }

        connection.createChannel((error1, channel) => {
          if (error1) {
            reject(error1);
            return;
          }

          this.connection = connection;
          this.channel = channel;

          // Declare priority queue
          channel.assertQueue(this.queueName, {
            durable: true,
            arguments: {
              'x-max-priority': 10 // Support priorities 0-10
            }
          });

          console.log(`✅ Priority queue '${this.queueName}' ready`);
          resolve();
        });
      });
    });
  }

  // Add message with priority
  addMessage(message, priority = 5) {
    if (!this.channel) {
      throw new Error('PriorityQueueManager not connected');
    }

    const messageData = {
      id: uuidv4(),
      content: message,
      priority,
      timestamp: new Date().toISOString()
    };

    this.channel.sendToQueue(
      this.queueName,
      Buffer.from(JSON.stringify(messageData)),
      {
        persistent: true,
        priority: Math.max(0, Math.min(10, priority)),
        timestamp: Date.now(),
        messageId: messageData.id
      }
    );

    console.log(`📋 Added message with priority ${priority}: ${message}`);
    return messageData.id;
  }

  // Process messages in priority order
  processMessages(handler) {
    if (!this.channel) {
      throw new Error('PriorityQueueManager not connected');
    }

    // Set prefetch to 1 to ensure priority order
    this.channel.prefetch(1);

    this.channel.consume(this.queueName, async (msg) => {
      if (msg) {
        try {
          const messageData = JSON.parse(msg.content.toString());
          console.log(`⚡ Processing priority ${messageData.priority}: ${messageData.content}`);

          await handler(messageData);
          this.channel.ack(msg);

        } catch (error) {
          console.error('❌ Processing error:', error);
          this.channel.nack(msg, false, false);
        }
      }
    }, {
      noAck: false
    });

    console.log(`🔄 Processing priority queue messages`);
  }

  close() {
    if (this.channel && this.connection) {
      this.channel.close(() => {
        this.connection.close();
        console.log(`🔌 Priority queue '${this.queueName}' closed`);
      });
    }
  }
}

// 5. Message Scheduler (Delayed Messages)
class MessageScheduler {
  constructor(queueName = 'scheduled_messages') {
    this.queueName = queueName;
    this.exchangeName = 'delayed_exchange';
    this.connection = null;
    this.channel = null;
  }

  async connect() {
    return new Promise((resolve, reject) => {
      amqp.connect(RABBITMQ_URL, (error0, connection) => {
        if (error0) {
          reject(error0);
          return;
        }

        connection.createChannel((error1, channel) => {
          if (error1) {
            reject(error1);
            return;
          }

          this.connection = connection;
          this.channel = channel;

          // Declare delayed message exchange (requires rabbitmq_delayed_message_exchange plugin)
          channel.assertExchange(this.exchangeName, 'x-delayed-message', {
            durable: true,
            arguments: {
              'x-delayed-type': 'direct'
            }
          });

          // Declare queue for scheduled messages
          channel.assertQueue(this.queueName, {
            durable: true
          });

          // Bind queue to exchange
          channel.bindQueue(this.queueName, this.exchangeName, this.queueName);

          console.log(`✅ Message scheduler ready`);
          resolve();
        });
      });
    });
  }

  // Schedule message for later delivery
  schedule(message, delayMs, routingKey = null) {
    if (!this.channel) {
      throw new Error('MessageScheduler not connected');
    }

    const messageData = {
      id: uuidv4(),
      content: message,
      scheduledFor: new Date(Date.now() + delayMs).toISOString(),
      delay: delayMs,
      timestamp: new Date().toISOString()
    };

    this.channel.publish(
      this.exchangeName,
      routingKey || this.queueName,
      Buffer.from(JSON.stringify(messageData)),
      {
        persistent: true,
        timestamp: Date.now(),
        messageId: messageData.id,
        headers: {
          'x-delay': delayMs
        }
      }
    );

    console.log(`⏰ Scheduled message for ${delayMs}ms: ${message}`);
    return messageData.id;
  }

  // Process scheduled messages
  processMessages(handler) {
    if (!this.channel) {
      throw new Error('MessageScheduler not connected');
    }

    this.channel.consume(this.queueName, async (msg) => {
      if (msg) {
        try {
          const messageData = JSON.parse(msg.content.toString());
          console.log(`✅ Scheduled message delivered: ${messageData.content}`);

          await handler(messageData);
          this.channel.ack(msg);

        } catch (error) {
          console.error('❌ Processing error:', error);
          this.channel.nack(msg, false, false);
        }
      }
    }, {
      noAck: false
    });

    console.log(`🔄 Processing scheduled messages`);
  }

  close() {
    if (this.channel && this.connection) {
      this.channel.close(() => {
        this.connection.close();
        console.log('🔌 Message scheduler closed');
      });
    }
  }
}

// 6. Enterprise Integration Demo
async function enterpriseDemo() {
  console.log('=== Enterprise AMQP Patterns Demo ===\n');

  try {
    // 1. Work Queue Demo
    console.log('1. Work Queue Pattern');
    const workQueue = new WorkQueue('enterprise_tasks', { prefetch: 2 });
    await workQueue.connect();

    // Register workers
    await workQueue.registerWorker('worker_1', async (task) => {
      await new Promise(resolve => setTimeout(resolve, 1000 + Math.random() * 1000));
    });

    await workQueue.registerWorker('worker_2', async (task) => {
      await new Promise(resolve => setTimeout(resolve, 500 + Math.random() * 1000));
    });

    // Add tasks with different priorities
    for (let i = 0; i < 10; i++) {
      const priority = Math.random() > 0.7 ? 10 : Math.floor(Math.random() * 5) + 1;
      workQueue.addTask(`Task ${i}`, priority);
    }

    // 2. Topic Exchange Demo
    console.log('\n2. Topic Exchange Pattern');
    const publisher = new TopicPublisher('enterprise_events');
    await publisher.connect();

    const subscriber1 = new TopicSubscriber('enterprise_events', 'sub_1');
    await subscriber1.connect();
    subscriber1.bind('order.*');
    subscriber1.bind('payment.*');

    const subscriber2 = new TopicSubscriber('enterprise_events', 'sub_2');
    await subscriber2.connect();
    subscriber2.bind('order.created');
    subscriber2.bind('inventory.*');

    // Publish events
    publisher.publish('order.created', 'New order #12345');
    publisher.publish('payment.processed', 'Payment received for order #12345');
    publisher.publish('inventory.updated', 'Stock reduced for order #12345');
    publisher.publish('shipment.dispatched', 'Order #12345 shipped');

    // 3. Priority Queue Demo
    console.log('\n3. Priority Queue Pattern');
    const priorityQueue = new PriorityQueueManager('urgent_tasks');
    await priorityQueue.connect();

    priorityQueue.processMessages(async (msg) => {
      console.log(`⚡ Processing urgent task: ${msg.content}`);
      await new Promise(resolve => setTimeout(resolve, 500));
    });

    // Add messages with different priorities
    priorityQueue.addMessage('Low priority task', 2);
    priorityQueue.addMessage('High priority task', 9);
    priorityQueue.addMessage('Medium priority task', 5);
    priorityQueue.addMessage('Critical task', 10);

    // 4. Message Scheduler Demo
    console.log('\n4. Message Scheduling');
    const scheduler = new MessageScheduler('scheduled_tasks');
    await scheduler.connect();

    scheduler.processMessages(async (msg) => {
      console.log(`⏰ Scheduled task executed: ${msg.content}`);
    });

    // Schedule messages for different times
    scheduler.schedule('Task in 2 seconds', 2000);
    scheduler.schedule('Task in 5 seconds', 5000);
    scheduler.schedule('Task in 10 seconds', 10000);

    // Show statistics after some time
    setTimeout(() => {
      console.log('\n📊 Work Queue Stats:');
      console.table(workQueue.getStats());

      console.log('\n📊 Topic Publisher Stats:');
      console.log(publisher.getRoutingStats());

      console.log('\n📊 Subscriber 1 Stats:');
      console.log(subscriber1.getMessageStats());

      console.log('\n📊 Subscriber 2 Stats:');
      console.log(subscriber2.getMessageStats());
    }, 3000);

    // Clean up
    setTimeout(() => {
      console.log('\n🔌 Closing connections...');
      workQueue.close();
      publisher.close();
      subscriber1.close();
      subscriber2.close();
      priorityQueue.close();
      scheduler.close();
    }, 15000);

  } catch (error) {
    console.error('❌ Demo error:', error);
  }
}

// Run demo
if (require.main === module) {
  enterpriseDemo();
}

module.exports = {
  WorkQueue,
  TopicPublisher,
  TopicSubscriber,
  PriorityQueueManager,
  MessageScheduler,
  enterpriseDemo
};

💻 Реализация AMQP на Python python

🟡 intermediate ⭐⭐⭐⭐

Реализация AMQP на Python с использованием библиотеки pika с корпоративными паттернами и обработкой ошибок

⏱️ 40 min 🏷️ amqp, python, rabbitmq, enterprise, rpc
Prerequisites: Python, AMQP concepts, Threading, JSON
# AMQP Python Examples
# pip install pika

import pika
import json
import time
import threading
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass
from enum import Enum
import uuid
import queue
import concurrent.futures

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USER = 'guest'
RABBITMQ_PASS = 'guest'
RABBITMQ_VHOST = '/'

class ExchangeType(Enum):
    DIRECT = 'direct'
    TOPIC = 'topic'
    FANOUT = 'fanout'
    HEADERS = 'headers'

@dataclass
class AMQPMessage:
    id: str
    content: Any
    timestamp: str
    routing_key: str
    headers: Dict[str, Any] = None
    priority: int = 5
    expiration: Optional[int] = None
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None

class AMQPConnection:
    """AMQP connection manager with automatic reconnection"""

    def __init__(self, host=RABBITMQ_HOST, port=RABBITMQ_PORT,
                 username=RABBITMQ_USER, password=RABBITMQ_PASS,
                 virtual_host=RABBITMQ_VHOST):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.virtual_host = virtual_host
        self.connection = None
        self.channel = None
        self.is_connected = False
        self.reconnect_delay = 5
        self.max_reconnect_attempts = 10
        self.reconnect_attempts = 0
        self.connection_callbacks = []

    def connect(self) -> bool:
        """Establish connection to RabbitMQ"""
        try:
            credentials = pika.PlainCredentials(self.username, self.password)
            parameters = pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                virtual_host=self.virtual_host,
                credentials=credentials,
                heartbeat=600,
                blocked_connection_timeout=300,
                connection_attempts=3,
                retry_delay=5
            )

            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            self.is_connected = True
            self.reconnect_attempts = 0

            logger.info(f"Connected to RabbitMQ at {self.host}:{self.port}")

            # Notify callbacks
            for callback in self.connection_callbacks:
                callback(True)

            return True

        except Exception as e:
            logger.error(f"Failed to connect to RabbitMQ: {e}")
            self.is_connected = False
            return False

    def disconnect(self):
        """Close connection"""
        if self.connection and not self.connection.is_closed:
            self.connection.close()
            self.is_connected = False
            logger.info("Disconnected from RabbitMQ")

    def ensure_connection(self) -> bool:
        """Ensure connection is active, reconnect if necessary"""
        if not self.is_connected or (self.connection and self.connection.is_closed):
            logger.info("Connection lost, attempting to reconnect...")

            while self.reconnect_attempts < self.max_reconnect_attempts:
                self.reconnect_attempts += 1
                logger.info(f"Reconnection attempt {self.reconnect_attempts}")

                if self.connect():
                    return True

                time.sleep(self.reconnect_delay)

            logger.error(f"Failed to reconnect after {self.max_reconnect_attempts} attempts")
            return False

        return True

    def add_connection_callback(self, callback: Callable[[bool], None]):
        """Add callback for connection state changes"""
        self.connection_callbacks.append(callback)

class AMQPProducer:
    """AMQP message producer with advanced features"""

    def __init__(self, connection: AMQPConnection, exchange_name: str,
                 exchange_type: ExchangeType = ExchangeType.DIRECT):
        self.connection = connection
        self.exchange_name = exchange_name
        self.exchange_type = exchange_type.value
        self.confirms_enabled = True
        self.publish_timeout = 30
        self.message_count = 0
        self.failed_messages = 0

    def setup_exchange(self, durable: bool = True) -> bool:
        """Declare the exchange"""
        if not self.connection.ensure_connection():
            return False

        try:
            self.connection.channel.exchange_declare(
                exchange=self.exchange_name,
                exchange_type=self.exchange_type,
                durable=durable,
                arguments={
                    'alternate-exchange': f'{self.exchange_name}_alt'
                }
            )
            logger.info(f"Exchange '{self.exchange_name}' declared")
            return True
        except Exception as e:
            logger.error(f"Failed to declare exchange: {e}")
            return False

    def enable_confirms(self) -> bool:
        """Enable publisher confirms"""
        try:
            self.connection.channel.confirm_delivery()
            self.confirms_enabled = True
            logger.info("Publisher confirms enabled")
            return True
        except Exception as e:
            logger.error(f"Failed to enable confirms: {e}")
            return False

    def publish(self, routing_key: str, message: Any,
                headers: Dict[str, Any] = None,
                priority: int = 5,
                expiration: int = None,
                correlation_id: str = None,
                reply_to: str = None) -> bool:
        """Publish a message"""
        if not self.connection.ensure_connection():
            return False

        try:
            # Create AMQP message
            amqp_message = AMQPMessage(
                id=str(uuid.uuid4()),
                content=message,
                timestamp=datetime.now().isoformat(),
                routing_key=routing_key,
                headers=headers or {},
                priority=priority,
                expiration=expiration,
                correlation_id=correlation_id,
                reply_to=reply_to
            )

            # Prepare message properties
            properties = pika.BasicProperties(
                message_id=amqp_message.id,
                timestamp=int(time.time()),
                content_type='application/json',
                delivery_mode=2,  # Persistent
                priority=priority,
                headers=amqp_message.headers,
                correlation_id=correlation_id,
                reply_to=reply_to
            )

            # Add expiration if specified
            if expiration:
                properties.expiration = str(expiration)

            # Serialize message
            body = json.dumps({
                'id': amqp_message.id,
                'content': message,
                'timestamp': amqp_message.timestamp,
                'routing_key': routing_key
            }, default=str)

            # Publish message
            self.connection.channel.basic_publish(
                exchange=self.exchange_name,
                routing_key=routing_key,
                body=body,
                properties=properties,
                mandatory=True  # Return message if unroutable
            )

            # Wait for confirmation if confirms are enabled
            if self.confirms_enabled:
                self.connection.channel.wait_for_publish_confirms()

            self.message_count += 1
            logger.info(f"Published message {amqp_message.id} to '{routing_key}'")
            return True

        except Exception as e:
            self.failed_messages += 1
            logger.error(f"Failed to publish message: {e}")
            return False

    def publish_batch(self, messages: List[Dict]) -> Dict[str, int]:
        """Publish multiple messages efficiently"""
        success_count = 0
        failure_count = 0

        for msg_info in messages:
            if self.publish(**msg_info):
                success_count += 1
            else:
                failure_count += 1

        return {'success': success_count, 'failure': failure_count}

    def get_stats(self) -> Dict[str, Any]:
        """Get publisher statistics"""
        return {
            'messages_sent': self.message_count,
            'messages_failed': self.failed_messages,
            'success_rate': (self.message_count / max(1, self.message_count + self.failed_messages)) * 100,
            'exchange': self.exchange_name,
            'exchange_type': self.exchange_type,
            'confirms_enabled': self.confirms_enabled
        }

class AMQPConsumer:
    """AMQP message consumer with advanced features"""

    def __init__(self, connection: AMQPConnection, queue_name: str):
        self.connection = connection
        self.queue_name = queue_name
        self.consumer_tag = None
        self.message_handler = None
        self.processed_count = 0
        self.failed_count = 0
        self.is_consuming = False
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
        self.auto_ack = False

    def setup_queue(self, durable: bool = True,
                    max_length: int = None,
                    message_ttl: int = None,
                    dead_letter_exchange: str = None) -> bool:
        """Declare the queue with enterprise features"""
        if not self.connection.ensure_connection():
            return False

        try:
            arguments = {}

            if max_length:
                arguments['x-max-length'] = max_length

            if message_ttl:
                arguments['x-message-ttl'] = message_ttl

            if dead_letter_exchange:
                arguments['x-dead-letter-exchange'] = dead_letter_exchange
                arguments['x-dead-letter-routing-key'] = self.queue_name

            self.connection.channel.queue_declare(
                queue=self.queue_name,
                durable=durable,
                arguments=arguments
            )
            logger.info(f"Queue '{self.queue_name}' declared")
            return True
        except Exception as e:
            logger.error(f"Failed to declare queue: {e}")
            return False

    def bind_to_exchange(self, exchange_name: str, routing_key: str = None) -> bool:
        """Bind queue to exchange"""
        if not self.connection.ensure_connection():
            return False

        try:
            self.connection.channel.queue_bind(
                queue=self.queue_name,
                exchange=exchange_name,
                routing_key=routing_key or self.queue_name
            )
            logger.info(f"Bound queue '{self.queue_name}' to exchange '{exchange_name}'")
            return True
        except Exception as e:
            logger.error(f"Failed to bind queue: {e}")
            return False

    def set_prefetch_count(self, count: int) -> bool:
        """Set QoS prefetch count"""
        try:
            self.connection.channel.basic_qos(prefetch_count=count)
            logger.info(f"Set prefetch count to {count}")
            return True
        except Exception as e:
            logger.error(f"Failed to set prefetch count: {e}")
            return False

    def set_message_handler(self, handler: Callable[[AMQPMessage], None]):
        """Set the message handler function"""
        self.message_handler = handler

    def _process_message(self, ch, method, properties, body):
        """Process a single message"""
        try:
            # Parse message
            data = json.loads(body.decode('utf-8'))
            amqp_message = AMQPMessage(
                id=data.get('id', str(uuid.uuid4())),
                content=data.get('content'),
                timestamp=data.get('timestamp', datetime.now().isoformat()),
                routing_key=method.routing_key,
                headers=properties.headers or {},
                priority=getattr(properties, 'priority', 5),
                correlation_id=properties.correlation_id,
                reply_to=properties.reply_to
            )

            logger.info(f"Processing message {amqp_message.id} from '{method.routing_key}'")

            # Handle message in thread pool
            if self.message_handler:
                future = self.executor.submit(self.message_handler, amqp_message)

                # Wait for completion with timeout
                try:
                    future.result(timeout=30)
                    ch.basic_ack(delivery_tag=method.delivery_tag)
                    self.processed_count += 1
                except concurrent.futures.TimeoutError:
                    logger.error(f"Message processing timeout for {amqp_message.id}")
                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                    self.failed_count += 1
                except Exception as e:
                    logger.error(f"Message processing error for {amqp_message.id}: {e}")
                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                    self.failed_count += 1
            else:
                logger.warning("No message handler set, acknowledging message")
                ch.basic_ack(delivery_tag=method.delivery_tag)

        except Exception as e:
            logger.error(f"Error processing message: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            self.failed_count += 1

    def start_consuming(self, auto_ack: bool = False) -> bool:
        """Start consuming messages"""
        if not self.connection.ensure_connection():
            return False

        if not self.message_handler:
            logger.warning("No message handler set")

        try:
            self.auto_ack = auto_ack
            self.consumer_tag = self.connection.channel.basic_consume(
                queue=self.queue_name,
                on_message_callback=self._process_message if not auto_ack else None,
                auto_ack=auto_ack
            )
            self.is_consuming = True
            logger.info(f"Started consuming from queue '{self.queue_name}'")
            return True
        except Exception as e:
            logger.error(f"Failed to start consuming: {e}")
            return False

    def stop_consuming(self):
        """Stop consuming messages"""
        if self.consumer_tag and self.connection.channel:
            try:
                self.connection.channel.basic_cancel(self.consumer_tag)
                self.is_consuming = False
                logger.info("Stopped consuming messages")
            except Exception as e:
                logger.error(f"Failed to stop consuming: {e}")

    def get_stats(self) -> Dict[str, Any]:
        """Get consumer statistics"""
        return {
            'queue': self.queue_name,
            'messages_processed': self.processed_count,
            'messages_failed': self.failed_count,
            'is_consuming': self.is_consuming,
            'auto_ack': self.auto_ack
        }

class AMQPRPCServer:
    """AMQP RPC Server implementation"""

    def __init__(self, connection: AMQPConnection, service_name: str):
        self.connection = connection
        self.service_name = service_name
        self.methods = {}
        self.consumer = AMQPConsumer(connection, service_name)

    def register_method(self, method_name: str, handler: Callable):
        """Register an RPC method"""
        self.methods[method_name] = handler
        logger.info(f"Registered RPC method: {method_name}")

    def _handle_request(self, message: AMQPMessage):
        """Handle RPC request"""
        try:
            request_data = json.loads(message.content)
            method_name = request_data.get('method')
            params = request_data.get('params', [])
            request_id = request_data.get('request_id')

            if method_name not in self.methods:
                response = {
                    'request_id': request_id,
                    'error': f'Unknown method: {method_name}',
                    'timestamp': datetime.now().isoformat()
                }
            else:
                try:
                    result = self.methods[method_name](*params)
                    response = {
                        'request_id': request_id,
                        'result': result,
                        'timestamp': datetime.now().isoformat()
                    }
                except Exception as e:
                    response = {
                        'request_id': request_id,
                        'error': str(e),
                        'timestamp': datetime.now().isoformat()
                    }

            # Send response
            if message.reply_to:
                self._send_response(message.reply_to, response, message.correlation_id)

        except Exception as e:
            logger.error(f"Error handling RPC request: {e}")

    def _send_response(self, reply_to: str, response: Dict, correlation_id: str):
        """Send RPC response"""
        try:
            properties = pika.BasicProperties(
                correlation_id=correlation_id,
                content_type='application/json'
            )

            self.connection.channel.basic_publish(
                exchange='',
                routing_key=reply_to,
                body=json.dumps(response, default=str),
                properties=properties
            )
        except Exception as e:
            logger.error(f"Failed to send RPC response: {e}")

    def start(self) -> bool:
        """Start RPC server"""
        if not self.consumer.setup_queue(durable=False):
            return False

        self.consumer.set_message_handler(self._handle_request)
        return self.consumer.start_consuming()

    def stop(self):
        """Stop RPC server"""
        self.consumer.stop_consuming()

class AMQPRPCClient:
    """AMQP RPC Client implementation"""

    def __init__(self, connection: AMQPConnection):
        self.connection = connection
        self.callback_queue = None
        self.response_queue = queue.Queue()
        self.correlation_ids = set()

    def setup(self) -> bool:
        """Setup RPC client"""
        if not self.connection.ensure_connection():
            return False

        try:
            # Declare callback queue
            result = self.connection.channel.queue_declare(queue='', exclusive=True)
            self.callback_queue = result.method.queue

            # Start consuming responses
            self.connection.channel.basic_consume(
                queue=self.callback_queue,
                on_message_callback=self._on_response,
                auto_ack=True
            )
            return True
        except Exception as e:
            logger.error(f"Failed to setup RPC client: {e}")
            return False

    def _on_response(self, ch, method, properties, body):
        """Handle RPC response"""
        if properties.correlation_id in self.correlation_ids:
            self.correlation_ids.remove(properties.correlation_id)
            response = json.loads(body.decode('utf-8'))
            self.response_queue.put(response)

    def call(self, service_name: str, method: str, params: List = None,
             timeout: float = 30.0) -> Any:
        """Make RPC call"""
        if not self.setup():
            raise Exception("Failed to setup RPC client")

        correlation_id = str(uuid.uuid4())
        self.correlation_ids.add(correlation_id)

        request = {
            'method': method,
            'params': params or [],
            'request_id': correlation_id,
            'timestamp': datetime.now().isoformat()
        }

        try:
            # Send request
            self.connection.channel.basic_publish(
                exchange='',
                routing_key=service_name,
                properties=pika.BasicProperties(
                    reply_to=self.callback_queue,
                    correlation_id=correlation_id,
                    content_type='application/json'
                ),
                body=json.dumps(request, default=str)
            )

            # Wait for response
            start_time = time.time()
            while time.time() - start_time < timeout:
                try:
                    response = self.response_queue.get(timeout=1)
                    if 'error' in response:
                        raise Exception(response['error'])
                    return response['result']
                except queue.Empty:
                    continue

            raise TimeoutError(f"RPC call timed out after {timeout} seconds")

        finally:
            if correlation_id in self.correlation_ids:
                self.correlation_ids.remove(correlation_id)

# Example usage
def example_work_queue():
    """Example work queue implementation"""
    print("=== Work Queue Example ===")

    # Create connection
    conn = AMQPConnection()
    if not conn.connect():
        return

    # Create producer
    producer = AMQPProducer(conn, 'work_exchange', ExchangeType.DIRECT)
    producer.setup_exchange()

    # Create consumer
    consumer = AMQPConsumer(conn, 'work_queue')
    consumer.setup_queue()
    consumer.bind_to_exchange('work_exchange', 'work_tasks')
    consumer.set_prefetch_count(5)

    def handle_task(message: AMQPMessage):
        print(f"Processing task: {message.content}")
        time.sleep(1)  # Simulate work

    consumer.set_message_handler(handle_task)
    consumer.start_consuming()

    # Send tasks
    for i in range(10):
        producer.publish('work_tasks', f'Task {i}')

    time.sleep(5)
    consumer.stop_consuming()
    conn.disconnect()

def example_pub_sub():
    """Example publish/subscribe pattern"""
    print("=== Pub/Sub Example ===")

    conn = AMQPConnection()
    if not conn.connect():
        return

    # Create publisher
    publisher = AMQPProducer(conn, 'events', ExchangeType.TOPIC)
    publisher.setup_exchange()

    # Create subscribers
    def create_subscriber(name: str, bindings: List[str]):
        consumer = AMQPConsumer(conn, f'events_sub_{name}')
        consumer.setup_queue(durable=False)

        for binding in bindings:
            consumer.bind_to_exchange('events', binding)

        def handle_event(message: AMQPMessage):
            print(f"[{name}] {message.routing_key}: {message.content}")

        consumer.set_message_handler(handle_event)
        consumer.start_consuming()
        return consumer

    # Create multiple subscribers
    sub1 = create_subscriber('payment', ['payment.*'])
    sub2 = create_subscriber('order', ['order.*'])
    sub3 = create_subscriber('all', ['*.*'])

    # Publish events
    events = [
        ('order.created', 'Order #123 created'),
        ('payment.processed', 'Payment for order #123 processed'),
        ('order.shipped', 'Order #123 shipped'),
        ('payment.refunded', 'Refund for order #124 processed')
    ]

    for routing_key, content in events:
        publisher.publish(routing_key, content)
        time.sleep(0.5)

    time.sleep(2)

    # Cleanup
    for sub in [sub1, sub2, sub3]:
        sub.stop_consuming()
    conn.disconnect()

def example_rpc():
    """Example RPC implementation"""
    print("=== RPC Example ===")

    conn = AMQPConnection()
    if not conn.connect():
        return

    # Create RPC server
    rpc_server = AMQPRPCServer(conn, 'math_service')

    # Register methods
    def add(a, b):
        return a + b

    def multiply(a, b):
        return a * b

    def factorial(n):
        if n <= 1:
            return 1
        return n * factorial(n - 1)

    rpc_server.register_method('add', add)
    rpc_server.register_method('multiply', multiply)
    rpc_server.register_method('factorial', factorial)

    # Start server in separate thread
    server_thread = threading.Thread(target=rpc_server.start)
    server_thread.daemon = True
    server_thread.start()

    # Create RPC client
    rpc_client = AMQPRPCClient(conn)

    # Make RPC calls
    try:
        result1 = rpc_client.call('math_service', 'add', [5, 3])
        print(f"5 + 3 = {result1}")

        result2 = rpc_client.call('math_service', 'multiply', [4, 7])
        print(f"4 * 7 = {result2}")

        result3 = rpc_client.call('math_service', 'factorial', [5])
        print(f"5! = {result3}")

    except Exception as e:
        print(f"RPC error: {e}")

    time.sleep(1)
    rpc_server.stop()
    conn.disconnect()

if __name__ == "__main__":
    example_work_queue()
    print()
    example_pub_sub()
    print()
    example_rpc()