RabbitMQ Advanced Samples

Advanced RabbitMQ configuration examples including clustering, high availability, performance tuning, and enterprise patterns

⚙️ RabbitMQ Cluster Configuration

🔴 complex ⭐⭐⭐⭐

Complete RabbitMQ cluster setup with high availability, failover, and monitoring for enterprise environments

⏱️ 60 min 🏷️ rabbitmq, message-queue, enterprise, clustering
Prerequisites: Docker, Docker Compose, container orchestration, messaging concepts
# RabbitMQ Advanced Cluster Configuration
# Enterprise-grade setup with clustering and high availability

# 1. Docker Compose for Cluster Setup
# docker-compose.yml
version: '3.8'

services:
  rabbitmq1:
    image: rabbitmq:3.12-management
    hostname: rabbitmq1
    container_name: rabbitmq1
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      RABBITMQ_ERLANG_COOKIE: ${RABBITMQ_ERLANG_COOKIE}
      RABBITMQ_NODENAME: rabbit@rabbitmq1
      # Cluster formation, disk free limits, and memory watermarks are
      # configured in rabbitmq.conf (section 2), which is mounted below;
      # the official image does not consume RABBITMQ_CLUSTER_NODES-style
      # environment variables
    ports:
      - "5672:5672"     # AMQP port
      - "15672:15672"   # Management UI
      - "25672:25672"   # STOMP
      - "61613:61613"   # TLS AMQP
    volumes:
      - rabbitmq1_data:/var/lib/rabbitmq
      - ./rabbitmq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./rabbitmq1/definitions.json:/etc/rabbitmq/definitions.json
    networks:
      - rabbitmq-cluster
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s
    restart: unless-stopped

  rabbitmq2:
    image: rabbitmq:3.12-management
    hostname: rabbitmq2
    container_name: rabbitmq2
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      RABBITMQ_ERLANG_COOKIE: ${RABBITMQ_ERLANG_COOKIE}
      RABBITMQ_NODENAME: rabbit@rabbitmq2
      # Cluster formation, disk free limits, and memory watermarks are
      # configured in rabbitmq.conf (section 2), which is mounted below
    ports:
      - "5673:5672"
      - "15673:15672"
      - "25673:25672"
      - "61614:613"
    volumes:
      - rabbitmq2_data:/var/lib/rabbitmq
      - ./rabbitmq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./rabbitmq2/definitions.json:/etc/rabbitmq/definitions.json
    networks:
      - rabbitmq-cluster
    depends_on:
      - rabbitmq1
    restart: unless-stopped

  rabbitmq3:
    image: rabbitmq:3.12-management
    hostname: rabbitmq3
    container_name: rabbitmq3
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      RABBITMQ_ERLANG_COOKIE: ${RABBITMQ_ERLANG_COOKIE}
      RABBITMQ_NODENAME: rabbit@rabbitmq3
      # Cluster formation, disk free limits, and memory watermarks are
      # configured in rabbitmq.conf (section 2), which is mounted below
    ports:
      - "5674:5672"
      - "15674:15672"
      - "25674:25672"
      - "61615:613"
    volumes:
      - rabbitmq3_data:/var/lib/rabbitmq
      - ./rabbitmq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./rabbitmq3/definitions.json:/etc/rabbitmq/definitions.json
    networks:
      - rabbitmq-cluster
    depends_on:
      - rabbitmq1
    restart: unless-stopped

  # HAProxy for Load Balancing
  haproxy:
    image: haproxy:2.8
    container_name: haproxy
    ports:
      - "5670:5670"   # Main AMQP port
      - "15670:15672" # Management UI
    volumes:
      - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg
    depends_on:
      - rabbitmq1
      - rabbitmq2
      - rabbitmq3
    networks:
      - rabbitmq-cluster
    restart: unless-stopped

  # Prometheus for Monitoring
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    networks:
      - rabbitmq-cluster
    restart: unless-stopped

  # Grafana for Visualization
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources:/etc/grafana/provisioning/datasources
    networks:
      - rabbitmq-cluster
    depends_on:
      - prometheus
    restart: unless-stopped

  # RabbitMQ Exporter
  rabbitmq-exporter:
    image: kbudde/rabbitmq-exporter:latest
    container_name: rabbitmq-exporter
    ports:
      - "9419:9419"
    environment:
      RABBIT_URL: http://haproxy:15670
      PUBLISH_PORT: 9419
      RABBIT_USER: admin
      RABBIT_PASS: ${RABBITMQ_PASSWORD}
      EXCLUDE_QUEUES: "^amq\."
    networks:
      - rabbitmq-cluster
    depends_on:
      - haproxy
    restart: unless-stopped

volumes:
  rabbitmq1_data:
  rabbitmq2_data:
  rabbitmq3_data:
  grafana_data:

networks:
  rabbitmq-cluster:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16

# 2. RabbitMQ Configuration for Cluster
# rabbitmq1/rabbitmq.conf
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1
cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq2
cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq3
cluster_name = rabbitmq-cluster
cluster_formation.node_cleanup.interval = 30
cluster_formation.node_cleanup.only_log_warning = true

# Disk and Memory Configuration
disk_free_limit.absolute = 2GB
# Use either a relative or an absolute memory watermark, not both
vm_memory_high_watermark.relative = 0.8

# Performance Tuning
heartbeat = 60
consumer_timeout = 3600000
# Consumer prefetch is set per channel via basic.qos; per-message TTL
# is set via policies or the x-message-ttl queue argument

# Security
loopback_users.guest = false
listeners.tcp.default = 5672
listeners.ssl.default = 5671

# Management Plugin
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0
management.ssl.port = 15671
management.ssl.ip = 0.0.0.0
# management.ssl.* also requires certfile/keyfile/cacertfile settings

# Load the exchanges, queues, users, and policies defined in section 5
load_definitions = /etc/rabbitmq/definitions.json

# 3. HAProxy Configuration for Load Balancing
# haproxy.cfg
global
  daemon
  maxconn 4096
  log /dev/log local0
  log /dev/log local1 notice

defaults
  mode tcp
  log global
  timeout connect 5s
  timeout client 50s
  timeout server 50s

frontend ft_rabbitmq_main
  bind *:5670
  mode tcp
  option tcplog
  default_backend backend_rabbitmq_servers

frontend ft_rabbitmq_management
  bind *:15670
  mode http
  option httplog
  default_backend backend_rabbitmq_management

backend backend_rabbitmq_servers
  mode tcp
  balance roundrobin
  option tcp-check
  server rabbitmq1 rabbitmq1:5672 check inter 5s rise 2 fall 3
  server rabbitmq2 rabbitmq2:5672 check inter 5s rise 2 fall 3
  server rabbitmq3 rabbitmq3:5672 check inter 5s rise 2 fall 3

backend backend_rabbitmq_management
  mode http
  balance roundrobin
  option httpchk GET /
  server rabbitmq1 rabbitmq1:15672 check inter 5s rise 2 fall 3
  server rabbitmq2 rabbitmq2:15672 check inter 5s rise 2 fall 3
  server rabbitmq3 rabbitmq3:15672 check inter 5s rise 2 fall 3
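
# Clients can then treat the load balancer as a single stable endpoint.
# A minimal TypeScript sketch, a hedged illustration assuming amqplib is
# installed and the compose credentials above; port 5670 is the
# ft_rabbitmq_main bind from this haproxy.cfg.

// connect-via-haproxy.ts — hedged sketch: connect through the HAProxy
// TCP frontend instead of addressing individual cluster nodes.
import amqp from 'amqplib';

async function connectViaLoadBalancer() {
  const conn = await amqp.connect({
    hostname: 'localhost',
    port: 5670, // HAProxy frontend, not a broker node
    username: 'admin',
    password: process.env.RABBITMQ_PASSWORD,
    heartbeat: 30, // keep heartbeats shorter than HAProxy's 50s timeouts
  });
  console.log('Connected through HAProxy');
  return conn;
}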

# 4. Prometheus Configuration for RabbitMQ
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets:
        - 'rabbitmq-exporter:9419'
    metrics_path: /metrics
    scrape_interval: 15s
    scrape_timeout: 10s

# 5. RabbitMQ Definitions and Policies
# rabbitmq1/definitions.json
{
  "users": [
    {
      "name": "admin",
      "password": "${RABBITMQ_PASSWORD}",
      "tags": ["administrator"],
      "hashing_algorithm": "rabbit_password_hashing_sha256"
    },
    {
      "name": "service_user",
      "password": "service_password",
      "tags": [],
      "hashing_algorithm": "rabbit_password_hashing_sha256"
    },
    {
      "name": "monitoring_user",
      "password": "monitoring_password",
      "tags": ["monitoring"],
      "hashing_algorithm": "rabbit_password_hashing_sha256"
    }
  ],
  "vhosts": [
    {
      "name": "/",
      "description": "Default virtual host"
    },
    {
      "name": "production",
      "description": "Production environment"
    },
    {
      "name": "staging",
      "description": "Staging environment"
    },
    {
      "name": "development",
      "description": "Development environment"
    }
  ],
  "permissions": [
    {
      "user": "service_user",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "monitoring_user",
      "vhost": "/",
      "configure": "metrics.*",
      "read": ".*"
    }
  ],
  "queues": [
    {
      "name": "user_events_queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-max-length": 10000,
        "x-overflow": "drop-head"
      }
    },
    {
      "name": "order_processing_queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-max-priority": 10,
        "x-queue-mode": "lazy"
      }
    },
    {
      "name": "notifications_queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-message-ttl": 86400000,
        "x-dead-letter-exchange": "notifications-dlx"
      }
    },
    {
      "name": "priority_queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-max-priority": 10,
        "x-queue-mode": "priority",
        "x-queue-mode-legacy": "true"
      }
    }
  ],
  "exchanges": [
    {
      "name": "user_events_exchange",
      "vhost": "/",
      "type": "topic",
      "durable": true,
      "auto_delete": false
    },
    {
      "name": "orders_exchange",
      "vhost": "/",
      "type": "direct",
      "durable": true,
      "auto_delete": false
    },
    {
      "name": "notifications_exchange",
      "vhost": "/",
      "type": "fanout",
      "durable": true,
      "auto_delete": false
    }
  ],
  "bindings": [
    {
      "source": "user_events_exchange",
      "destination": "user_events_queue",
      "destination_type": "queue",
      "routing_key": "user.*.events",
      "arguments": {}
    },
    {
      "source": "orders_exchange",
      "destination": "order_processing_queue",
      "destination_type": "queue",
      "routing_key": "order.created",
      "arguments": {}
    },
    {
      "source": "notifications_exchange",
      "destination": "notifications_queue",
      "destination_type": "queue",
      "routing_key": "*",
      "arguments": {}
    }
  ]
}
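
# The declared objects can be exercised directly from a client. A hedged
# TypeScript sketch, assuming amqplib and the HAProxy frontend from
# section 3; the exchange, queue, and binding names match the
# definitions above.

// publish-samples.ts — hedged sketch exercising the objects declared in
// definitions.json.
import amqp from 'amqplib';

async function publishSamples(): Promise<void> {
  const conn = await amqp.connect(
    `amqp://admin:${process.env.RABBITMQ_PASSWORD}@localhost:5670`);
  const ch = await conn.createChannel();

  // Topic routing key matches the "user.*.events" binding above
  ch.publish('user_events_exchange', 'user.42.events',
    Buffer.from(JSON.stringify({ userId: 42, action: 'login' })),
    { persistent: true });

  // priority_queue declares x-max-priority: 10, so per-message
  // priorities between 0 and 10 are honored
  ch.sendToQueue('priority_queue',
    Buffer.from(JSON.stringify({ task: 'urgent' })),
    { persistent: true, priority: 9 });

  await ch.close();
  await conn.close();
}

publishSamples().catch(console.error);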

# 6. High Availability Scripts
# setup-cluster.sh
#!/bin/bash

set -e

echo "Setting up RabbitMQ cluster..."

# Wait for RabbitMQ containers to be ready
echo "Waiting for RabbitMQ containers to start..."
until docker exec rabbitmq1 rabbitmq-diagnostics -q ping; do
  echo "Waiting for rabbitmq1..."
  sleep 5
done

until docker exec rabbitmq2 rabbitmq-diagnostics -q ping; do
  echo "Waiting for rabbitmq2..."
  sleep 5
done

until docker exec rabbitmq3 rabbitmq-diagnostics -q ping; do
  echo "Waiting for rabbitmq3..."
  sleep 5
done

echo "All RabbitMQ nodes are ready!"

# Verify cluster status
echo "Verifying cluster status..."
docker exec rabbitmq1 rabbitmqctl cluster_status

# Create federation upstream for external connectivity
# (requires the rabbitmq_federation plugin to be enabled)
echo "Setting up federation upstream..."
curl -u admin:${RABBITMQ_PASSWORD} \
  http://localhost:15672/api/parameters/federation-upstream/%2f/external-rabbitmq \
  -X PUT -H "content-type: application/json" -d '{
    "value": {
      "uri": "amqps://external-rabbitmq:5671",
      "expires": 3600000,
      "reconnect-delay": 5000
    }
  }'

# Create policies (the API path is /api/policies/{vhost}/{name})
echo "Creating policies..."
curl -u admin:${RABBITMQ_PASSWORD} \
  http://localhost:15672/api/policies/%2f/user-events-length-limit \
  -X PUT -H "content-type: application/json" -d '{
    "pattern": "^user_events",
    "apply-to": "queues",
    "priority": 100,
    "definition": {
      "max-length": 10000
    }
  }'

curl -u admin:${RABBITMQ_PASSWORD} \
  http://localhost:15672/api/policies/%2f/queue-size-limit \
  -X PUT -H "content-type: application/json" -d '{
    "pattern": ".*",
    "apply-to": "queues",
    "priority": 50,
    "definition": {
      "max-length": 5000
    }
  }'

# Enable federation: point a policy at the upstream set so matching
# exchanges are federated
curl -u admin:${RABBITMQ_PASSWORD} \
  http://localhost:15672/api/policies/%2f/federate-exchanges \
  -X PUT -H "content-type: application/json" -d '{
    "pattern": "^federated\\.",
    "apply-to": "exchanges",
    "definition": {
      "federation-upstream-set": "all"
    }
  }'

echo "RabbitMQ cluster setup completed!"

# 7. Performance Monitoring Script
# monitor-cluster.sh
#!/bin/bash

echo "Monitoring RabbitMQ cluster..."

# Get cluster overview
echo "=== Cluster Overview ==="
curl -s -u admin:${RABBITMQ_PASSWORD} http://localhost:15672/api/overview | jq '.cluster_name, .rabbitmq_version'

# Get node details
echo -e "\n=== Node Details ==="
for i in {1..3}; do
  echo -e "\n=== rabbitmq$i ==="
  curl -s -u admin:${RABBITMQ_PASSWORD} http://localhost:15672/api/nodes/rabbit@rabbitmq$i | \
    jq '{mem_used, mem_limit, fd_used, fd_total, proc_used, proc_total}'
done

# Get queue statistics
echo -e "\n=== Queue Statistics ==="
curl -s -u admin:${RABBITMQ_PASSWORD} http://localhost:15672/api/queues | \
  jq '.[] | {name, messages, memory, consumers}'

# Get connection statistics
echo -e "\n=== Connection Statistics ==="
curl -s -u admin:${RABBITMQ_PASSWORD} http://localhost:15672/api/connections | \
  jq '.[] | {name, state, channels, peer_host, peer_port}'

# Get channel statistics
echo -e "\n=== Channel Statistics ==="
curl -s -u admin:${RABBITMQ_PASSWORD} http://localhost:15672/api/channels | \
  jq '.[] | {name, state, messages_unacknowledged, consumer_count}' | head -20

# Get exchange statistics
echo -e "\n=== Exchange Statistics ==="
curl -s -u admin:${RABBITMQ_PASSWORD} http://localhost:15672/api/exchanges | \
  jq '.[] | {name, type, message_stats}' | head -10

# Health check function (uses the management API; the AMQP port 5672
# does not speak HTTP)
check_health() {
  local port=$1
  local host=${2:-localhost}
  local path=${3:-/api/health/checks/alarms}

  echo "Performing health check on $host:$port..."

  local response=$(curl -s -o /dev/null -w "%{http_code}" \
    -u admin:${RABBITMQ_PASSWORD} "http://$host:$port$path")

  if [ "$response" = "200" ]; then
    echo "✓ Health check passed"
    return 0
  else
    echo "✗ Health check failed with status $response"
    return 1
  fi
}

# Health checks against the published management ports
check_health 15672
check_health 15673
check_health 15674
check_health 15670   # via HAProxy

echo "Health checks completed!"

# 8. Environment Variables
# .env
RABBITMQ_PASSWORD=secure_password_here
GRAFANA_PASSWORD=admin_password_here

# Cluster configuration (interpolated into docker-compose.yml)
RABBITMQ_ERLANG_COOKIE=very_secure_cluster_cookie
RABBITMQ_DEFAULT_USER=admin
RABBITMQ_DEFAULT_VHOST=/

# Notes:
# - .env files do not support command substitution such as $(hostname);
#   node names are set per service in docker-compose.yml.
# - Disk free limits, memory watermarks, and TLS certificates are
#   configured in rabbitmq.conf (section 2), and plugins via the
#   enabled_plugins file, not through environment variables.

# 9. Application Connection Examples

# Node.js Connection with Failover
const amqp = require('amqplib');

// Candidate endpoints for failover (amqplib has no built-in cluster
// module, so each address is tried in turn)
const connectionPool = {
  hosts: [
    { hostname: 'localhost', port: 5672 },
    { hostname: 'localhost', port: 5673 },
    { hostname: 'localhost', port: 5674 }
  ],
  username: 'admin',
  heartbeat: 30,
  connectionTimeout: 10000
};

async function createRabbitConnection() {
  try {
    const { hostname, port } = connectionPool.hosts[0];
    const connection = await amqp.connect({
      hostname,
      port,
      username: connectionPool.username,
      password: process.env.RABBITMQ_PASSWORD,
      heartbeat: connectionPool.heartbeat
    }, { timeout: connectionPool.connectionTimeout });

    console.log('RabbitMQ connection established');
    return connection;
  } catch (error) {
    console.error('Failed to connect to RabbitMQ:', error);
    throw error;
  }
}

// Cluster connection with failover: try each node in order
async function createClusterConnection() {
  const password = process.env.RABBITMQ_PASSWORD;
  const addresses = [
    `amqp://admin:${password}@localhost:5672`,
    `amqp://admin:${password}@localhost:5673`,
    `amqp://admin:${password}@localhost:5674`
  ];

  let lastError;
  for (const address of addresses) {
    try {
      const connection = await amqp.connect(address, { timeout: 10000 });
      console.log('RabbitMQ cluster connection established');
      return connection;
    } catch (err) {
      console.error('Failed to connect, trying next node:', err.message);
      lastError = err;
    }
  }
  throw lastError;
}
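
// Once a connection is obtained, per-channel prefetch and explicit
// acknowledgements keep a consumer from being flooded after failover.
// A hedged sketch building on createClusterConnection above; the queue
// name matches definitions.json from section 5.

async function startConsumer() {
  const connection = await createClusterConnection();
  const channel = await connection.createChannel();

  // Passive check: the queue was declared with extra arguments in
  // definitions.json, so re-asserting with different args would fail
  await channel.checkQueue('user_events_queue');
  await channel.prefetch(10); // at most 10 unacked deliveries in flight

  await channel.consume('user_events_queue', (msg) => {
    if (msg === null) return; // consumer was cancelled by the broker
    try {
      const event = JSON.parse(msg.content.toString());
      console.log('Processing event:', event);
      channel.ack(msg);
    } catch (err) {
      // Reject without requeue so a poison message is not retried forever
      channel.nack(msg, false, false);
    }
  });
}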

# Python Connection with Pika
import os
import threading
import time

import pika
from pika.exceptions import AMQPConnectionError

# High availability connection
def create_connection():
    credentials = pika.PlainCredentials(
        'admin',
        os.environ['RABBITMQ_PASSWORD']
    )

    parameters = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=credentials,
        heartbeat=600,
        blocked_connection_timeout=30,
        connection_attempts=3,
        retry_delay=5
    )

    try:
        connection = pika.BlockingConnection(parameters)
        print("RabbitMQ connection established")
        return connection
    except AMQPConnectionError as e:
        print(f"Failed to connect to RabbitMQ: {e}")
        raise

# Connection with retry and circuit breaker
class RabbitMQConnectionManager:
    def __init__(self, max_retries=3, retry_delay=5):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.connection = None
        self.circuit_breaker_open = False
        self.circuit_breaker_reset_timeout = 60  # seconds; threading.Timer takes seconds

    def get_connection(self):
        if self.circuit_breaker_open:
            print("Circuit breaker is open, refusing connection attempts")
            return None

        if self.connection and self.connection.is_open:
            return self.connection

        return self._connect_with_retry()

    def _connect_with_retry(self):
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                connection = create_connection()
                self.connection = connection
                self.circuit_breaker_open = False
                return connection
            except Exception as e:
                last_exception = e
                if attempt < self.max_retries - 1:
                    print(f"Connection attempt {attempt + 1} failed, retrying in {self.retry_delay}s...")
                    time.sleep(self.retry_delay)

        # All attempts failed: open the circuit breaker and schedule a reset
        self.circuit_breaker_open = True
        threading.Timer(self.circuit_breaker_reset_timeout, self._reset_circuit_breaker).start()
        raise last_exception

    def _reset_circuit_breaker(self):
        print("Resetting circuit breaker")
        self.circuit_breaker_open = False

# 10. Production Deployment Configuration
# Kubernetes deployment
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq-cluster
  namespace: production
  labels:
    app: rabbitmq
spec:
  serviceName: rabbitmq
  replicas: 3
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      terminationGracePeriodSeconds: 30
      containers:
      - name: rabbitmq
        image: rabbitmq:3.12-management
        ports:
        - containerPort: 5672
          name: amqp
        - containerPort: 15672
          name: management
        - containerPort: 25672
          name: clustering
        env:
        - name: RABBITMQ_DEFAULT_USER
          value: "admin"
        - name: RABBITMQ_DEFAULT_PASS
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password
        - name: RABBITMQ_ERLANG_COOKIE
          value: "rabbitmq-cluster-cookie"
        - name: RABBITMQ_USE_LONGNAME
          value: "true"
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: RABBITMQ_NODENAME
          value: "rabbit@$(POD_NAME).rabbitmq.production.svc.cluster.local"
        # Cluster peers, disk free limits, and memory watermarks are
        # configured in rabbitmq.conf (mounted from the ConfigMap below)
        volumeMounts:
        - name: rabbitmq-data
          mountPath: /var/lib/rabbitmq
        - name: rabbitmq-config
          mountPath: /etc/rabbitmq
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          exec:
            command:
            - rabbitmq-diagnostics
            - -q
            - ping
          initialDelaySeconds: 60
          periodSeconds: 30
          timeoutSeconds: 10
        readinessProbe:
          exec:
            command:
            - rabbitmq-diagnostics
            - -q
            - ping
          initialDelaySeconds: 20
          periodSeconds: 10
          timeoutSeconds: 5
      volumes:
      - name: rabbitmq-config
        configMap:
          name: rabbitmq-config
  volumeClaimTemplates:
  - metadata:
      name: rabbitmq-data
    spec:
      accessModes:
        - ReadWriteOnce
      storageClassName: fast-ssd
      resources:
        requests:
          storage: 10Gi

---
# Headless service required by the StatefulSet's serviceName; it gives
# each pod a stable DNS name used in rabbitmq.conf below
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq
  namespace: production
spec:
  clusterIP: None
  selector:
    app: rabbitmq
  ports:
    - name: amqp
      port: 5672
    - name: management
      port: 15672
    - name: clustering
      port: 25672

---
apiVersion: v1
kind: Secret
metadata:
  name: rabbitmq-secret
  namespace: production
type: Opaque
data:
  password: <base64-encoded-password>

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
  namespace: production
data:
  rabbitmq.conf: |
    cluster_formation.peer_discovery_backend = classic_config
    cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq-cluster-0.rabbitmq.production.svc.cluster.local
    cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq-cluster-1.rabbitmq.production.svc.cluster.local
    cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq-cluster-2.rabbitmq.production.svc.cluster.local
    disk_free_limit.absolute = 2GB
    vm_memory_high_watermark.absolute = 2GB
    heartbeat = 60
    consumer_timeout = 3600000
    load_definitions = /etc/rabbitmq/definitions.json

  definitions.json: |
    {
      "users": [
        {
          "name": "admin",
          "password": "${RABBITMQ_PASSWORD}",
          "tags": ["administrator"],
          "permissions": {
            "configure": ".*",
            "write": ".*",
            "read": ".*"
          }
        }
      ],
      "vhosts": [
        {
          "name": "/",
          "description": "Production virtual host"
        }
      ],
      "queues": [
        {
          "name": "high-priority-queue",
          "durable": true,
          "arguments": {
            "x-max-priority": 10
          }
        }
      ]
    }

💻 RabbitMQ Microservices Patterns (TypeScript)

🔴 complex ⭐⭐⭐⭐

RabbitMQ patterns for microservices architecture including saga pattern, event sourcing, and CQRS

⏱️ 55 min 🏷️ rabbitmq, microservices, patterns, architecture
Prerequisites: Node.js, TypeScript, Message Queue, Event-Driven Architecture
// RabbitMQ Microservices Architecture Patterns
// Event-driven architecture with RabbitMQ

// 1. Saga Pattern Implementation
// saga-manager.ts

import { Connection, Channel } from 'amqplib';
import { EventEmitter } from 'events';

interface SagaStep {
  id: string;
  name?: string;
  execute: () => Promise<any>;
  compensate?: () => Promise<any>;
  onFail?: (error: Error) => Promise<void>;
  onSuccess?: (result: any) => Promise<void>;
}

interface Saga {
  id: string;
  name: string;
  steps: SagaStep[];
  timeout: number;
}

interface SagaState {
  id: string;
  currentStep: number;
  status: 'pending' | 'running' | 'completed' | 'failed' | 'compensating' | 'compensated';
  data: Record<string, any>;
  compensations: Array<() => Promise<void>>;
  error?: Error;
  startTime: Date;
  endTime?: Date;
}

export class SagaManager {
  private connection: Connection;
  private channel: Channel;
  private sagas: Map<string, Saga> = new Map();
  private sagaStates: Map<string, SagaState> = new Map();
  private eventEmitter = new EventEmitter();

  constructor(connection: Connection) {
    this.connection = connection;
  }

  async initialize(): Promise<void> {
    this.channel = await this.connection.createChannel();
    console.log('Saga Manager initialized');
  }

  // Create a new saga
  createSaga(saga: Saga): void {
    this.sagas.set(saga.id, saga);
    console.log(`Created saga: ${saga.name} (${saga.id})`);
  }

  // Execute a saga
  async executeSaga(saga: Saga, data: any = {}): Promise<void> {
    const sagaState: SagaState = {
      id: saga.id,
      currentStep: 0,
      status: 'pending',
      data,
      compensations: [],
      startTime: new Date()
    };

    this.sagaStates.set(saga.id, sagaState);
    let currentStep: SagaStep | undefined;

    try {
      for (let i = 0; i < saga.steps.length; i++) {
        const step = saga.steps[i];
        currentStep = step;
        sagaState.currentStep = i;
        sagaState.status = 'running';

        console.log(`Executing saga ${saga.name}, step ${i + 1}/${saga.steps.length}: ${step.id}`);

        // Record compensation if available
        if (step.compensate) {
          sagaState.compensations.push(step.compensate);
        }

        // Execute step with timeout
        const result = await this.executeStepWithTimeout(step, saga.timeout);

        // Store step result
        sagaState.data[`step_${i}`] = result;

        // Notify progress
        this.eventEmitter.emit('stepCompleted', {
          sagaId: saga.id,
          sagaName: saga.name,
          step: i + 1,
          totalSteps: saga.steps.length,
          result
        });

        if (step.onSuccess) {
          await step.onSuccess(result);
        }
      }

      sagaState.status = 'completed';
      sagaState.endTime = new Date();

      console.log(`Saga ${saga.name} completed successfully in ${sagaState.endTime.getTime() - sagaState.startTime.getTime()}ms`);
      this.eventEmitter.emit('sagaCompleted', sagaState);

    } catch (error) {
      sagaState.status = 'failed';
      sagaState.error = error as Error;
      sagaState.endTime = new Date();

      console.error(`Saga ${saga.name} failed: ${(error as Error).message}`);

      // Execute compensations in reverse order
      console.log(`Executing compensations for saga ${saga.name}...`);
      sagaState.status = 'compensating';
      await this.executeCompensations(sagaState.compensations);

      if (currentStep?.onFail) {
        await currentStep.onFail(error as Error);
      }

      sagaState.status = 'compensated';
      this.eventEmitter.emit('sagaCompensated', sagaState);

      throw error;
    }
  }

  private async executeStepWithTimeout(step: SagaStep, timeout: number): Promise<any> {
    return Promise.race([
      step.execute(),
      new Promise((_, reject) => {
        setTimeout(() => reject(new Error(`Saga step timeout after ${timeout}ms`)), timeout);
      })
    ]);
  }

  private async executeCompensations(compensations: Array<() => Promise<void>>): Promise<void> {
    for (let i = compensations.length - 1; i >= 0; i--) {
      try {
        await compensations[i]();
      } catch (error) {
        console.error(`Compensation failed: ${error.message}`);
        // Continue with other compensations even if one fails
      }
    }
  }

  // Event listener methods
  onSagaCompleted(callback: (state: SagaState) => void): void {
    this.eventEmitter.on('sagaCompleted', callback);
  }

  onSagaCompensated(callback: (state: SagaState) => void): void {
    this.eventEmitter.on('sagaCompensated', callback);
  }

  onStepCompleted(callback: (data: any) => void): void {
    this.eventEmitter.on('stepCompleted', callback);
  }
}
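
// A hedged usage sketch: register listeners before executing so
// progress and compensation are observable. The connection is assumed
// to come from the setup shown in the usage example later in this
// section; the saga definition here is illustrative.
async function runSagaWithListeners(connection: Connection) {
  const manager = new SagaManager(connection);
  await manager.initialize();

  manager.onStepCompleted(({ sagaName, step, totalSteps }) =>
    console.log(`${sagaName}: step ${step}/${totalSteps} done`));
  manager.onSagaCompensated((state) =>
    console.warn(`Saga ${state.id} rolled back after: ${state.error?.message}`));

  const demoSaga: Saga = {
    id: 'demo-1',
    name: 'Demo Saga',
    timeout: 5000,
    steps: [{ id: 'step-1', execute: async () => 'ok' }]
  };

  manager.createSaga(demoSaga);
  await manager.executeSaga(demoSaga);
}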

// 2. Event Sourcing Pattern
// event-store.ts

interface Event {
  id: string;
  type: string;
  data: any;
  metadata: Record<string, any>;
  timestamp: Date;
  aggregateId: string;
  sequence: number;
  version: number;
}

interface AggregateState {
  id: string;
  type: string;
  version: number;
  data: Record<string, any>;
  lastEvent: Date;
}

export class EventStore {
  private connection: Connection;
  private channel: Channel;
  private events: Map<string, Event[]> = new Map();
  private aggregates: Map<string, AggregateState> = new Map();

  constructor(connection: Connection) {
    this.connection = connection;
  }

  async initialize(): Promise<void> {
    this.channel = await this.connection.createChannel();
    await this.channel.assertQueue('events', { durable: true });
    await this.channel.assertQueue('events-replay', { durable: true });
    await this.channel.assertQueue('events-snapshot', { durable: true });

    // Start consuming events
    this.startEventConsumption();
    console.log('Event Store initialized');
  }

  async storeEvent(event: Event): Promise<void> {
      const eventMessage = {
        id: event.id,
        type: event.type,
        data: event.data,
        metadata: event.metadata,
        timestamp: event.timestamp,
        aggregateId: event.aggregateId,
        sequence: event.sequence,
        version: event.version
      };

      await this.channel.sendToQueue('events', Buffer.from(JSON.stringify(eventMessage)));
      console.log(`Event stored: ${event.type} for aggregate ${event.aggregateId}`);
  }

  async getEventsForAggregate(aggregateId: string): Promise<Event[]> {
      const events = this.events.get(aggregateId) || [];
      if (events.length > 0) {
        return events;
      }

      // Replay events from the replay queue (destructive basic.get; a
      // real implementation would read from a durable store instead)
      try {
        let msg = await this.channel.get('events-replay', { noAck: false });
        while (msg !== false) {
          const event = JSON.parse(msg.content.toString());
          if (event.aggregateId === aggregateId) {
            events.push(event);
          }
          this.channel.ack(msg);
          msg = await this.channel.get('events-replay', { noAck: false });
        }

        this.events.set(aggregateId, events);
        return events;
      } catch (error) {
        console.error(`Failed to replay events for aggregate ${aggregateId}: ${error}`);
        return [];
      }
  }

  async getAggregateState(aggregateId: string): Promise<AggregateState | null> {
      const events = await this.getEventsForAggregate(aggregateId);
      if (events.length === 0) {
        return null;
      }

      // Apply events in order
      let state: AggregateState = {
        id: aggregateId,
        type: events[0].type.split('.')[0],
        version: events[0].version,
        data: events[0].data,
        lastEvent: events[events.length - 1].timestamp
      };

      for (let i = 1; i < events.length; i++) {
        const event = events[i];
        state = this.applyEventToState(state, event);
      }

      return state;
  }

  private applyEventToState(state: AggregateState, event: Event): AggregateState {
      const [entityType, action] = event.type.split('.');

      switch (action) {
        case 'Created':
          state.data = event.data;
          state.version = event.version;
          break;
        case 'Updated':
          state.data = { ...state.data, ...event.data };
          state.version = event.version;
          break;
        case 'Deleted':
          state.data = {};
          state.version = event.version;
          break;
        case 'Versioned':
          state.data = { ...state.data, ...event.data };
          state.version = event.version;
          break;
        default:
          state.data = { ...state.data, ...event.data };
      }

      state.lastEvent = event.timestamp;
      return state;
  }

  private startEventConsumption(): void {
      this.channel.consume('events', async (msg) => {
        if (msg === null) return;

        const event = JSON.parse(msg.content.toString());
        const aggregateId = event.aggregateId;

        let events = this.events.get(aggregateId) || [];
        events.push(event);

        // Keep only the last 1000 events per aggregate
        if (events.length > 1000) {
          events = events.slice(-1000);
        }
        this.events.set(aggregateId, events);

        this.channel.ack(msg);
      }, { noAck: false });
  }

  // Create snapshot of aggregate state
  async createSnapshot(aggregateId: string, state: AggregateState): Promise<void> {
      const snapshotMessage = {
        aggregateId,
        state,
        timestamp: new Date(),
        version: state.version
      };

      await this.channel.sendToQueue('events-snapshot', Buffer.from(JSON.stringify(snapshotMessage)));
      console.log(`Created snapshot for aggregate ${aggregateId}, version ${state.version}`);
  }

  // Restore from snapshot
  async restoreFromSnapshot(aggregateId: string): Promise<AggregateState | null> {
      try {
        const msg = await this.channel.get('events-snapshot', { noAck: true });
        if (msg === false) {
          return null;
        }

        const snapshot = JSON.parse(msg.content.toString());
        return snapshot.state;
      } catch (error) {
        console.error(`Failed to restore snapshot for aggregate ${aggregateId}: ${error}`);
        return null;
      }
  }
}
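
// A hedged sketch of the snapshot round-trip; the aggregate ID is
// illustrative and the store is assumed to be initialized.
async function snapshotRoundTrip(store: EventStore) {
  const state = await store.getAggregateState('user-1');
  if (state) {
    await store.createSnapshot('user-1', state);
  }

  // Later (e.g. on restart), prefer the snapshot and replay only the
  // events recorded after snapshot.version
  const restored = await store.restoreFromSnapshot('user-1');
  console.log('Restored version:', restored?.version ?? 'no snapshot');
}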

// 3. CQRS Pattern Implementation
// query-service.ts and command-service.ts

// Query Service (Read Model)
export class QueryService {
  private eventStore: EventStore;

  constructor(eventStore: EventStore) {
    this.eventStore = eventStore;
  }

  async getUserById(userId: string): Promise<any> {
    const state = await this.eventStore.getAggregateState(userId);
    return state?.data;
  }

  async getUsersByRole(role: string): Promise<any[]> {
    const users = [];

    // Get all user aggregates
    const allUserIds = await this.getAllUserIds();
    for (const userId of allUserIds) {
      const user = await this.getUserById(userId);
      if (user && user.role === role) {
        users.push(user);
      }
    }

    return users;
  }

  async getAllUserIds(): Promise<string[]> {
      // In a real implementation, you'd maintain a registry of all aggregate IDs
      return ['user-1', 'user-2', 'user-3'];
  }

  async searchUsers(query: string): Promise<any[]> {
      const results = [];

      // Simple text search implementation
      const allUserIds = await this.getAllUserIds();
      for (const userId of allUserIds) {
        const user = await this.getUserById(userId);
        if (user && (
          user.name.toLowerCase().includes(query.toLowerCase()) ||
          user.email.toLowerCase().includes(query.toLowerCase())
        )) {
          results.push(user);
        }
      }

      return results;
  }
}

// Command Service (Write Model)
export class CommandService {
  private eventStore: EventStore;

  constructor(eventStore: EventStore) {
    this.eventStore = eventStore;
  }

  async createUser(userData: Record<string, any>): Promise<string> {
    const userId = userData.id || `user-${Date.now()}`;
    const event: Event = {
      id: `event-${Date.now()}`,
      type: 'User.Created', // dot-separated so applyEventToState can split it
      data: userData,
      metadata: {
        source: 'command-service',
        timestamp: new Date()
      },
      timestamp: new Date(),
      aggregateId: userId,
      sequence: 1,
      version: 1
    };

    await this.eventStore.storeEvent(event);
    return userId;
  }

  async updateUser(userId: string, updates: Record<string, any>): Promise<void> {
    const currentState = await this.eventStore.getAggregateState(userId);
    const event: Event = {
      id: `event-${Date.now()}`,
      type: 'User.Updated',
      data: updates,
      metadata: {
        source: 'command-service',
        timestamp: new Date()
      },
      timestamp: new Date(),
      aggregateId: userId,
      sequence: (currentState?.version || 0) + 1,
      version: (currentState?.version || 0) + 1
    };

    await this.eventStore.storeEvent(event);
  }

  async deleteUser(userId: string): Promise<void> {
    const currentState = await this.eventStore.getAggregateState(userId);
    const event: Event = {
      id: `event-${Date.now()}`,
      type: 'User.Deleted',
      data: { deleted: true, deletedAt: new Date() },
      metadata: {
        source: 'command-service',
        timestamp: new Date()
      },
      timestamp: new Date(),
      aggregateId: userId,
      sequence: (currentState?.version || 0) + 1,
      version: (currentState?.version || 0) + 1
    };

    await this.eventStore.storeEvent(event);
  }
}

// 4. Event Bus Implementation
// event-bus.ts

import { Channel } from 'amqplib';
import { EventEmitter } from 'events';

interface EventBusConfig {
  retryAttempts: number;
  retryDelay: number;
  deadLetterExchange: string;
  deadLetterQueue: string;
}

export class EventBus extends EventEmitter {
  private channel: Channel;
  private config: EventBusConfig;

  constructor(channel: Channel, config?: Partial<EventBusConfig>) {
    super();
    this.channel = channel;
    this.config = {
      retryAttempts: 3,
      retryDelay: 1000,
      deadLetterExchange: 'exchange.dlx',
      deadLetterQueue: 'queue.dlq',
      ...config
    };
  }

  async publishEvent(event: Event, routingKey?: string, publishOptions?: any): Promise<void> {
    const eventName = event.type;
    const key = routingKey || eventName;

    try {
      // channel.publish is fire-and-forget and returns a boolean (false
      // means the write buffer is full); use a confirm channel if broker
      // acknowledgements are required
      this.channel.publish(
        'exchange.main', // assumed application exchange, see the DLQ handler below
        key,
        Buffer.from(JSON.stringify(event)),
        {
          expiration: publishOptions?.expiration,
          persistent: publishOptions?.persistent ?? true,
          contentType: 'application/json',
          headers: publishOptions?.headers || {}
        }
      );

      console.log(`Event ${eventName} published to ${key}`);

    } catch (error) {
      console.error(`Failed to publish event ${eventName}: ${error}`);

      // Send to the dead letter exchange if configured
      if (this.config.deadLetterExchange) {
        await this.publishToDeadLetterQueue(event, error as Error);
      }

      throw error;
    }
  }

  private async publishToDeadLetterQueue(event: Event, originalError: Error): Promise<void> {
    const dlxEvent = {
      id: `dlx-event-${Date.now()}`,
      type: event.type,
      routing_key: 'dead-letter',
      data: {
        originalEvent: event,
        error: {
          message: originalError.message,
          stack: originalError.stack
        }
      },
      timestamp: new Date()
    };

    this.channel.publish(
      this.config.deadLetterExchange,
      'dead-letter',
      Buffer.from(JSON.stringify(dlxEvent))
    );
  }

  async subscribe(pattern: string, handler: (event: Event) => Promise<void>, queueName?: string): Promise<void> {
    const queue = queueName || pattern;

    await this.channel.assertQueue(queue, {
      durable: true,
      exclusive: false,
      autoDelete: false,
      arguments: {
        'x-dead-letter-exchange': this.config.deadLetterExchange
      }
    });
    await this.channel.bindQueue(queue, 'exchange.main', pattern);

    await this.channel.consume(queue, async (msg) => {
      if (msg === null) return;
      const event: Event = JSON.parse(msg.content.toString());
      try {
        await handler(event);
        this.channel.ack(msg);
      } catch (error) {
        // Reject without requeue so the message is dead-lettered
        this.channel.nack(msg, false, false);
      }
    });

    console.log(`Subscribed to pattern ${pattern} on queue ${queue}`);
  }

  async createEvent(eventData: Record<string, any>, eventType: string, aggregateId = ''): Promise<Event> {
    const event: Event = {
      id: `event-${Date.now()}`,
      type: eventType,
      data: eventData,
      metadata: {
        source: 'application',
        timestamp: new Date()
      },
      timestamp: new Date(),
      aggregateId,
      sequence: 1,
      version: 1
    };

    return event;
  }
}
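
// A hedged wiring sketch: 'exchange.main' is an assumption of this
// document (RabbitMQ does not predeclare it), so assert it once at
// startup before publishing or subscribing.
async function wireEventBus(channel: Channel) {
  await channel.assertExchange('exchange.main', 'topic', { durable: true });

  const bus = new EventBus(channel);

  await bus.subscribe('user.*', async (event) => {
    console.log('user event:', event.type, event.data);
  }, 'user-events');

  const event = await bus.createEvent({ name: 'Ada' }, 'user.Created', 'user-1');
  await bus.publishEvent(event);
}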

// 5. Circuit Breaker for External Dependencies
// circuit-breaker.ts
interface CircuitBreakerOptions {
  timeout: number;
  failureThreshold: number;
  successThreshold: number;
  resetTimeout: number;
  retryDelay: number;
}

enum CircuitBreakerState {
  CLOSED = 'CLOSED',
  OPEN = 'OPEN',
  HALF_OPEN = 'HALF_OPEN'
}

export class CircuitBreaker {
  private state: CircuitBreakerState;
  private failureCount: number = 0;
  private successCount: number = 0;
  private lastFailureTime: Date | null = null;
  private nextAttempt: Date | null = null;
  private options: CircuitBreakerOptions;

  constructor(options: Partial<CircuitBreakerOptions>) {
    this.options = {
      timeout: 30000,
      failureThreshold: 5,
      successThreshold: 2,
      resetTimeout: 60000,
      retryDelay: 1000,
      ...options
    };
    this.state = CircuitBreakerState.CLOSED;
  }

  async execute<T>(operation: () => Promise<T>): Promise<T> {
      const now = Date.now();

      if (this.state === CircuitBreakerState.OPEN) {
        // After the reset timeout, allow a probe request (half-open)
        if (this.lastFailureTime &&
            (now - this.lastFailureTime.getTime()) > this.options.resetTimeout) {
          this.state = CircuitBreakerState.HALF_OPEN;
        } else {
          throw new Error('Circuit breaker is open');
        }
      }

      if (this.state === CircuitBreakerState.HALF_OPEN) {
        // In half-open state, allow a single attempt
        try {
          const result = await this.executeWithTimeout(operation);
          this.recordSuccess();
          return result;
        } catch (error) {
          this.recordFailure();
          this.state = CircuitBreakerState.OPEN;
          throw error;
        }
      }

      try {
        const result = await this.executeWithTimeout(operation);
        this.recordSuccess();
        return result;
      } catch (error) {
        this.recordFailure();
        throw error;
      }
  }

  private async executeWithTimeout<T>(operation: () => Promise<T>): Promise<T> {
      return Promise.race([
        operation(),
        new Promise<never>((_, reject) => {
          setTimeout(() => reject(new Error('Operation timeout')), this.options.timeout);
        })
      ]);
  }

  private recordSuccess(): void {
    this.successCount++;
    if (this.successCount >= this.options.successThreshold) {
      this.state = CircuitBreakerState.CLOSED;
      this.successCount = 0;
    }
    this.failureCount = 0;
  }

  private recordFailure(): void {
    this.failureCount++;
    this.lastFailureTime = new Date();

    if (this.failureCount >= this.options.failureThreshold) {
      this.state = CircuitBreakerState.OPEN;
    }

    this.successCount = 0;
  }

  private reset(): void {
    this.state = CircuitBreakerState.CLOSED;
    this.failureCount = 0;
    this.successCount = 0;
    this.lastFailureTime = null;
    this.nextAttempt = null;
  }

  getState(): CircuitBreakerState {
    return this.state;
  }

  getStatus(): string {
    return this.state;
  }
}
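
// A hedged usage sketch: guard broker-facing calls with the breaker so
// a flapping broker fails fast instead of piling up timed-out publishes.
const publishBreaker = new CircuitBreaker({ timeout: 5000, failureThreshold: 3 });

async function guardedPublish(bus: EventBus, event: Event): Promise<void> {
  try {
    await publishBreaker.execute(() => bus.publishEvent(event));
  } catch (error) {
    console.error(`Publish rejected (breaker ${publishBreaker.getState()}):`, error);
  }
}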

// 6. Dead Letter Queue Handler
// dead-letter-queue-handler.ts

interface DeadLetterEvent {
  event: Event;
  error: Error;
  retryCount: number;
  lastAttempt: Date;
  originalExchange: string;
  originalRoutingKey: string;
}

export class DeadLetterQueueHandler {
  private channel: Channel;
  private retryAttempts: Map<string, number> = new Map();

  constructor(channel: Channel) {
    this.channel = channel;
    this.initializeDeadLetterQueue();
  }

  private async initializeDeadLetterQueue(): Promise<void> {
    await this.channel.assertExchange('exchange.dlx', 'fanout', { durable: true });
    await this.channel.assertQueue('queue.dlq', { durable: true });
    await this.channel.bindQueue('queue.dlq', 'exchange.dlx', '');

    console.log('Dead letter queue initialized');
  }

  async handleDeadLetter(event: Event, error: Error, retryCount: number): Promise<void> {
    const dlxEvent: DeadLetterEvent = {
      event,
      error,
      retryCount,
      lastAttempt: new Date(),
      originalExchange: 'exchange.main',
      originalRoutingKey: event.type
    };

    console.error(`Dead letter event for ${event.type}: ${error.message}`);

    // Check if we should retry
    const eventKey = `${event.type}:${event.aggregateId}`;
    const currentRetries = this.retryAttempts.get(eventKey) || 0;

    if (currentRetries < 3) {
      console.log(`Retrying dead letter event for ${event.type} (attempt ${currentRetries + 1}/3)`);

      this.retryAttempts.set(eventKey, currentRetries + 1);

      // Add retry delay with exponential backoff
      const retryDelay = Math.pow(2, currentRetries) * 1000;
      await new Promise(resolve => setTimeout(resolve, retryDelay));

      // Re-queue the event
      await this.requeueEvent(event);
    } else {
      // Max retries exceeded, log and persist
      console.error(`Max retries exceeded for event ${event.type}, logging to persistent storage`);
      await this.persistFailedEvent(dlxEvent);
    }
  }

  private async requeueEvent(event: Event): Promise<void> {
    // Find the original exchange and routing key
    const originalExchange = 'exchange.main';
    const routingKey = event.type;

    await this.channel.publish(
      originalExchange,
      routingKey,
      Buffer.from(JSON.stringify(event))
    );
  }

  private async persistFailedEvent(dlxEvent: DeadLetterEvent): Promise<void> {
    // In production, persist to a database or log system
    console.error(`
[PERSISTENT DEAD LETTER EVENT]
Event: ${dlxEvent.event.type}
Error: ${dlxEvent.error.message}
Retry Count: ${dlxEvent.retryCount}
`);
  }
}
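
// A hedged sketch of draining queue.dlq into the handler. The x-death
// header is added by RabbitMQ when a message is dead-lettered and
// carries the rejection count passed along here as retryCount.
async function consumeDeadLetters(channel: Channel, handler: DeadLetterQueueHandler) {
  await channel.consume('queue.dlq', async (msg) => {
    if (msg === null) return;

    const event: Event = JSON.parse(msg.content.toString());
    const deaths = (msg.properties.headers?.['x-death'] as any[]) ?? [];
    const retryCount = deaths.length > 0 ? Number(deaths[0].count) : 0;

    await handler.handleDeadLetter(event, new Error('dead-lettered'), retryCount);
    channel.ack(msg);
  });
}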

// 7. Usage Examples
// main.ts - Application entry point

import amqp from 'amqplib';

async function main() {
  try {
    // Establish RabbitMQ connection (amqplib uses `hostname`, not `host`)
    const connection = await amqp.connect({
      hostname: 'localhost',
      port: 5672,
      username: 'admin',
      password: process.env.RABBITMQ_PASSWORD,
    });

    console.log('Connected to RabbitMQ');

    // Initialize services
    const sagaManager = new SagaManager(connection);
    await sagaManager.initialize();

    const eventStore = new EventStore(connection);
    await eventStore.initialize();

    const channel = await connection.createChannel();
    await channel.assertExchange('exchange.main', 'topic', { durable: true });
    const eventBus = new EventBus(channel);

    const queryService = new QueryService(eventStore);
    const commandService = new CommandService(eventStore);

    // Create sample saga
    const orderSaga = {
      id: 'order-saga-1',
      name: 'Order Processing Saga',
      timeout: 30000,
      steps: [
        {
          id: 'step-1',
          name: 'Validate Order Data',
          execute: async () => {
            // Validate order data
            return { valid: true, orderId: 'order-123' };
          },
          compensate: async () => {
            console.log('Compensating order creation');
          }
        },
        {
          id: 'step-2',
          name: 'Process Payment',
          execute: async () => {
            // Process payment
            return { success: true, paymentId: 'pay-456' };
          },
          compensate: async () => {
            console.log('Refunding payment');
          }
        },
        {
          id: 'step-3',
          name: 'Create Order',
          execute: async () => {
            // Create order
            return { orderId: 'order-123', status: 'created' };
          },
          compensate: async () => {
            console.log('Cancel order');
          }
        },
        {
          id: 'step-4',
          name: 'Send Confirmation',
          execute: async () => {
            // Send confirmation
            console.log('Order confirmed');
          }
        }
      ]
    };

    sagaManager.createSaga(orderSaga);

    // Handle incoming events
    await eventBus.subscribe('order.*', async (event) => {
      console.log('Received order event:', event.type, event.data);
      // Handle business logic here
    });

    // Execute sample saga
    const orderData = {
      id: 'order-123',
      customer: 'John Doe',
      items: [
        { id: 'item-1', name: 'Product 1', price: 100 },
        { id: 'item-2', name: 'Product 2', price: 50 }
      ],
      total: 150
    };

    await sagaManager.executeSaga(orderSaga, orderData);

  } catch (error) {
    console.error('Application startup failed:', error);
    process.exit(1);
  }
}

main();

// 8. Environment Configuration for Different Environments
// config/development.ts
export const developmentConfig = {
  connection: {
    host: 'localhost',
    port: 5672,
    username: 'admin',
    password: 'dev_password',
    frameMax: 4096,
    heartbeat: 30,
    timeout: 10000
  },
  eventBus: {
    retryAttempts: 3,
    retryDelay: 1000,
    deadLetterExchange: 'exchange.dlx',
    deadLetterQueue: 'queue.dlq'
  },
  sagas: {
    timeout: 30000,
    maxConcurrent: 10
  }
};

// config/production.ts
export const productionConfig = {
  connection: {
    urls: [
      'amqps://rabbitmq-main:5671',
      'amqps://rabbitmq-backup:5671'
    ],
    credentials: {
      username: 'prod-user',
      password: process.env.RABBITMQ_PASSWORD,
      heartbeat: 20,
      timeout: 5000
    }
  },
  eventBus: {
    retryAttempts: 5,
    retryDelay: 5000,
    deadLetterExchange: 'exchange.dlx.production',
    deadLetterQueue: 'queue.dlq.production'
  },
  sagas: {
    timeout: 60000,
    maxConcurrent: 50
  },
  monitoring: {
    metricsEnabled: true,
    tracingEnabled: true,
    loggingLevel: 'warn'
  }
};

// config/test.ts
export const testConfig = {
  connection: {
    host: 'localhost',
    port: 5672,
    username: 'test',
    password: 'test_password',
    frameMax: 1024
  },
  eventBus: {
    retryAttempts: 1,
    retryDelay: 500
  }
};
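
// A hedged sketch of selecting one of the configs above by NODE_ENV;
// the import paths assume the config/*.ts layout shown in this section.
// config/index.ts
import { developmentConfig } from './development';
import { productionConfig } from './production';
import { testConfig } from './test';

const configs = {
  development: developmentConfig,
  production: productionConfig,
  test: testConfig,
} as const;

// Fall back to development when NODE_ENV is unset or unrecognized
export const config =
  configs[(process.env.NODE_ENV as keyof typeof configs)] ?? developmentConfig;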