Exemples Avancés RabbitMQ

Configuration RabbitMQ avancée incluant clustering, haute disponibilité, optimisation et patterns d'entreprise

Key Facts

Category
Message Queue
Items
2
Format Families
sample

Sample Overview

Configuration RabbitMQ avancée incluant clustering, haute disponibilité, optimisation et patterns d'entreprise. This sample set belongs to the Message Queue category and can be used to test related workflows inside Elysia Tools.

⚙️ Configuration Cluster RabbitMQ

🔴 complex ⭐⭐⭐⭐

Configuration de cluster RabbitMQ haute disponibilité avec équilibrage de charge et basculement

⏱️ 60 min 🏷️ rabbitmq, message-queue, enterprise, clustering
Prerequisites: Docker, PostgreSQL, Container orchestration, Messaging concepts
# RabbitMQ Advanced Cluster Configuration
# Enterprise-grade setup with clustering and high availability

# 1. Docker Compose for Cluster Setup
# docker-compose.yml
version: '3.8'

services:
  rabbitmq1:
    # First cluster node; publishes the default AMQP and management ports.
    image: rabbitmq:3.12-management
    hostname: rabbitmq1
    container_name: rabbitmq1
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      # Shared secret for inter-node authentication; must match on all nodes.
      RABBITMQ_ERLANG_COOKIE: rabbitmq-cluster-cookie
      # Short node names: the Docker hostnames used here have no domain part,
      # which long (fully qualified) node names require.
      RABBITMQ_USE_LONGNAME: "false"
      RABBITMQ_NODENAME: rabbit@rabbitmq1
      # Memory, disk, heartbeat and peer-discovery settings belong in the
      # mounted rabbitmq.conf. The official image exits at startup when the
      # deprecated RABBITMQ_VM_MEMORY_HIGH_WATERMARK variable is set, so no
      # tuning variables are passed through the environment.
    ports:
      - "5672:5672"     # AMQP
      - "15672:15672"   # Management UI / HTTP API
      - "25672:25672"   # Inter-node and CLI tool communication
      - "61613:61613"   # STOMP (only served if the rabbitmq_stomp plugin is enabled)
    volumes:
      - rabbitmq1_data:/var/lib/rabbitmq
      - ./rabbitmq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./rabbitmq1/definitions.json:/etc/rabbitmq/definitions.json
    networks:
      - rabbitmq-cluster
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s
    restart: unless-stopped

  rabbitmq2:
    # Second cluster node; host ports are offset by one to avoid clashing
    # with rabbitmq1.
    image: rabbitmq:3.12-management
    hostname: rabbitmq2
    container_name: rabbitmq2
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      # Shared secret for inter-node authentication; must match on all nodes.
      RABBITMQ_ERLANG_COOKIE: rabbitmq-cluster-cookie
      # Short node names: the Docker hostnames used here have no domain part,
      # which long (fully qualified) node names require.
      RABBITMQ_USE_LONGNAME: "false"
      RABBITMQ_NODENAME: rabbit@rabbitmq2
      # Memory and disk limits belong in the mounted rabbitmq.conf; the
      # official image exits at startup when the deprecated
      # RABBITMQ_VM_MEMORY_HIGH_WATERMARK variable is set.
    ports:
      - "5673:5672"     # AMQP
      - "15673:15672"   # Management UI / HTTP API
      - "25673:25672"   # Inter-node and CLI tool communication
      - "61614:61613"   # STOMP (only served if the rabbitmq_stomp plugin is enabled)
    volumes:
      - rabbitmq2_data:/var/lib/rabbitmq
      - ./rabbitmq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./rabbitmq2/definitions.json:/etc/rabbitmq/definitions.json
    networks:
      - rabbitmq-cluster
    depends_on:
      - rabbitmq1
    restart: unless-stopped

  rabbitmq3:
    # Third cluster node; host ports are offset by two to avoid clashing
    # with rabbitmq1 and rabbitmq2.
    image: rabbitmq:3.12-management
    hostname: rabbitmq3
    container_name: rabbitmq3
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      # Shared secret for inter-node authentication; must match on all nodes.
      RABBITMQ_ERLANG_COOKIE: rabbitmq-cluster-cookie
      # Short node names: the Docker hostnames used here have no domain part,
      # which long (fully qualified) node names require.
      RABBITMQ_USE_LONGNAME: "false"
      RABBITMQ_NODENAME: rabbit@rabbitmq3
      # Memory and disk limits belong in the mounted rabbitmq.conf; the
      # official image exits at startup when the deprecated
      # RABBITMQ_VM_MEMORY_HIGH_WATERMARK variable is set.
    ports:
      - "5674:5672"     # AMQP
      - "15674:15672"   # Management UI / HTTP API
      - "25674:25672"   # Inter-node and CLI tool communication
      - "61615:61613"   # STOMP (only served if the rabbitmq_stomp plugin is enabled)
    volumes:
      - rabbitmq3_data:/var/lib/rabbitmq
      - ./rabbitmq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./rabbitmq3/definitions.json:/etc/rabbitmq/definitions.json
    networks:
      - rabbitmq-cluster
    depends_on:
      - rabbitmq1
    restart: unless-stopped

  # HAProxy for Load Balancing
  haproxy:
    # TCP/HTTP load balancer in front of the three RabbitMQ nodes.
    image: haproxy:2.8
    container_name: haproxy
    ports:
      - "5670:5670"     # Main AMQP port (haproxy.cfg: bind *:5670)
      # haproxy.cfg binds the management frontend on 15670, so the container
      # side of the mapping must be 15670 as well (nothing listens on 15672
      # inside this container).
      - "15670:15670"   # Management UI
    volumes:
      - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg
    depends_on:
      - rabbitmq1
      - rabbitmq2
      - rabbitmq3
    networks:
      - rabbitmq-cluster
    restart: unless-stopped

  # Prometheus for Monitoring
  prometheus:
    # Metrics collector; reads its scrape configuration from ./prometheus.yml.
    container_name: prometheus
    image: prom/prometheus:latest
    restart: unless-stopped
    networks:
      - rabbitmq-cluster
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"   # Prometheus web UI / API

  # Grafana for Visualization
  grafana:
    # Dashboard front end; provisioning files are read from ./grafana.
    container_name: grafana
    image: grafana/grafana:latest
    restart: unless-stopped
    depends_on:
      - prometheus
    networks:
      - rabbitmq-cluster
    environment:
      # Admin password is taken from the Compose environment (.env).
      GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_PASSWORD}
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources:/etc/grafana/provisioning/datasources
    ports:
      - "3000:3000"   # Grafana web UI

  # RabbitMQ Exporter
  rabbitmq-exporter:
    # Translates the RabbitMQ management API into Prometheus metrics on :9419.
    image: kbudde/rabbitmq-exporter:latest
    container_name: rabbitmq-exporter
    ports:
      - "9419:9419"
    environment:
      # Inside the Compose network HAProxy serves the management API on 15670
      # (haproxy.cfg: bind *:15670), not on 15672.
      RABBIT_URL: http://haproxy:15670
      PUBLISH_PORT: "9419"
      RABBIT_USER: admin
      # NOTE(review): variable names follow the kbudde/rabbitmq_exporter README
      # (RABBIT_PASSWORD, SKIP_QUEUES); confirm against the image version in use.
      RABBIT_PASSWORD: ${RABBITMQ_PASSWORD}
      # Single quotes keep the backslash literal; "\." is an invalid escape in
      # a double-quoted YAML scalar and makes the file unparseable.
      SKIP_QUEUES: '^amq\.default'
    networks:
      - rabbitmq-cluster
    depends_on:
      - haproxy
    restart: unless-stopped

# Named volumes: one data directory per RabbitMQ node plus Grafana's state.
volumes:
  rabbitmq1_data:
  rabbitmq2_data:
  rabbitmq3_data:
  grafana_data:

# Single user-defined bridge network shared by every service above.
networks:
  rabbitmq-cluster:
    driver: bridge
    ipam:
      config:
        # Fixed address range for the network; change it if it collides with
        # an existing network on the Docker host.
        - subnet: 172.20.0.0/16

# 2. RabbitMQ Configuration for Cluster
# rabbitmq1/rabbitmq.conf
# Peer discovery: every node lists all cluster members, itself included.
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1
cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq2
cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq3
cluster_name = rabbitmq-cluster
# Node cleanup only takes effect with discovery backends that can list
# members; logging a warning instead of removing nodes is the safe choice.
cluster_formation.node_cleanup.interval = 30
cluster_formation.node_cleanup.only_log_warning = true

# Disk and Memory Configuration
# Use either an absolute or a relative disk limit, not both.
disk_free_limit.absolute = 2GB
# Fraction of available RAM (0.0-1.0), not a percentage.
vm_memory_high_watermark.relative = 0.8

# Performance Tuning
# Heartbeat timeout in seconds; consumer acknowledgement timeout in ms.
heartbeat = 60
consumer_timeout = 3600000
# Prefetch and message TTL are not rabbitmq.conf settings: set prefetch with
# basic.qos in the client and TTL per queue ("x-message-ttl") or via a policy.

# Security
# Keep the built-in guest user restricted to loopback connections.
loopback_users.guest = true
listeners.tcp.default = 5672
# TLS listeners need certificates; enable together with ssl_options.cacertfile,
# ssl_options.certfile and ssl_options.keyfile.
# listeners.ssl.default = 5671

# Management Plugin
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0
# HTTPS for the management UI also requires management.ssl.cacertfile,
# management.ssl.certfile and management.ssl.keyfile.
# management.ssl.port = 15671
# management.ssl.ip = 0.0.0.0

# To import the mounted definitions file at boot, uncomment:
# load_definitions = /etc/rabbitmq/definitions.json

# 3. HAProxy Configuration for Load Balancing
# haproxy.cfg
global
  maxconn 4096
  # Log to stdout so `docker logs` captures it (the container has no
  # /dev/log socket). No `daemon`: the container needs HAProxy to stay in the
  # foreground.
  log stdout format raw local0

defaults
  log global
  mode tcp
  timeout connect 5000
  timeout client 50000
  timeout server 50000

# AMQP traffic: plain TCP pass-through to the brokers.
frontend ft_rabbitmq_main
  bind *:5670
  mode tcp
  maxconn 4096
  option tcplog
  option logasap
  default_backend backend_rabbitmq_servers

# Management UI / HTTP API: the frontend mode must match the HTTP backend.
frontend ft_rabbitmq_management
  bind *:15670
  mode http
  option httplog
  option logasap
  default_backend backend_rabbitmq_management

backend backend_rabbitmq_servers
  mode tcp
  balance roundrobin
  option tcp-check
  # Inside the Docker network every node listens on its container port 5672;
  # 5673/5674 only exist as host-side port mappings.
  server rabbitmq1 rabbitmq1:5672 check inter 5s rise 2 fall 3
  server rabbitmq2 rabbitmq2:5672 check inter 5s rise 2 fall 3
  server rabbitmq3 rabbitmq3:5672 check inter 5s rise 2 fall 3

backend backend_rabbitmq_management
  mode http
  balance roundrobin
  # NOTE(review): the management API health endpoints require credentials, so
  # the check requests the UI root instead - confirm it answers 200 on your
  # RabbitMQ version.
  option httpchk GET /
  server rabbitmq1 rabbitmq1:15672 check inter 5s rise 2 fall 3
  server rabbitmq2 rabbitmq2:15672 check inter 5s rise 2 fall 3
  server rabbitmq3 rabbitmq3:15672 check inter 5s rise 2 fall 3

# 4. Prometheus Configuration for RabbitMQ
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # RabbitMQ metrics as exposed by the rabbitmq-exporter container.
  - job_name: 'rabbitmq'
    static_configs:
      - targets:
          - 'rabbitmq-exporter:9419'
    metrics_path: /metrics
    scrape_interval: 5s
    # Must not exceed scrape_interval, otherwise Prometheus rejects the
    # configuration at load time.
    scrape_timeout: 4s

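# 5. RabbitMQ Definitions and Policies
# rabbitmq1/definitions.json
# Note: RabbitMQ imports this file verbatim. Placeholders such as
# ${RABBITMQ_PASSWORD} are NOT expanded, so substitute real values before use.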
{
  "users": [
    {
      "name": "admin",
      "password": "${RABBITMQ_PASSWORD}",
      "tags": ["administrator"],
      "hashing_algorithm": "rabbit_password_hashing_sha256"
    },
    {
      "name": "service_user",
      "password": "service_password",
      "tags": ["service"],
      "hashing_algorithm": "rabbit_password_hashing_sha256"
    },
    {
      "name": "monitoring_user",
      "password": "monitoring_password",
      "tags": ["monitoring"],
      "hashing_algorithm": "rabbit_password_hashing_sha256"
    }
  ],
  "vhosts": [
    {
      "name": "/",
      "description": "Default virtual host"
    },
    {
      "name": "production",
      "description": "Production environment"
    },
    {
      "name": "staging",
      "description": "Staging environment"
    },
    {
      "name": "development",
      "description": "Development environment"
    }
  ],
  "permissions": [
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "production",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "staging",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "development",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "service_user",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "monitoring_user",
      "vhost": "/",
      "configure": "metrics.*",
      "write": "",
      "read": ".*"
    }
  ],
  "queues": [
    {
      "name": "user_events_queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-max-length": 10000,
        "x-overflow": "drop-head"
      }
    },
    {
      "name": "order_processing_queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-max-priority": 10,
        "x-queue-mode": "lazy"
      }
    },
    {
      "name": "notifications_queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-message-ttl": 86400000,
        "x-dead-letter-exchange": "notifications-dlx"
      }
    },
    {
      "name": "priority_queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-max-priority": 10
      }
    }
  ],
  "exchanges": [
    {
      "name": "user_events_exchange",
      "vhost": "/",
      "type": "topic",
      "durable": true,
      "auto_delete": false
    },
    {
      "name": "orders_exchange",
      "vhost": "/",
      "type": "direct",
      "durable": true,
      "auto_delete": false
    },
    {
      "name": "notifications_exchange",
      "vhost": "/",
      "type": "fanout",
      "durable": true,
      "auto_delete": false
    },
    {
      "name": "notifications-dlx",
      "vhost": "/",
      "type": "fanout",
      "durable": true,
      "auto_delete": false
    }
  ],
  "bindings": [
    {
      "source": "user_events_exchange",
      "vhost": "/",
      "destination": "user_events_queue",
      "destination_type": "queue",
      "routing_key": "user.*.events",
      "arguments": {}
    },
    {
      "source": "orders_exchange",
      "vhost": "/",
      "destination": "order_processing_queue",
      "destination_type": "queue",
      "routing_key": "order.created",
      "arguments": {}
    },
    {
      "source": "notifications_exchange",
      "vhost": "/",
      "destination": "notifications_queue",
      "destination_type": "queue",
      "routing_key": "*",
      "arguments": {}
    }
  ]
}

# 6. High Availability Scripts
# setup-cluster.sh
#!/bin/bash
#
# setup-cluster.sh - wait for the three RabbitMQ containers, print the cluster
# status, then configure a federation upstream and queue policies through the
# management HTTP API on localhost:15672.
#
# Requires: docker, curl, and RABBITMQ_PASSWORD exported in the environment.

set -euo pipefail

: "${RABBITMQ_PASSWORD:?RABBITMQ_PASSWORD must be set}"

API_URL="http://localhost:15672/api"
# The default virtual host "/" must be percent-encoded in API paths.
VHOST="%2F"
NODES="rabbitmq1 rabbitmq2 rabbitmq3"

# api_put PATH JSON - PUT a JSON document to the management API.
# -f makes curl exit non-zero on HTTP errors so that `set -e` stops the script.
api_put() {
  curl -sS -f -u "admin:${RABBITMQ_PASSWORD}" \
    -X PUT -H "content-type: application/json" \
    -d "$2" "${API_URL}/$1"
  echo
}

echo "Setting up RabbitMQ cluster..."

# Wait for RabbitMQ containers to be ready
echo "Waiting for RabbitMQ containers to start..."
for node in $NODES; do
  until docker exec "$node" rabbitmq-diagnostics -q ping; do
    echo "Waiting for ${node}..."
    sleep 5
  done
done

echo "All RabbitMQ nodes are ready!"

# Verify cluster status
echo "Verifying cluster status..."
docker exec rabbitmq1 rabbitmqctl cluster_status

# Federation parameters and policies are rejected unless the plugin is enabled.
echo "Enabling federation plugin..."
for node in $NODES; do
  docker exec "$node" rabbitmq-plugins enable rabbitmq_federation
done

# Create federation upstream for external connectivity.
# Path layout: /api/parameters/{component}/{vhost}/{name}.
# amqps uses port 5671 by default; reconnect-delay is given in seconds.
echo "Setting up federation upstream..."
api_put "parameters/federation-upstream/${VHOST}/external-rabbitmq" '{
    "value": {
      "uri": "amqps://external-rabbitmq:5671",
      "expires": 3600000,
      "reconnect-delay": 5
    }
  }'

# Create policies.
# Path layout: /api/policies/{vhost}/{name}; "priority" and "apply-to" sit next
# to "definition", not inside it. Backslashes are doubled because "\." is not
# a valid JSON escape.
# NOTE(review): RabbitMQ policies have no rate-limit key; "max-length" is used
# as the closest valid setting. Both policies share one pattern, so only the
# higher-priority one takes effect - adjust patterns to your queue names.
echo "Creating policies..."
api_put "policies/${VHOST}/hourly-rate-limit" '{
    "pattern": "^amq\\.default",
    "definition": {
      "max-length": 10000
    },
    "priority": 100,
    "apply-to": "queues"
  }'

api_put "policies/${VHOST}/queue-size-limit" '{
    "pattern": "^amq\\.default",
    "definition": {
      "max-length": 5000
    },
    "priority": 50,
    "apply-to": "queues"
  }'

# Enable federation: federation is switched on by a policy that selects
# upstreams, not by a call on the upstream itself.
# NOTE(review): the pattern below is a placeholder - point it at the exchanges
# and queues that should actually be federated.
api_put "policies/${VHOST}/federate-external" '{
    "pattern": "^federated\\.",
    "definition": {
      "federation-upstream-set": "all"
    },
    "priority": 10,
    "apply-to": "all"
  }'

echo "RabbitMQ cluster setup completed!"

# 7. Performance Monitoring Script
# monitor-cluster.sh
#!/bin/bash
#
# monitor-cluster.sh - print cluster, node, queue, connection, channel and
# exchange information from the RabbitMQ management API, then run HTTP health
# checks against every node and the HAProxy front end.
#
# Requires: curl, jq, and RABBITMQ_PASSWORD exported in the environment.

# No `-e`: a failing query should not prevent the remaining sections from
# being reported.
set -uo pipefail

: "${RABBITMQ_PASSWORD:?RABBITMQ_PASSWORD must be set}"

API_URL="http://localhost:15672/api"

# api_get PATH - fetch a management API resource as JSON.
api_get() {
  curl -sS -f -u "admin:${RABBITMQ_PASSWORD}" "${API_URL}/$1"
}

echo "Monitoring RabbitMQ cluster..."

# Get cluster status
echo "=== Cluster Status ==="
api_get overview | jq '{cluster_name, rabbitmq_version, object_totals, queue_totals}'

# Get node details
printf '\n=== Node Details ===\n'
for i in {1..3}; do
  printf '\n=== RabbitMQ%s ===\n' "$i"
  api_get "nodes/rabbit@rabbitmq$i" \
    | jq '{name, running, mem_used, mem_limit, fd_used, fd_total, proc_used}'
done

# Get queue statistics
printf '\n=== Queue Statistics ===\n'
api_get queues | jq '.[] | {name, messages, memory, consumers}'

# Get connection statistics
printf '\n=== Connection Statistics ===\n'
api_get connections | jq '.[] | {name, state, channels, peer_host, client_properties}'

# Get channel statistics
printf '\n=== Channel Statistics ===\n'
api_get channels \
  | jq '.[] | {name, state, connection: .connection_details.name, messages_unacknowledged, consumer_count}' \
  | head -20

# Get exchange statistics
printf '\n=== Exchange Statistics ===\n'
api_get exchanges | jq '.[] | {name, type, message_stats}' | head -10

# check_health PORT [HOST] [PATH]
# Requests http://HOST:PORT/PATH with the admin credentials and returns 0 only
# for HTTP 200. HOST defaults to localhost and PATH to the management API
# alarm health check.
check_health() {
  local port=$1
  local host=${2:-localhost}
  local path=${3:-/api/health/checks/alarms}
  local response

  echo "Performing health check on $host:$port..."

  # Declared separately so `local` does not hide curl's exit status; curl
  # prints 000 as the status code when the connection itself fails.
  response=$(curl -s -o /dev/null -w "%{http_code}" \
    -u "admin:${RABBITMQ_PASSWORD}" "http://$host:$port$path") || true

  if [ "$response" = "200" ]; then
    echo "✓ Health check passed"
    return 0
  else
    echo "✗ Health check failed with status $response"
    return 1
  fi
}

# Health checks.
# The checks speak HTTP, so they target the management ports published on the
# host (15672-15674 for the nodes, 15670 for HAProxy) rather than AMQP 5672.
failures=0
check_health 15672 localhost || failures=$((failures + 1))
check_health 15673 localhost || failures=$((failures + 1))
check_health 15674 localhost || failures=$((failures + 1))
check_health 15670 localhost || failures=$((failures + 1))

echo "Health checks completed!"

if [ "$failures" -gt 0 ]; then
  echo "$failures health check(s) failed"
  exit 1
fi

# 8. Environment Variables
# .env
# Values read by Docker Compose for ${...} substitution. Only
# RABBITMQ_PASSWORD and GRAFANA_PASSWORD are referenced by docker-compose.yml;
# the remaining entries are reference values for your own tooling.
RABBITMQ_PASSWORD=secure_password_here
GRAFANA_PASSWORD=admin_password_here

# Node-specific configurations
RABBITMQ1_NAME=rabbit1
RABBITMQ2_NAME=rabbit2
RABBITMQ3_NAME=rabbit3

# Network configuration
RABBITMQ_CLUSTER_COOKIE=very_secure_cluster_cookie
RABBITMQ_DEFAULT_USER=admin
RABBITMQ_DEFAULT_VHOST=/
# .env files are not shell scripts: $(hostname) would be kept as literal text.
# Node names are set per service in docker-compose.yml instead.
# RABBITMQ_NODENAME=rabbit@<hostname>

# Performance tuning
RABBITMQ_VM_MEMORY_HIGH_WATERMARK=1000
RABBITMQ_VM_MEMORY_LIMIT=2000
RABBITMQ_DISK_FREE_LIMIT=1GB
RABBITMQ_FLOW_ACTIVE_TIMEOUT=60000000
RABBITMQ_FLOW_PASSIVE_TIMEOUT=90000000

# Monitoring and Logging
RABBITMQ_LOG_LEVEL=INFO
RABBITMQ_PROMETHEUS_ENABLED=true
RABBITMQ_PROMETHEUS_CONFIG_FILE=/etc/rabbitmq/prometheus.json
RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_prometheus,rabbitmq_shovel

# Security
# All TLS-related entries share the RABBITMQ_SSL_ prefix.
RABBITMQ_SSL_ENABLED=false
RABBITMQ_SSL_VERIFY=verify_peer
RABBITMQ_SSL_FAIL_IF_NO_PEER_CERT=true
RABBITMQ_SSL_CERT_FILE=/etc/ssl/rabbitmq.crt
RABBITMQ_SSL_KEY_FILE=/etc/ssl/rabbitmq.key
RABBITMQ_SSL_CA_FILE=/etc/ssl/ca_bundle.crt

# 9. Application Connection Examples

# Node.js Connection with Connection Pool
const amqp = require('amqplib');

// amqplib ships no cluster helper module, so failover across nodes is
// implemented below by trying each node address in turn.

// Connection settings for high-traffic applications.
// `hostname` / `port` describe the primary node as published on the host.
const connectionPool = {
  hostname: 'localhost',
  port: 5672,
  username: 'admin',
  channelMax: 10,
  heartbeat: 30,
  connection_timeout: 10000, // milliseconds, passed as the socket timeout
  retry: 3
};

/**
 * Open a connection to the primary node described by `connectionPool`.
 *
 * The password is read from the RABBITMQ_PASSWORD environment variable.
 *
 * @returns {Promise<object>} the open amqplib connection
 * @throws the amqplib/socket error when the connection cannot be established
 */
async function createRabbitConnection() {
  try {
    const connection = await amqp.connect(
      {
        protocol: 'amqp',
        hostname: connectionPool.hostname,
        port: connectionPool.port,
        username: connectionPool.username,
        password: process.env.RABBITMQ_PASSWORD,
        heartbeat: connectionPool.heartbeat,
        channelMax: connectionPool.channelMax
      },
      // Socket options: bounds how long the TCP connect may take.
      { timeout: connectionPool.connection_timeout }
    );

    console.log('RabbitMQ connection established');
    return connection;
  } catch (error) {
    console.error('Failed to connect to RabbitMQ:', error);
    throw error;
  }
}

/**
 * Cluster connection with failover: try every node address in order and
 * return the first connection that opens.
 *
 * amqplib does not reconnect by itself; callers that need automatic recovery
 * should listen for the connection's 'close' event and call this again.
 *
 * @returns {Promise<object>} the open amqplib connection
 * @throws the error from the last node tried when no node is reachable
 */
async function createClusterConnection() {
  const clusterConfig = {
    addresses: [
      'amqp://localhost:5672',
      'amqp://localhost:5673',
      'amqp://localhost:5674'
    ],
    username: 'admin',
    password: process.env.RABBITMQ_PASSWORD,
    timeout: 10000,
    heartbeat: 10
  };

  let lastError;
  for (const address of clusterConfig.addresses) {
    const { protocol, hostname, port } = new URL(address);
    try {
      const connection = await amqp.connect(
        {
          protocol: protocol.replace(':', ''),
          hostname,
          port: Number(port),
          username: clusterConfig.username,
          password: clusterConfig.password,
          heartbeat: clusterConfig.heartbeat
        },
        { timeout: clusterConfig.timeout }
      );
      console.log('RabbitMQ cluster connection established');
      return connection;
    } catch (err) {
      // Remember the failure and fall through to the next node.
      lastError = err;
    }
  }

  console.error('Failed to connect to RabbitMQ cluster:', lastError);
  throw lastError;
}

// Python Connection with Pika
import os
import threading
import time

import pika
from pika.connection import ConnectionParameters
from pika.exceptions import AMQPConnectionError

# High availability connection
def create_connection(hosts=(('localhost', 5672), ('localhost', 5673), ('localhost', 5674))):
    """Open a blocking connection to the first reachable cluster node.

    Args:
        hosts: Sequence of ``(host, port)`` pairs tried in order. The default
            lists the three node ports published on localhost; the first entry
            is the single node the function originally connected to.

    Returns:
        An open ``pika.BlockingConnection``.

    Raises:
        KeyError: If the RABBITMQ_PASSWORD environment variable is not set.
        AMQPConnectionError: If none of the nodes accepts the connection.
    """
    import os  # local import keeps the function usable on its own

    credentials = pika.PlainCredentials(
        'admin',
        os.environ['RABBITMQ_PASSWORD']
    )

    # One parameter set per node: BlockingConnection accepts a sequence and
    # moves on to the next entry when a node is unreachable.
    parameters = [
        ConnectionParameters(
            host=host,
            port=port,
            virtual_host='/',
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=30,
            connection_attempts=3,
            retry_delay=5
        )
        for host, port in hosts
    ]

    try:
        connection = pika.BlockingConnection(parameters)
        print("RabbitMQ connection established")
        return connection
    except AMQPConnectionError as e:
        print(f"Failed to connect to RabbitMQ: {e}")
        raise

# Connection with retry and circuit breaker
class RabbitMQConnectionManager:
    """Caches one connection and guards reconnects with a circuit breaker.

    After ``max_retries`` consecutive failed attempts the breaker opens and
    ``get_connection`` returns ``None`` until the breaker is reset, which
    happens automatically after ``circuit_breaker_reset_timeout`` ms.

    Args:
        max_retries: Connection attempts per ``get_connection`` call (>= 1).
        retry_delay: Seconds to sleep between failed attempts.
        connection_factory: Zero-argument callable returning a connection
            object with an ``is_open`` attribute. Defaults to the module-level
            ``create_connection``.
    """

    def __init__(self, max_retries=3, retry_delay=5, connection_factory=None):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.connection = None
        self.circuit_breaker_open = False
        self.circuit_breaker_timeout = 30000  # 30 seconds (ms); not read by this class
        self.circuit_breaker_reset_timeout = 60000  # 1 minute (ms)
        self._connection_factory = connection_factory
        self._reset_timer = None

    def get_connection(self):
        """Return an open connection, or ``None`` while the breaker is open.

        Raises:
            Exception: The error of the last failed attempt when every retry
                fails.
        """
        if self.circuit_breaker_open:
            print("Circuit breaker is open, refusing connection attempts")
            return None

        if self.connection and self.connection.is_open:
            return self.connection

        return self._connect_with_retry()

    def _connect_with_retry(self):
        """Try to connect up to ``max_retries`` times, then open the breaker."""
        if self.max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        # Resolved lazily so the default factory is only needed when used.
        factory = self._connection_factory or create_connection
        last_exception = None

        for attempt in range(self.max_retries):
            try:
                connection = factory()
            except Exception as e:
                last_exception = e
                if attempt < self.max_retries - 1:
                    print(f"Connection attempt {attempt + 1} failed, retrying in {self.retry_delay}s...")
                    time.sleep(self.retry_delay)
            else:
                self.connection = connection
                self.circuit_breaker_open = False
                return connection

        # Every attempt failed: stop further attempts until the timer fires.
        self._open_circuit_breaker()
        raise last_exception

    def _open_circuit_breaker(self):
        """Open the breaker and schedule its automatic reset."""
        self.circuit_breaker_open = True
        # threading.Timer takes seconds; the timeout attribute is in ms.
        timer = threading.Timer(
            self.circuit_breaker_reset_timeout / 1000.0,
            self._reset_circuit_breaker,
        )
        # Daemon thread: a pending reset must not keep the process alive.
        timer.daemon = True
        timer.start()
        self._reset_timer = timer

    def _reset_circuit_breaker(self):
        """Close the breaker so that new connection attempts are allowed."""
        print("Resetting circuit breaker")
        self.circuit_breaker_open = False

# 10. Production Deployment Configuration
# Kubernetes deployment
# Three-node RabbitMQ cluster. Pod DNS names are derived from the StatefulSet
# name and `serviceName`, so a headless Service called "rabbitmq" must exist
# in the same namespace (not defined in this sample).
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq-cluster
  namespace: production
  labels:
    app: rabbitmq
spec:
  serviceName: rabbitmq
  replicas: 3
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      terminationGracePeriodSeconds: 30
      containers:
      - name: rabbitmq
        image: rabbitmq:3.12-management
        ports:
        - containerPort: 5672
          name: amqp
        - containerPort: 15672
          name: management
        # 25672 carries inter-node and CLI traffic, not STOMP.
        - containerPort: 25672
          name: clustering
        env:
        # Must be declared before the variables that reference $(POD_NAME).
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: RABBITMQ_DEFAULT_USER
          value: "admin"
        - name: RABBITMQ_DEFAULT_PASS
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password
        # NOTE(review): the cookie is a cluster-wide secret; consider moving
        # it into the Secret as well.
        - name: RABBITMQ_ERLANG_COOKIE
          value: "rabbitmq-cluster-cookie"
        - name: RABBITMQ_USE_LONGNAME
          value: "true"
        # Fully qualified per-pod name: <pod>.<serviceName>.<namespace>.svc...
        - name: RABBITMQ_NODENAME
          value: "rabbit@$(POD_NAME).rabbitmq.production.svc.cluster.local"
        # Peer list, memory and disk limits are set in rabbitmq.conf from the
        # ConfigMap; the official image exits at startup when the deprecated
        # RABBITMQ_VM_MEMORY_HIGH_WATERMARK variable is set.
        volumeMounts:
        - name: rabbitmq-data
          mountPath: /var/lib/rabbitmq
        # Mount single files with subPath so the rest of /etc/rabbitmq that
        # ships with the image (e.g. enabled_plugins) stays visible.
        - name: rabbitmq-config
          mountPath: /etc/rabbitmq/rabbitmq.conf
          subPath: rabbitmq.conf
        - name: rabbitmq-config
          mountPath: /etc/rabbitmq/definitions.json
          subPath: definitions.json
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          exec:
            command:
            - rabbitmq-diagnostics
            - -q
            - ping
          initialDelaySeconds: 60
          periodSeconds: 30
          timeoutSeconds: 10
        readinessProbe:
          exec:
            command:
            - rabbitmq-diagnostics
            - -q
            - ping
          initialDelaySeconds: 20
          periodSeconds: 10
          timeoutSeconds: 5
      volumes:
      - name: rabbitmq-config
        configMap:
          name: rabbitmq-config
  # One PersistentVolumeClaim per replica. A single ReadWriteOnce claim cannot
  # be shared by three pods scheduled on different nodes.
  volumeClaimTemplates:
  - metadata:
      name: rabbitmq-data
    spec:
      accessModes:
      - ReadWriteOnce
      storageClassName: fast-ssd
      resources:
        requests:
          storage: 10Gi

---
# Standalone 10Gi claim named rabbitmq-data in the production namespace.
# NOTE(review): ReadWriteOnce allows mounting from a single node only, so one
# claim like this cannot back several replicas scheduled on different nodes;
# per-replica claims (StatefulSet volumeClaimTemplates) are the usual
# alternative.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: rabbitmq-data
  namespace: production
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  # Storage class must exist in the target cluster.
  storageClassName: fast-ssd

---
# Holds the RabbitMQ admin password under the key "password".
apiVersion: v1
kind: Secret
metadata:
  name: rabbitmq-secret
  namespace: production
type: Opaque
data:
  # Placeholder: values under `data` must be base64-encoded. Replace before
  # applying and keep the real value out of version control.
  password: <base64-encoded-password>

---
# RabbitMQ configuration and definitions for the rabbitmq-cluster StatefulSet.
apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
  namespace: production
data:
  rabbitmq.conf: |
    # One entry per replica: <statefulset>-<ordinal>.<service>.<namespace>.svc.cluster.local
    cluster_formation.peer_discovery_backend = classic_config
    cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq-cluster-0.rabbitmq.production.svc.cluster.local
    cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq-cluster-1.rabbitmq.production.svc.cluster.local
    cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq-cluster-2.rabbitmq.production.svc.cluster.local
    disk_free_limit.absolute = 2GB
    vm_memory_high_watermark.absolute = 1000MB
    # Heartbeat in seconds; handshake and consumer timeouts in milliseconds.
    heartbeat = 60
    handshake_timeout = 10000
    consumer_timeout = 3600000
    # Prefetch and message TTL are not rabbitmq.conf settings: set prefetch
    # with basic.qos in the client and TTL per queue or via a policy.

  # RabbitMQ imports this document verbatim: ${RABBITMQ_PASSWORD} is NOT
  # expanded, so substitute the real value (or a password hash) before use.
  definitions.json: |
    {
      "users": [
        {
          "name": "admin",
          "password": "${RABBITMQ_PASSWORD}",
          "tags": ["administrator"]
        }
      ],
      "vhosts": [
        {
          "name": "/",
          "description": "Production virtual host"
        }
      ],
      "permissions": [
        {
          "user": "admin",
          "vhost": "/",
          "configure": ".*",
          "write": ".*",
          "read": ".*"
        }
      ],
      "queues": [
        {
          "name": "high-priority-queue",
          "vhost": "/",
          "durable": true,
          "auto_delete": false,
          "arguments": {
            "x-max-priority": 10
          }
        }
      ]
    }

💻 Patterns Microservices RabbitMQ typescript

🔴 complex ⭐⭐⭐⭐

Patterns d'architecture microservices RabbitMQ incluant Saga pattern, event sourcing et CQRS

⏱️ 55 min 🏷️ rabbitmq, microservices, patterns, architecture
Prerequisites: Node.js, TypeScript, Message Queue, Event-Driven Architecture
// RabbitMQ Microservices Architecture Patterns
// Event-driven architecture with RabbitMQ

// 1. Saga Pattern Implementation
// saga-manager.ts

import { Connection, Channel, ConsumeMessage } from 'amqplib';
import { EventEmitter } from 'events';

/** One unit of work in a saga, with an optional undo action and hooks. */
interface SagaStep {
  id: string;
  /** Performs the step; its resolved value is stored as the step result. */
  execute: () => Promise<any>;
  /** Undo action, run in reverse order when the saga fails. */
  compensate?: () => Promise<any>;
  /** Called with the error after compensations ran for a failed saga. */
  onFail?: (error: Error) => Promise<void>;
  /** Called with the step result after the step completed. */
  onSuccess?: (result: any) => Promise<void>;
}

/** An ordered list of steps executed as one saga. */
interface Saga {
  id: string;
  name: string;
  steps: SagaStep[];
  /** Per-step timeout in milliseconds. */
  timeout: number;
}

/** Runtime bookkeeping for one saga execution. */
interface SagaState {
  id: string;
  /** Zero-based index of the step currently being executed. */
  currentStep: number;
  // 'compensated' is the terminal state the manager sets once all
  // compensations have run after a failure.
  status: 'pending' | 'running' | 'completed' | 'failed' | 'compensating' | 'compensated';
  /** Initial data plus one `step_<index>` entry per completed step. */
  data: Record<string, any>;
  compensations: Array<() => Promise<void>>;
  error?: Error;
  startTime: Date;
  endTime?: Date;
}

/**
 * Runs sagas step by step and, when a step fails or times out, executes the
 * recorded compensations in reverse order.
 *
 * Progress is reported through the 'stepCompleted', 'sagaCompleted' and
 * 'sagaCompensated' events (see the on* registration methods).
 */
export class SagaManager {
  private connection: Connection;
  // Assigned in initialize(), hence the definite-assignment assertion.
  private channel!: Channel;
  private sagas: Map<string, Saga> = new Map();
  private sagaStates: Map<string, SagaState> = new Map();
  private eventEmitter = new EventEmitter();

  constructor(connection: Connection) {
    this.connection = connection;
  }

  /** Opens the channel used by the manager. Call once before use. */
  async initialize(): Promise<void> {
    this.channel = await this.connection.createChannel();
    console.log('Saga Manager initialized');
  }

  /** Registers a saga definition under its id. */
  createSaga(saga: Saga): void {
    this.sagas.set(saga.id, saga);
    console.log(`Created saga: ${saga.name} (${saga.id})`);
  }

  /**
   * Executes every step of `saga` in order.
   *
   * @param saga saga definition to run
   * @param data initial saga data; step results are added as `step_<index>`
   * @throws the error of the failing step, after compensations have run
   */
  async executeSaga(saga: Saga, data: any = {}): Promise<void> {
    const sagaState: SagaState = {
      id: saga.id,
      currentStep: 0,
      status: 'pending',
      data,
      compensations: [],
      startTime: new Date()
    };

    this.sagaStates.set(saga.id, sagaState);

    // Tracked outside the loop so the catch block can reach the failing step.
    let activeStep: SagaStep | undefined;

    try {
      for (let i = 0; i < saga.steps.length; i++) {
        const step = saga.steps[i];
        activeStep = step;
        sagaState.currentStep = i;
        sagaState.status = 'running';

        console.log(`Executing saga ${saga.name}, step ${i + 1}/${saga.steps.length}: ${step.id}`);

        // Recorded before execution, so a step that fails part-way through
        // is compensated as well.
        if (step.compensate) {
          sagaState.compensations.push(step.compensate);
        }

        // Execute step with timeout
        const result = await this.executeStepWithTimeout(step, saga.timeout);

        // Store step result
        sagaState.data[`step_${i}`] = result;

        // Notify progress
        this.eventEmitter.emit('stepCompleted', {
          sagaId: saga.id,
          sagaName: saga.name,
          step: i + 1,
          totalSteps: saga.steps.length,
          result
        });

        if (step.onSuccess) {
          await step.onSuccess(result);
        }
      }

      sagaState.status = 'completed';
      sagaState.endTime = new Date();

      console.log(`Saga ${saga.name} completed successfully in ${sagaState.endTime.getTime() - sagaState.startTime.getTime()}ms`);
      this.eventEmitter.emit('sagaCompleted', sagaState);
    } catch (error) {
      // Anything can be thrown; normalise for logging and the onFail hook.
      const failure = error instanceof Error ? error : new Error(String(error));

      sagaState.status = 'failed';
      sagaState.error = failure;
      sagaState.endTime = new Date();

      console.error(`Saga ${saga.name} failed: ${failure.message}`);

      // Execute compensations in reverse order
      console.log(`Executing compensations for saga ${saga.name}...`);
      sagaState.status = 'compensating';
      await this.executeCompensations(sagaState.compensations);

      if (activeStep && activeStep.onFail) {
        await activeStep.onFail(failure);
      }

      sagaState.status = 'compensated';
      this.eventEmitter.emit('sagaCompensated', sagaState);

      throw error;
    }
  }

  /** Runs one step, rejecting if it takes longer than `timeout` ms. */
  private async executeStepWithTimeout(step: SagaStep, timeout: number): Promise<any> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(`Saga step timeout after ${timeout}ms`)), timeout);
    });

    try {
      return await Promise.race([step.execute(), timeoutPromise]);
    } finally {
      // Always clear the timer so a finished step leaves no pending timeout.
      if (timer !== undefined) {
        clearTimeout(timer);
      }
    }
  }

  /** Runs compensations last-to-first; a failing one does not stop the rest. */
  private async executeCompensations(compensations: Array<() => Promise<void>>): Promise<void> {
    for (let i = compensations.length - 1; i >= 0; i--) {
      try {
        await compensations[i]();
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        console.error(`Compensation failed: ${message}`);
        // Continue with other compensations even if one fails
      }
    }
  }

  // Event listener methods

  /** Registers a listener invoked when a saga completes successfully. */
  onSagaCompleted(callback: (state: SagaState) => void): void {
    this.eventEmitter.on('sagaCompleted', callback);
  }

  /** Registers a listener invoked after a failed saga was compensated. */
  onSagaCompensated(callback: (state: SagaState) => void): void {
    this.eventEmitter.on('sagaCompensated', callback);
  }

  /** Registers a listener invoked after each completed step. */
  onStepCompleted(callback: (data: any) => void): void {
    this.eventEmitter.on('stepCompleted', callback);
  }
}

// 2. Event Sourcing Pattern
// event-store.ts

/**
 * A domain event as handled by the event store.
 * NOTE(review): the meaning of `sequence` versus `version` is not visible in
 * this part of the file - confirm before relying on either for ordering.
 */
interface Event {
  id: string;
  aggregateId: string;
  type: string;
  sequence: number;
  version: number;
  timestamp: Date;
  data: any;
  metadata: Record<string, any>;
}

/** Snapshot of an aggregate as kept by the event store. */
interface AggregateState {
  id: string;
  type: string;
  version: number;
  lastEvent: Date;
  data: Record<string, any>;
}

/**
 * Minimal event store backed by RabbitMQ queues plus an in-memory cache.
 *
 * - `storeEvent` publishes events to the durable `events` queue.
 * - A consumer started by `initialize` reads that queue and caches events per
 *   aggregate (only the most recent 1000 per aggregate are kept).
 * - `getAggregateState` folds an aggregate's events into an `AggregateState`.
 * - Snapshots are written to and read from the `events-snapshot` queue.
 *
 * NOTE(review): nothing in this class publishes to `events-replay`; it is only
 * read as a fallback when the cache holds no events for an aggregate. Confirm
 * which component is expected to feed that queue.
 */
export class EventStore {
  private static readonly EVENTS_QUEUE = 'events';
  private static readonly REPLAY_QUEUE = 'events-replay';
  private static readonly SNAPSHOT_QUEUE = 'events-snapshot';
  private static readonly MAX_CACHED_EVENTS = 1000;

  private connection: Connection;
  // Assigned in initialize(); no other method may be used before that.
  private channel!: Channel;
  private events: Map<string, Event[]> = new Map();
  // Not read or written by any method of this class at present.
  private aggregates: Map<string, AggregateState> = new Map();

  /** @param connection Open amqplib connection used to create the channel. */
  constructor(connection: Connection) {
    this.connection = connection;
  }

  /**
   * Creates the channel, asserts the three durable queues and starts the
   * consumer on the `events` queue. Must be awaited before any other call.
   */
  async initialize(): Promise<void> {
    this.channel = await this.connection.createChannel();
    await this.channel.assertQueue(EventStore.EVENTS_QUEUE, { durable: true });
    await this.channel.assertQueue(EventStore.REPLAY_QUEUE, { durable: true });
    await this.channel.assertQueue(EventStore.SNAPSHOT_QUEUE, { durable: true });

    // Awaited so that a failure to register the consumer surfaces here.
    await this.startEventConsumption();
    console.log('Event Store initialized');
  }

  /**
   * Publishes `event` to the `events` queue as a persistent JSON message.
   *
   * @param event Event to store.
   */
  async storeEvent(event: Event): Promise<void> {
    const eventMessage = {
      id: event.id,
      type: event.type,
      data: event.data,
      metadata: event.metadata,
      timestamp: event.timestamp,
      aggregateId: event.aggregateId,
      sequence: event.sequence,
      version: event.version
    };

    // persistent: the queue is durable, so messages should survive a restart too.
    this.channel.sendToQueue(
      EventStore.EVENTS_QUEUE,
      Buffer.from(JSON.stringify(eventMessage)),
      { persistent: true }
    );
    console.log(`Event stored: ${event.type} for aggregate ${event.aggregateId}`);
  }

  /**
   * Returns the events known for an aggregate.
   *
   * The in-memory cache is used when it has events; otherwise the
   * `events-replay` queue is scanned (without consuming it) for matching events.
   *
   * @param aggregateId Aggregate to look up.
   * @returns Events in cache/queue order; `[]` when none are found or the
   *          replay fails (the failure is logged).
   */
  async getEventsForAggregate(aggregateId: string): Promise<Event[]> {
    const cached = this.events.get(aggregateId);
    if (cached && cached.length > 0) {
      return cached;
    }

    try {
      const replayed = (await this.peekQueue(EventStore.REPLAY_QUEUE))
        .filter((payload) => payload && payload.aggregateId === aggregateId)
        .map((payload) => this.reviveEvent(payload));

      // The consumer may have cached events while the replay was in flight;
      // do not overwrite them.
      const current = this.events.get(aggregateId);
      if (current && current.length > 0) {
        return current;
      }

      this.events.set(aggregateId, replayed);
      return replayed;
    } catch (error) {
      console.error(`Failed to replay events for aggregate ${aggregateId}: ${error}`);
      return [];
    }
  }

  /**
   * Rebuilds the current state of an aggregate by applying its events in order.
   *
   * @param aggregateId Aggregate to rebuild.
   * @returns The folded state, or `null` when the aggregate has no events.
   */
  async getAggregateState(aggregateId: string): Promise<AggregateState | null> {
    const events = await this.getEventsForAggregate(aggregateId);
    if (events.length === 0) {
      return null;
    }

    const [first, ...rest] = events;
    const initial: AggregateState = {
      id: aggregateId,
      type: this.parseEventType(first.type).entityType,
      version: first.version,
      data: first.data,
      lastEvent: first.timestamp
    };

    return rest.reduce(
      (state, event) => this.applyEventToState(state, event),
      initial
    );
  }

  /**
   * Splits an event type into entity and action. Accepts both the dotted form
   * (`User.Created`) and the concatenated form emitted by CommandService
   * (`UserCreated`). Unknown shapes yield an empty action.
   */
  private parseEventType(type: string): { entityType: string; action: string } {
    if (type.includes('.')) {
      const [entityType, action] = type.split('.');
      return { entityType, action };
    }

    const match = /^(.*?)(Created|Updated|Deleted|Versioned)$/.exec(type);
    return match
      ? { entityType: match[1], action: match[2] }
      : { entityType: type, action: '' };
  }

  /**
   * Applies one event to `state` and returns the resulting state.
   * `Created` replaces the data, `Deleted` clears it, everything else merges
   * the payload into the existing data. Unrecognised actions leave the
   * version untouched.
   */
  private applyEventToState(state: AggregateState, event: Event): AggregateState {
    const { action } = this.parseEventType(event.type);
    const next: AggregateState = { ...state, lastEvent: event.timestamp };

    switch (action) {
      case 'Created':
        next.data = event.data;
        next.version = event.version;
        break;
      case 'Deleted':
        next.data = {};
        next.version = event.version;
        break;
      case 'Updated':
      case 'Versioned':
        next.data = { ...state.data, ...event.data };
        next.version = event.version;
        break;
      default:
        next.data = { ...state.data, ...event.data };
    }

    return next;
  }

  /**
   * Starts the consumer that moves events from the `events` queue into the
   * in-memory cache. Each message is acknowledged exactly once: `ack` after it
   * has been cached, `nack` without requeue when it cannot be processed.
   */
  private async startEventConsumption(): Promise<void> {
    await this.channel.consume(
      EventStore.EVENTS_QUEUE,
      (msg) => {
        // amqplib passes null when the broker cancels the consumer.
        if (msg === null) {
          return;
        }

        try {
          const event = this.reviveEvent(JSON.parse(msg.content.toString()));
          const cached = this.events.get(event.aggregateId) || [];
          cached.push(event);

          // Keep only the most recent events per aggregate.
          const limit = EventStore.MAX_CACHED_EVENTS;
          this.events.set(
            event.aggregateId,
            cached.length > limit ? cached.slice(-limit) : cached
          );

          this.channel.ack(msg);
        } catch (error) {
          console.error(`Failed to process stored event: ${error}`);
          // Requeueing a message that cannot be parsed would loop forever.
          this.channel.nack(msg, false, false);
        }
      },
      { noAck: false }
    );
  }

  /**
   * Publishes a snapshot of `state` to the `events-snapshot` queue.
   *
   * @param aggregateId Aggregate the snapshot belongs to.
   * @param state       State to snapshot.
   */
  async createSnapshot(aggregateId: string, state: AggregateState): Promise<void> {
    const snapshotMessage = {
      aggregateId,
      state,
      timestamp: new Date(),
      version: state.version
    };

    this.channel.sendToQueue(
      EventStore.SNAPSHOT_QUEUE,
      Buffer.from(JSON.stringify(snapshotMessage)),
      { persistent: true }
    );
    console.log(`Created snapshot for aggregate ${aggregateId}, version ${state.version}`);
  }

  /**
   * Returns the highest-version snapshot stored for `aggregateId`, leaving the
   * snapshot queue untouched.
   *
   * @returns The snapshot state, or `null` when there is none or the lookup
   *          fails (the failure is logged).
   */
  async restoreFromSnapshot(aggregateId: string): Promise<AggregateState | null> {
    try {
      const snapshots = (await this.peekQueue(EventStore.SNAPSHOT_QUEUE))
        .filter((snapshot) => snapshot && snapshot.aggregateId === aggregateId);
      if (snapshots.length === 0) {
        return null;
      }

      const latest = snapshots.reduce((best, candidate) =>
        candidate.version >= best.version ? candidate : best
      );

      // JSON turned the Date into a string; restore it.
      return { ...latest.state, lastEvent: new Date(latest.state.lastEvent) };
    } catch (error) {
      console.error(`Failed to restore snapshot for aggregate ${aggregateId}: ${error}`);
      return null;
    }
  }

  /**
   * Reads every ready message of `queue` and puts them all back.
   *
   * Messages are fetched unacknowledged and held until the queue reports
   * empty, then each one is nack'ed with requeue so other readers still see it.
   * Bodies that are not valid JSON are logged and skipped.
   *
   * @returns The parsed JSON bodies in the order they were fetched.
   */
  private async peekQueue(queue: string): Promise<any[]> {
    const held: any[] = [];
    const payloads: any[] = [];

    try {
      for (;;) {
        const msg = await this.channel.get(queue, { noAck: false });
        if (msg === false) {
          break;
        }

        held.push(msg);
        try {
          payloads.push(JSON.parse(msg.content.toString()));
        } catch (parseError) {
          console.error(`Skipping unparsable message on ${queue}: ${parseError}`);
        }
      }
    } finally {
      // Requeue only after the scan: requeueing earlier would hand the same
      // message back on the next get().
      for (const msg of held) {
        this.channel.nack(msg, false, true);
      }
    }

    return payloads;
  }

  /** Converts a JSON-decoded event back into an Event with a real Date. */
  private reviveEvent(payload: any): Event {
    return { ...payload, timestamp: new Date(payload.timestamp) };
  }
}

// 3. CQRS Pattern Implementation
// query-service.ts and command-service.ts

// Query Service (Read Model)
/**
 * Read side of the CQRS example: answers user queries from the state that
 * EventStore rebuilds out of stored events.
 */
export class QueryService {
  private eventStore: EventStore;

  /** @param eventStore Store used to rebuild aggregate state. */
  constructor(eventStore: EventStore) {
    this.eventStore = eventStore;
  }

  /**
   * @param userId Aggregate id of the user.
   * @returns The user's current data, or `undefined` when the aggregate has no events.
   */
  async getUserById(userId: string): Promise<any> {
    const state = await this.eventStore.getAggregateState(userId);
    return state?.data;
  }

  /**
   * @param role Role to match exactly against `user.role`.
   * @returns All known users whose `role` equals `role`.
   */
  async getUsersByRole(role: string): Promise<any[]> {
    return this.filterUsers((user) => user.role === role);
  }

  /** Returns the ids of all user aggregates (hard-coded placeholder list). */
  async getAllUserIds(): Promise<string[]> {
    // In a real implementation, you'd maintain a registry of all aggregate IDs
    return ['user-1', 'user-2', 'user-3'];
  }

  /**
   * Case-insensitive substring search over `name` and `email`.
   * Users lacking either field are matched on the fields they do have instead
   * of raising a TypeError.
   *
   * @param query Text to look for.
   * @returns All known users whose name or email contains `query`.
   */
  async searchUsers(query: string): Promise<any[]> {
    const needle = query.toLowerCase();
    const contains = (value: unknown): boolean =>
      typeof value === 'string' && value.toLowerCase().includes(needle);

    return this.filterUsers((user) => contains(user.name) || contains(user.email));
  }

  /** Loads every known user and keeps those for which `predicate` holds. */
  private async filterUsers(predicate: (user: any) => boolean): Promise<any[]> {
    const matches: any[] = [];

    for (const userId of await this.getAllUserIds()) {
      const user = await this.getUserById(userId);
      if (user && predicate(user)) {
        matches.push(user);
      }
    }

    return matches;
  }
}

// Command Service (Write Model)
/**
 * Write side of the CQRS example: turns user commands into events and hands
 * them to the EventStore.
 */
export class CommandService {
  private eventStore: EventStore;

  /** @param eventStore Store that receives the generated events. */
  constructor(eventStore: EventStore) {
    this.eventStore = eventStore;
  }

  /**
   * Stores a `UserCreated` event at version 1.
   *
   * @param userData Initial user data; `userData.id` is used as the aggregate
   *                 id when present, otherwise `user-<epoch ms>` is generated.
   * @returns The aggregate id of the new user.
   */
  async createUser(userData: Record<string, any>): Promise<string> {
    const userId = userData.id || `user-${Date.now()}`;
    await this.eventStore.storeEvent(
      this.buildEvent('UserCreated', userId, userData, 1)
    );
    return userId;
  }

  /**
   * Stores a `UserUpdated` event whose version follows the current one.
   *
   * @param userId  Aggregate id of the user.
   * @param updates Fields to merge into the user's data.
   */
  async updateUser(userId: string, updates: Record<string, any>): Promise<void> {
    const version = await this.nextVersion(userId);
    await this.eventStore.storeEvent(
      this.buildEvent('UserUpdated', userId, updates, version)
    );
  }

  /**
   * Stores a `UserDeleted` event whose version follows the current one, so it
   * does not reuse the version of the creation event.
   *
   * @param userId Aggregate id of the user.
   */
  async deleteUser(userId: string): Promise<void> {
    const version = await this.nextVersion(userId);
    await this.eventStore.storeEvent(
      this.buildEvent(
        'UserDeleted',
        userId,
        { deleted: true, deletedAt: new Date() },
        version
      )
    );
  }

  /** Returns the aggregate's current version + 1 (1 when it has no state yet). */
  private async nextVersion(userId: string): Promise<number> {
    const currentState = await this.eventStore.getAggregateState(userId);
    return (currentState?.version || 0) + 1;
  }

  /** Builds an Event with this service's metadata; sequence mirrors version. */
  private buildEvent(
    type: string,
    aggregateId: string,
    data: Record<string, any>,
    version: number
  ): Event {
    const now = new Date();
    return {
      id: `event-${now.getTime()}`,
      type,
      data,
      metadata: {
        source: 'command-service',
        timestamp: now
      },
      timestamp: now,
      aggregateId,
      sequence: version,
      version
    };
  }
}

// 4. Event Bus Implementation
// event-bus.ts

import { EventEmitter } from 'events';

/** Settings accepted by EventBus; its constructor supplies defaults for all of them. */
interface EventBusConfig {
  // NOTE(review): not read by the EventBus code visible in this file — confirm usage.
  retryAttempts: number;
  // NOTE(review): not read by the EventBus code visible in this file; unit presumably ms.
  retryDelay: number;
  // Exchange that receives events whose publication failed.
  deadLetterExchange: string;
  // NOTE(review): not read by the EventBus code visible in this file — confirm usage.
  deadLetterQueue: string;
}

/**
 * Publishes and consumes domain events over an amqplib channel, forwarding
 * events that cannot be published to the configured dead-letter exchange.
 */
export class EventBus extends EventEmitter {
  // amqplib expects the per-message TTL as a string of milliseconds (24 hours here).
  private static readonly DEFAULT_EXPIRATION_MS = '86400000';

  private channel: Channel;
  private config: EventBusConfig;

  /**
   * @param channel Channel used for all publish/consume operations.
   * @param config  Optional overrides for the default settings.
   */
  constructor(channel: Channel, config?: Partial<EventBusConfig>) {
    super();
    this.channel = channel;
    this.config = {
      retryAttempts: 3,
      retryDelay: 1000,
      deadLetterExchange: 'exchange.dlx',
      deadLetterQueue: 'queue.dlq',
      ...config
    };
  }

  /**
   * Publishes `event` as JSON.
   *
   * @param event      Event to publish.
   * @param routingKey Routing key; defaults to `event.type`.
   * @param options    Optional publish settings: `expiration` (string of ms),
   *                   `persistent`, `timestamp`, `headers`, and `exchange`.
   *                   The exchange defaults to `event.type`.
   *                   NOTE(review): confirm that an exchange named after each
   *                   event type is really what the topology provides.
   * @throws Rethrows the publish error after attempting dead-lettering.
   */
  async publishEvent(event: Event, routingKey?: string, options?: any): Promise<void> {
    const eventName = event.type;
    const effectiveRoutingKey = routingKey || eventName;
    const exchange = options?.exchange || event.type;

    const publishOptions: Record<string, any> = {
      expiration: options?.expiration || EventBus.DEFAULT_EXPIRATION_MS,
      persistent: options?.persistent || false,
      contentType: 'application/json',
      headers: options?.headers || {}
    };
    // Only forwarded when the caller supplied one.
    if (options?.timestamp) {
      publishOptions.timestamp = options.timestamp;
    }

    try {
      const accepted = this.channel.publish(
        exchange,
        effectiveRoutingKey,
        Buffer.from(JSON.stringify(event)),
        publishOptions
      );

      // publish() returns false when the write buffer is full; wait for it to drain.
      if (!accepted) {
        await this.waitForDrain();
      }

      console.log(`Event ${eventName} published to ${effectiveRoutingKey}`);
    } catch (error) {
      console.error(`Failed to publish event ${eventName}: ${error}`);

      // Send to dead letter queue if configured
      if (this.config.deadLetterExchange) {
        const cause = error instanceof Error ? error : new Error(String(error));
        try {
          await this.publishToDeadLetterQueue(event, cause);
        } catch (dlxError) {
          // Never let a dead-letter failure mask the original publish error.
          console.error(`Critical error in event bus: ${dlxError}`);
        }
      }

      throw error;
    }
  }

  /**
   * Publishes a wrapper around a failed event to the dead-letter exchange with
   * routing key `dead-letter`.
   */
  private async publishToDeadLetterQueue(event: Event, originalError: Error): Promise<void> {
    const dlxEvent = {
      id: `dlx-event-${Date.now()}`,
      type: event.type,
      routing_key: 'dead-letter',
      data: {
        originalEvent: event,
        error: {
          message: originalError.message,
          stack: originalError.stack
        }
      },
      timestamp: new Date()
    };

    const accepted = this.channel.publish(
      this.config.deadLetterExchange,
      'dead-letter',
      Buffer.from(JSON.stringify(dlxEvent)),
      { persistent: true, contentType: 'application/json' }
    );

    if (!accepted) {
      await this.waitForDrain();
    }
  }

  /** Resolves on the channel's next 'drain' event. */
  private waitForDrain(): Promise<void> {
    return new Promise<void>((resolve) => {
      this.channel.once('drain', () => resolve());
    });
  }

  /**
   * Asserts a durable queue and consumes it, passing each decoded event to
   * `handler`. A message is acked after the handler resolves and nack'ed
   * without requeue when decoding or the handler fails, so the broker can
   * route it to the dead-letter exchange.
   *
   * NOTE(review): `pattern` is only used as the default queue name and in
   * logs; the queue is not bound to any exchange here because this class has
   * no exchange setting to bind against — confirm where bindings are created.
   *
   * @param pattern   Subscription pattern, e.g. `order.*`.
   * @param handler   Async callback invoked per event.
   * @param queueName Queue to consume; defaults to `pattern`.
   */
  async subscribe(pattern: string, handler: (event: Event) => Promise<void>, queueName?: string): Promise<void> {
    const queueArguments: Record<string, any> = {};
    if (this.config.deadLetterExchange) {
      queueArguments['x-dead-letter-exchange'] = this.config.deadLetterExchange;
    }

    const queueOptions = {
      durable: true,
      exclusive: false,
      autoDelete: false,
      arguments: queueArguments
    };

    const queueNameSuffix = queueName || pattern;
    const { queue } = await this.channel.assertQueue(queueNameSuffix, queueOptions);

    await this.channel.consume(queue, async (msg) => {
      // amqplib passes null when the broker cancels the consumer.
      if (msg === null) {
        return;
      }

      try {
        const event: Event = JSON.parse(msg.content.toString());
        await handler(event);
        this.channel.ack(msg);
      } catch (error) {
        console.error(`Handler failed for pattern ${pattern}: ${error}`);
        this.channel.nack(msg, false, false);
      }
    });

    console.log(`Subscribed to pattern ${pattern} on queue ${queueNameSuffix}`);
  }

  /**
   * Builds an Event of `eventType` around `eventData`.
   * The aggregate id is taken from `eventData.aggregateId`, then
   * `eventData.id`, and is empty when neither is present; sequence and
   * version start at 1.
   *
   * @param eventData Payload of the event.
   * @param eventType Value for `Event.type`.
   */
  async createEvent(eventData: Record<string, any>, eventType: string): Promise<Event> {
    const now = new Date();
    const event: Event = {
      id: `event-${now.getTime()}`,
      type: eventType,
      data: eventData,
      metadata: {
        source: 'application',
        timestamp: now
      },
      timestamp: now,
      aggregateId: String(eventData.aggregateId ?? eventData.id ?? ''),
      sequence: 1,
      version: 1
    };

    return event;
  }
}

// 5. Circuit Breaker for External Dependencies
// circuit-breaker.ts
/** Tuning knobs for CircuitBreaker; its constructor supplies defaults for all of them. */
interface CircuitBreakerOptions {
  // Milliseconds an operation may run before it is rejected with 'Operation timeout'.
  timeout: number;
  // Failure count at which the breaker opens.
  failureThreshold: number;
  // Success count at which the breaker is set back to CLOSED.
  successThreshold: number;
  // Milliseconds after the last failure before the breaker may recover; values <= 0 disable that.
  resetTimeout: number;
  // NOTE(review): not read by the CircuitBreaker code visible in this file.
  retryDelay: number;
}

/** States of CircuitBreaker; each member's value is its own name as a string. */
enum CircuitBreakerState {
  CLOSED = 'CLOSED',
  // While in this state, execute() throws 'Circuit breaker is open'.
  OPEN = 'OPEN',
  HALF_OPEN = 'HALF_OPEN'
}

/**
 * Circuit breaker around an async operation.
 *
 * - CLOSED: calls pass through. `failureThreshold` consecutive failures open
 *   the breaker; failures older than `resetTimeout` are forgotten.
 * - OPEN: calls are rejected immediately until `resetTimeout` has elapsed
 *   since the last failure, then the breaker becomes HALF_OPEN.
 * - HALF_OPEN: calls pass through as trials. `successThreshold` consecutive
 *   successes close the breaker; a single failure re-opens it.
 *
 * With `resetTimeout <= 0` an open breaker never recovers on its own.
 */
export class CircuitBreaker {
  private state: CircuitBreakerState;
  private failureCount: number = 0;
  private successCount: number = 0;
  private lastFailureTime: Date | null = null;
  // Earliest time an OPEN breaker may be probed again; null when not applicable.
  private nextAttempt: Date | null = null;
  private options: CircuitBreakerOptions;

  /** @param options Overrides for the default thresholds and timeouts. */
  constructor(options: Partial<CircuitBreakerOptions> = {}) {
    this.options = {
      timeout: 30000,
      failureThreshold: 5,
      successThreshold: 2,
      resetTimeout: 60000,
      retryDelay: 1000,
      ...options
    };
    this.state = CircuitBreakerState.CLOSED;
  }

  /**
   * Runs `operation` under the breaker.
   *
   * @param operation Async operation to protect.
   * @returns The operation's result.
   * @throws Error('Circuit breaker is open') when calls are currently blocked,
   *         Error('Operation timeout') when the operation exceeds
   *         `options.timeout`, or whatever the operation rejects with.
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    // Time-based transitions first; otherwise an open breaker could never recover.
    this.refreshState();

    if (this.state === CircuitBreakerState.OPEN) {
      throw new Error('Circuit breaker is open');
    }

    try {
      const result = await this.executeWithTimeout(operation);
      this.recordSuccess();
      return result;
    } catch (error) {
      this.recordFailure();
      throw error;
    }
  }

  /** Applies the transitions that depend only on elapsed time. */
  private refreshState(): void {
    if (this.options.resetTimeout <= 0) {
      return;
    }

    const now = Date.now();

    if (this.state === CircuitBreakerState.OPEN) {
      if (this.nextAttempt !== null && now >= this.nextAttempt.getTime()) {
        this.state = CircuitBreakerState.HALF_OPEN;
        this.successCount = 0;
      }
      return;
    }

    // While closed, failures older than the reset window no longer count.
    if (this.state === CircuitBreakerState.CLOSED &&
        this.lastFailureTime !== null &&
        now - this.lastFailureTime.getTime() > this.options.resetTimeout) {
      this.reset();
    }
  }

  /** Races `operation` against the configured timeout and always clears the timer. */
  private async executeWithTimeout<T>(operation: () => Promise<T>): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;

    try {
      return await Promise.race([
        operation(),
        new Promise<never>((_, reject) => {
          timer = setTimeout(
            () => reject(new Error('Operation timeout')),
            this.options.timeout
          );
        })
      ]);
    } finally {
      if (timer !== undefined) {
        clearTimeout(timer);
      }
    }
  }

  /** Records a success; closes a half-open breaker after enough of them. */
  private recordSuccess(): void {
    this.successCount++;
    this.failureCount = 0;

    if (this.state === CircuitBreakerState.HALF_OPEN &&
        this.successCount >= this.options.successThreshold) {
      this.reset();
    }
  }

  /** Records a failure; opens the breaker at the threshold or on a failed trial. */
  private recordFailure(): void {
    this.failureCount++;
    this.successCount = 0;
    this.lastFailureTime = new Date();

    const failedTrial = this.state === CircuitBreakerState.HALF_OPEN;
    if (failedTrial || this.failureCount >= this.options.failureThreshold) {
      this.state = CircuitBreakerState.OPEN;
      this.nextAttempt = this.options.resetTimeout > 0
        ? new Date(this.lastFailureTime.getTime() + this.options.resetTimeout)
        : null;
    }
  }

  /** Returns the breaker to a pristine CLOSED state. */
  private reset(): void {
    this.state = CircuitBreakerState.CLOSED;
    this.failureCount = 0;
    this.successCount = 0;
    this.lastFailureTime = null;
    this.nextAttempt = null;
  }

  /** @returns The state as of the last call; time-based transitions are applied in execute(). */
  getState(): CircuitBreakerState {
    return this.state;
  }

  /** @returns The same state as getState(), typed as a plain string. */
  getStatus(): string {
    return this.state;
  }
}

// 6. Dead Letter Queue Handler
// dead-letter-queue-handler.ts

/** Record built by DeadLetterQueueHandler for an event that failed processing. */
interface DeadLetterEvent {
  // The event that failed.
  event: Event;
  // The error reported for the failure.
  error: Error;
  // Retry count as supplied by the caller of handleDeadLetter.
  retryCount: number;
  // Time at which the handler built this record.
  lastAttempt: Date;
  // Exchange the event is re-published to on retry.
  originalExchange: string;
  // Routing key used on retry; the handler sets it to the event type.
  originalRoutingKey: string;
}

/**
 * Retries failed events with exponential backoff and logs them once the retry
 * budget is exhausted. Also declares the dead-letter exchange and queue.
 */
export class DeadLetterQueueHandler {
  private channel: Channel;
  private retryAttempts: Map<string, number> = new Map();
  private maxRetries: number;
  // Settles once the dead-letter topology has been declared (or failed to be).
  private ready: Promise<void>;

  /**
   * @param channel    Channel used to declare the topology and re-publish events.
   * @param maxRetries Retries allowed per `<type>:<aggregateId>` key (default 3).
   */
  constructor(channel: Channel, maxRetries: number = 3) {
    this.channel = channel;
    this.maxRetries = maxRetries;
    // A constructor cannot await; keep the promise and handle its rejection so
    // a broker error does not become an unhandled rejection.
    this.ready = this.initializeDeadLetterQueue().catch((error) => {
      console.error(`Failed to initialize dead letter queue: ${error}`);
    });
  }

  /**
   * Declares `exchange.dlx`, `queue.dlq` and the binding between them.
   *
   * NOTE(review): the exchange type is not stated anywhere in this file.
   * `fanout` is used so that messages reach the queue whatever routing key
   * they were dead-lettered with — confirm against the deployed topology, as
   * asserting an existing exchange with a different type fails.
   */
  private async initializeDeadLetterQueue(): Promise<void> {
    await this.channel.assertExchange('exchange.dlx', 'fanout', { durable: true });
    await this.channel.assertQueue('queue.dlq', { durable: true });
    await this.channel.bindQueue('queue.dlq', 'exchange.dlx', '');

    console.log('Dead letter queue initialized');
  }

  /**
   * Handles a failed event: re-publishes it after a backoff delay while
   * retries remain, otherwise logs it as permanently failed.
   *
   * @param event      The event that failed.
   * @param error      The failure that occurred.
   * @param retryCount Caller-side retry count, recorded on the dead-letter
   *                   record only; the retry decision uses this handler's own
   *                   counter.
   */
  async handleDeadLetter(event: Event, error: Error, retryCount: number): Promise<void> {
    await this.ready;

    const dlxEvent: DeadLetterEvent = {
      event,
      error,
      retryCount,
      lastAttempt: new Date(),
      originalExchange: 'exchange.main',
      originalRoutingKey: event.type
    };

    console.error(`Dead letter event for ${event.type}: ${error.message}`);

    // Check if we should retry
    const eventKey = `${event.type}:${event.aggregateId}`;
    const currentRetries = this.retryAttempts.get(eventKey) || 0;

    if (currentRetries < this.maxRetries) {
      console.log(`Retrying dead letter event for ${event.type} (attempt ${currentRetries + 1}/${this.maxRetries})`);

      this.retryAttempts.set(eventKey, currentRetries + 1);

      // Exponential backoff: 1s, 2s, 4s, ...
      const retryDelay = Math.pow(2, currentRetries) * 1000;
      await new Promise(resolve => setTimeout(resolve, retryDelay));

      // Re-queue the event
      await this.requeueEvent(event);
    } else {
      // Max retries exceeded, log and persist
      console.error(`Max retries exceeded for event ${event.type}, logging to persistent storage`);
      await this.persistFailedEvent(dlxEvent);
      // Drop the counter so the map does not grow for every failed event.
      this.retryAttempts.delete(eventKey);
    }
  }

  /** Re-publishes `event` to `exchange.main` with its type as routing key. */
  private async requeueEvent(event: Event): Promise<void> {
    const originalExchange = 'exchange.main';
    const routingKey = event.type;

    this.channel.publish(
      originalExchange,
      routingKey,
      Buffer.from(JSON.stringify(event))
    );
  }

  /** Logs a permanently failed event (placeholder for real persistence). */
  private async persistFailedEvent(dlxEvent: DeadLetterEvent): Promise<void> {
    // In production, persist to database or log system
    console.error(`\n[PERSISTENT DEAD LETTER EVENT]\nEvent: ${dlxEvent.event.type}\nError: ${dlxEvent.error.message}\nRetry Count: ${dlxEvent.retryCount}\n`);
  }
}

// 7. Usage Examples
// main.ts - Application entry point

import { Connection } from 'amqplib/connection';

/**
 * Sample entry point: connects to RabbitMQ, wires up the saga manager, event
 * store, event bus and CQRS services, then runs a sample order saga.
 * Any failure is logged and terminates the process with exit code 1.
 *
 * NOTE(review): nothing in the visible source calls this function.
 */
async function main() {
  try {
    // Loaded here so this entry point does not rely on a module-level `connect` import.
    const { connect } = await import('amqplib');

    // Establish RabbitMQ connection (amqplib names the host option `hostname`)
    const connection = await connect({
      hostname: 'localhost',
      port: 5672,
      username: 'admin',
      password: process.env.RABBITMQ_PASSWORD,
    });

    console.log('Connected to RabbitMQ');

    // Initialize services
    const sagaManager = new SagaManager(connection);
    await sagaManager.initialize();

    const eventStore = new EventStore(connection);
    await eventStore.initialize();

    // The event bus gets its own channel; EventStore's channel is private.
    const eventBusChannel = await connection.createChannel();
    const eventBus = new EventBus(eventBusChannel);

    const queryService = new QueryService(eventStore);
    const commandService = new CommandService(eventStore);

    // Create sample saga
    const orderSaga = {
      id: 'order-saga-1',
      name: 'Order Processing Saga',
      timeout: 30000,
      steps: [
        {
          id: 'step-1',
          name: 'Validate Order Data',
          execute: async () => {
            // Validate order data
            return { valid: true, orderId: 'order-123' };
          },
          compensate: async () => {
            console.log('Compensating order creation');
          }
        },
        {
          id: 'step-2',
          name: 'Process Payment',
          execute: async () => {
            // Process payment
            return { success: true, paymentId: 'pay-456' };
          },
          compensate: async () => {
            console.log('Refunding payment');
          }
        },
        {
          id: 'step-3',
          name: 'Create Order',
          execute: async () => {
            // Create order
            return { orderId: 'order-123', status: 'created' };
          },
          compensate: async () => {
            console.log('Cancel order');
          }
        },
        {
          // Final step: nothing to undo, so no compensation is defined.
          id: 'step-4',
          name: 'Send Confirmation',
          execute: async () => {
            // Send confirmation
            console.log('Order confirmed');
          }
        }
      ]
    };

    sagaManager.createSaga(orderSaga);

    // Handle incoming events; awaited so subscription failures reach the catch below.
    await eventBus.subscribe('order.*', async (event) => {
      console.log('Received order event:', event.type, event.data);
      // Handle business logic here
    });

    // Execute sample saga
    const orderData = {
      id: 'order-123',
      customer: 'John Doe',
      items: [
        { id: 'item-1', name: 'Product 1', price: 100 },
        { id: 'message-id', name: 'Message ID: message-id-123', price: 50 }
      ],
      total: 150
    };

    await sagaManager.executeSaga(orderSaga, orderData);

  } catch (error) {
    console.error('Application startup failed:', error);
    process.exit(1);
  }
}

// 8. Environment Configuration for Different Environments
// config/development.ts
/** Settings for local development against a broker on localhost. */
export const developmentConfig = {
  connection: {
    host: 'localhost',
    port: 5672,
    username: 'admin',
    // NOTE(review): literal credential in source; the production config reads it from the environment.
    password: 'dev_password',
    frameMax: 4096,
    // NOTE(review): units are not stated here — presumably seconds for heartbeat and ms for timeout.
    heartbeat: 30,
    timeout: 10000
  },
  // Same keys and values as the defaults in the EventBus constructor.
  eventBus: {
    retryAttempts: 3,
    retryDelay: 1000,
    deadLetterExchange: 'exchange.dlx',
    deadLetterQueue: 'queue.dlq'
  },
  sagas: {
    timeout: 30000,
    maxConcurrent: 10
  }
};

// config/production.ts
/** Settings for production: two broker URLs, password taken from the environment. */
export const productionConfig = {
  connection: {
    // NOTE(review): these use the `amqps` scheme with ports 5672/5673, while
    // the compose file maps 5672 as the plain AMQP port — confirm the TLS listener ports.
    urls: [
      'amqps://rabbitmq-main:5672',
      'amqps://rabbitmq-backup:5673'
    ],
    credentials: {
      username: 'prod-user',
      password: process.env.RABBITMQ_PASSWORD,
      // NOTE(review): heartbeat and timeout sit under `credentials` here but
      // directly under `connection` in the development config — confirm the consumer's expected shape.
      heartbeat: 20,
      timeout: 5000
    }
  },
  eventBus: {
    retryAttempts: 5,
    retryDelay: 5000,
    deadLetterExchange: 'exchange.dlx.production',
    deadLetterQueue: 'queue.dlq.production'
  },
  sagas: {
    timeout: 60000,
    maxConcurrent: 50
  },
  monitoring: {
    metricsEnabled: true,
    tracingEnabled: true,
    loggingLevel: 'warn'
  }
};

// config/test.ts
/**
 * Settings for automated tests: a single retry with a short delay.
 * The dead-letter settings are omitted and fall back to the EventBus defaults.
 */
export const testConfig = {
  connection: {
    host: 'localhost',
    port: 5672,
    username: 'test',
    password: 'test_password',
    // 4096 bytes is the smallest frame size allowed; a lower value is
    // refused during connection negotiation.
    frameMax: 4096
  },
  eventBus: {
    retryAttempts: 1,
    retryDelay: 500
  }
};