RabbitMQ Advanced Samples
Advanced RabbitMQ configuration examples including clustering, high availability, performance tuning, and enterprise patterns
Key Facts
- Category
- Message Queue
- Items
- 2
- Format Families
- sample
Sample Overview
Advanced RabbitMQ configuration examples including clustering, high availability, performance tuning, and enterprise patterns This sample set belongs to Message Queue and can be used to test related workflows inside Elysia Tools.
⚙️ RabbitMQ Cluster Configuration
🔴 complex
⭐⭐⭐⭐
Complete RabbitMQ cluster setup with high availability, failover, and monitoring for enterprise environments
⏱️ 60 min
🏷️ rabbitmq, message-queue, enterprise, clustering
Prerequisites:
Docker, PostgreSQL, Container orchestration, Messaging concepts
# RabbitMQ Advanced Cluster Configuration
# Enterprise-grade setup with clustering and high availability
# 1. Docker Compose for Cluster Setup
# docker-compose.yml
version: '3.8'
services:
rabbitmq1:
image: rabbitmq:3.12-management
hostname: rabbitmq1
container_name: rabbitmq1
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
RABBITMQ_ERLANG_COOKIE: rabbitmq-cluster-cookie
RABBITMQ_USE_LONGNAME: "true"
RABBITMQ_NODENAME: rabbit@rabbitmq1
RABBITMQ_CLUSTER_NODES: rabbit@rabbitmq2,rabbit@rabbitmq3
RABBITMQ_RAM_NODE: "false"
RABBITMQ_DISK_NODE: "true"
RABBITMQ_DISK_FREE_LIMIT: "1.0"
RABBITMQ_VM_MEMORY_HIGH_WATERMARK: "1000"
RABBITMQ_VM_MEMORY_LIMIT: "2000"
RABBITMQ_FLOW_ACTIVE_TIMEOUT: "60000000"
RABBITMQ_FLOW_PASSIVE_TIMEOUT: "90000000"
RABBITMQ_HEARTBEAT_TIMEOUT: "60"
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: "-rabbit disk_free_limit.abs.disk_limit 1024MB -rabbit vm_memory_high_watermark 1000MB"
ports:
- "5672:5672" # AMQP port
- "15672:15672" # Management UI
- "25672:25672" # STOMP
- "61613:61613" # TLS AMQP
volumes:
- rabbitmq1_data:/var/lib/rabbitmq
- ./rabbitmq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./rabbitmq1/definitions.json:/etc/rabbitmq/definitions.json
networks:
- rabbitmq-cluster
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
restart: unless-stopped
rabbitmq2:
image: rabbitmq:3.12-management
hostname: rabbitmq2
container_name: rabbitmq2
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
RABBITMQ_ERLANG_COOKIE: rabbitmq-cluster-cookie
RABBITMQ_USE_LONGNAME: "true"
RABBITMQ_NODENAME: rabbit@rabbitmq2
RABBITMQ_CLUSTER_NODES: rabbit@rabbitmq1,rabbit@rabbitmq3
RABBITMQ_RAM_NODE: "false"
RABBITMQ_DISK_NODE: "true"
RABBITMQ_DISK_FREE_LIMIT: "1.0"
RABBITMQ_VM_MEMORY_HIGH_WATERMARK: "1000"
RABBITMQ_VM_MEMORY_LIMIT: "2000"
ports:
- "5673:5672"
- "15673:15672"
- "25673:25672"
- "61614:613"
volumes:
- rabbitmq2_data:/var/lib/rabbitmq
- ./rabbitmq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./rabbitmq2/definitions.json:/etc/rabbitmq/definitions.json
networks:
- rabbitmq-cluster
depends_on:
- rabbitmq1
restart: unless-stopped
rabbitmq3:
image: rabbitmq:3.12-management
hostname: rabbitmq3
container_name: rabbitmq3
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
RABBITMQ_ERLANG_COOKIE: rabbitmq-cluster-cookie
RABBITMQ_USE_LONGNAME: "true"
RABBITMQ_NODENAME: rabbit@rabbitmq3
RABBITMQ_CLUSTER_NODES: rabbit@rabbitmq1,rabbit@rabbitmq2
RABBITMQ_RAM_NODE: "false"
RABBITMQ_DISK_NODE: "true"
RABBITMQ_DISK_FREE_LIMIT: "1.0"
RABBITMQ_VM_MEMORY_HIGH_WATERMARK: "1000"
RABBITMQ_VM_MEMORY_LIMIT: "2000"
ports:
- "5674:5672"
- "15674:15672"
- "25674:25672"
- "61615:613"
volumes:
- rabbitmq3_data:/var/lib/rabbitmq
- ./rabbitmq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./rabbitmq3/definitions.json:/etc/rabbitmq/definitions.json
networks:
- rabbitmq-cluster
depends_on:
- rabbitmq1
restart: unless-stopped
# HAProxy for Load Balancing
haproxy:
image: haproxy:2.8
container_name: haproxy
ports:
- "5670:5670" # Main AMQP port
- "15670:15672" # Management UI
volumes:
- ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg
depends_on:
- rabbitmq1
- rabbitmq2
- rabbitmq3
networks:
- rabbitmq-cluster
restart: unless-stopped
# Prometheus for Monitoring
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
networks:
- rabbitmq-cluster
restart: unless-stopped
# Grafana for Visualization
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./grafana/datasources:/etc/grafana/provisioning/datasources
networks:
- rabbitmq-cluster
depends_on:
- prometheus
restart: unless-stopped
# RabbitMQ Exporter
rabbitmq-exporter:
image: kbudry/rabbitmq-exporter:latest
container_name: rabbitmq-exporter
ports:
- "9419:9419"
environment:
RABBIT_URL: http://haproxy:15672
PUBLISH_PORT: 9419
RABBIT_USER: admin
RABBIT_PASS: ${RABBITMQ_PASSWORD}
EXCLUDE_QUEUES: "^amq\.default"
networks:
- rabbitmq-cluster
depends_on:
- haproxy
restart: unless-stopped
volumes:
rabbitmq1_data:
rabbitmq2_data:
rabbitmq3_data:
grafana_data:
networks:
rabbitmq-cluster:
driver: bridge
ipam:
config:
- subnet: 172.20.0.0/16
# 2. RabbitMQ Configuration for Cluster
# rabbitmq1/rabbitmq.conf
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq2
cluster_cluster_name = rabbitmq-cluster
cluster_formation.node_cleanup_interval = 30
cluster_formation.node_cleanup_only = false
cluster_formation.peer_discovery_interval = 5
# Disk and Memory Configuration
disk_free_limit.absolute = 2GB
disk_free_limit.percentage = 90
vm_memory_high_watermark.relative = 80
vm_memory_limit.absolute = 2GB
# Performance Tuning
heartbeat_timeout = 60
consumer_timeout = 3600000
consumer_prefetch_count = 10
message_ttl = 86400000
# Security
loopback_users.guest = false
listeners.tcp.default = 5672
listeners.ssl.default = 5671
# Management Plugin
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0
management.tcp.disable = false
management.ssl.port = 15671
management.ssl.ip = 0.0.0.0
management.ssl.disable = false
# 3. HAProxy Configuration for Load Balancing
# haproxy.cfg
global
daemon
maxconn 4096
log /dev/log local0
log /dev/log local1 notice
defaults
mode tcp
timeout connect 5000
timeout client 50000
timeout server 50000
frontend ft_rabbitmq_main
bind *:5670
default_backend backend_rabbitmq_servers
maxconn 4096
timeout client 50000
timeout server 50000
option tcplog
option logasap
frontend ft_rabbitmq_management
bind *:15670
default_backend backend_rabbitmq_management
timeout client 50000
timeout server 50000
option tcplog
option logasap
backend backend_rabbitmq_servers
balance roundrobin
mode tcp
option tcp-check
server rabbitmq1 5672 check inter 5s rise 2 fall 3
server rabbitmq2 5673 check inter 5s rise 2 fall 3
server rabbitmq3 5674 check inter 5s rise 2 fall 3
maxconn 2000
timeout server 50000
backend backend_rabbitmq_management
balance roundrobin
mode http
option httpchk GET /api/healthcheck
option httplog
option logasap
server rabbitmq1 15672 check inter 5s rise 2 fall 3
maxconn 1000
timeout server 50000
# 4. Prometheus Configuration for RabbitMQ
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'rabbitmq'
static_configs:
- targets:
- 'rabbitmq-exporter:9419'
metrics_path: /metrics
scrape_interval: 5s
scrape_timeout: 10s
# 5. RabbitMQ Definitions and Policies
# rabbitmq1/definitions.json
{
"users": [
{
"name": "admin",
"password": "${RABBITMQ_PASSWORD}",
"tags": ["administrator"],
"hashing_algorithm": "SHA-256",
"permissions": {
"configure": ".*",
"write": ".*",
"read": ".*"
}
},
{
"name": "service_user",
"password": "service_password",
"tags": ["service"],
"hashing_algorithm": "SHA-256",
"permissions": {
"configure": ".*",
"write": "service.*",
"read": "service.*"
}
},
{
"name": "monitoring_user",
"password": "monitoring_password",
"tags": ["monitoring"],
"hashing_algorithm": "backup",
"permissions": {
"configure": "metrics.*",
"read": "metrics.*"
}
}
],
"vhosts": [
{
"name": "/",
"description": "Default virtual host"
},
{
"name": "production",
"description": "Production environment"
},
{
"name": "staging",
"description": "Staging environment"
},
{
"name": "development",
"description": "Development environment"
}
],
"permissions": [
{
"user": "service_user",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
},
{
"user": "monitoring_user",
"vhost": "/",
"configure": "metrics.*",
"read": ".*"
}
],
"queues": [
{
"name": "user_events_queue",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {
"x-max-length": 10000,
"x-overflow": "drop-head"
}
},
{
"name": "order_processing_queue",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {
"x-max-priority": 10,
"x-queue-mode": "lazy"
}
},
{
"name": "notifications_queue",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {
"x-message-ttl": 86400000,
"x-dead-letter-exchange": "notifications-dlx"
}
},
{
"name": "priority_queue",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {
"x-max-priority": 10,
"x-queue-mode": "priority",
"x-queue-mode-legacy": "true"
}
}
],
"exchanges": [
{
"name": "user_events_exchange",
"vhost": "/",
"type": "topic",
"durable": true,
"auto_delete": false
},
{
"name": "orders_exchange",
"vhost": "/",
"type": "direct",
"durable": true,
"auto_delete": false
},
{
"name": "notifications_exchange",
"vhost": "/",
"type": "fanout",
"durable": true,
"auto_delete": false
}
],
"bindings": [
{
"source": "user_events_exchange",
"destination": "user_events_queue",
"destination_type": "queue",
"routing_key": "user.*.events",
"arguments": {}
},
{
"source": "orders_exchange",
"destination": "order_processing_queue",
"destination_type": "queue",
"routing_key": "order.created",
"arguments": {}
},
{
"source": "notifications_exchange",
"destination": "notifications_queue",
"destination_type": "queue",
"routing_key": "*",
"arguments": {}
}
]
}
# 6. High Availability Scripts
# setup-cluster.sh
#!/bin/bash
set -e
echo "Setting up RabbitMQ cluster..."
# Wait for RabbitMQ containers to be ready
echo "Waiting for RabbitMQ containers to start..."
until docker exec rabbitmq1 rabbitmq-diagnostics -q ping; do
echo "Waiting for rabbitmq1..."
sleep 5
done
until docker exec rabbitmq2 rabbitmq-diagnostics -q ping; do
echo "Waiting for rabbitmq2..."
sleep 5
done
until docker exec rabbitmq3 rabbitmq-diagnostics -s ping; do
echo "Waiting for rabbitmq3..."
sleep 5
done
echo "All RabbitMQ nodes are ready!"
# Verify cluster status
echo "Verifying cluster status..."
docker exec rabbitmq1 rabbitmqctl cluster_status
# Create federation upstream for external connectivity
echo "Setting up federation upstream..."
curl -u admin:${RABBITMQ_PASSWORD} http://localhost:15672/api/parameters/federation-upstreams \
-X PUT -H "content-type: application/json" -d '{
"value": {
"uri": "amqps://external-rabbitmq:5672",
"expires": 3600000,
"reconnect-delay": 5000
}
}'
# Create policies
echo "Creating policies..."
curl -u admin:${RABBITMQ_PASSWORD} http://localhost:15672/api/policies/hourly-rate-limit \
-X PUT -H "content-type:application/json" -d '{
"pattern": "^amq\.default",
"definition": {
"type": "classic",
"max-count": 10000,
"priority": 100
}
}'
curl -u admin:${RABMQ_PASSWORD} http://localhost:15672/api/policies/queue-size-limit \
-X PUT -H "content-type:application/json" -d '{
"pattern": "^amq\.default",
"definition": {
"max-length": 5000,
"priority": 50
}
}'
# Enable federation
curl -u admin:${RABBITMQ_PASSWORD} http://localhost:15672/api/parameters/federation-upstreams/external-rabbitmq/set-policy -X POST -H "content-type: application/json" -d '{
"value": {
"apply-to": "all"
}
}'
echo "RabbitMQ cluster setup completed!"
# 7. Performance Monitoring Script
# monitor-cluster.sh
#!/bin/bash
echo "Monitoring RabbitMQ cluster..."
# Get cluster status
echo "=== Cluster Status ==="
curl -u admin:${RabbitMQ_PASSWORD} http://localhost:15672/api/cluster_status | jq '.'
# Get node details
echo -e "
=== Node Details ==="
for i in {1..3}; do
echo -e "
=== RabbitMQ$i ==="
curl -s -u admin:${RabbitMQ_PASSWORD} http://localhost:15672/api/nodes/rabbit@rabbitmq$i | jq '.memory.used_bytes, memory.limit_bytes, fd_used, fd_total, running_processes'
done
# Get queue statistics
echo -e "
=== Queue Statistics ==="
curl -s -u admin:${RabbitMQ_PASSWORD} http://localhost:15672/api/queues | jq '.[] | {name, messages, memory, consumers}'
# Get connection statistics
echo -e "
=== Connection Statistics ==="
curl -s -u admin:${RABBITMQ_PASSWORD} http://localhost:15672/api/connections | jq '.[] | {name, state, channels, peer_addr, client_properties}'
# Get channel statistics
echo -e "
=== Channel Statistics ==="
curl -s -u admin:${RABBITMQ} http://localhost:15672/api/channels | jq '.[] | {name, state, connection_name, messages, consumer_count}' | head -20
# Get exchange statistics
echo -e "
=== Exchange Statistics ==="
curl -s -u admin:${RABBITMQ} http://localhost:15672/api/exchanges | jq '.[] | {name, type, messages, message_stats_in, message_stats_out}' | head -10
# Health check function
check_health() {
local port=$1
local host=${2:-localhost}
local path=${3:-/api/healthcheck}
echo "Performing health check on $host:$port..."
local response=$(curl -s -o /dev/null -w "%{http_code}" "http://$host:$port$path")
if [ "$response" = "200" ]; then
echo "✓ Health check passed"
return 0
else
echo "✗ Health check failed with status $response"
return 1
fi
}
# Health checks
check_health 5672 rabbitmq1
check_health 5672 rabbitmq2
check_health 5672 rabbitmq3
check_health 15672 haproxy
echo "Health checks completed!"
# 8. Environment Variables
# .env
RABBITMQ_PASSWORD=secure_password_here
GRAFANA_PASSWORD=admin_password_here
# Node-specific configurations
RABBITMQ1_NAME=rabbit1
RABBITMQ2_NAME=rabbit2
RABBITMQ3_NAME=rabbit3
# Network configuration
RABBITMQ_CLUSTER_COOKIE=very_secure_cluster_cookie
RABBITMQ_DEFAULT_USER=admin
RABBITMQ_DEFAULT_VHOST=/
RABBITMQ_NODENAME=rabbit@$(hostname)
# Performance tuning
RABBITMQ_VM_MEMORY_HIGH_WATERMARK=1000
RABBITMQ_VM_MEMORY_LIMIT=2000
RABBITMQ_DISK_FREE_LIMIT=1GB
RABBITMQ_FLOW_ACTIVE_TIMEOUT=60000000
RABBITMQ_FLOW_PASSIVE_TIMEOUT=90000000
# Monitoring and Logging
RABBITMQ_LOG_LEVEL=INFO
RABBITMQ_PROMETHEUS_ENABLED=true
RABBITMQ_PROMETHEUS_CONFIG_FILE=/etc/rabbitmq/prometheus.json
RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_prometheus,rabbitmq_shovel
# Security
RABBITMQ_SSL_ENABLED=false
RABBITMQ_SSL_VERIFY=verify_peer
RABBITMQ_SSL_FAIL_IF_NO_PEER_CERT=true
RABBITMQ_SSL_CERT_FILE=/etc/ssl/rabbitmq.crt
RABBITMQ_KEY_FILE=/etc/ssl/rabbitmq.key
RABBITMQ_CA_FILE=/etc/ssl/ca_bundle.crt
# 9. Application Connection Examples
# Node.js Connection with Connection Pool
const amqp = require('amqplib');
const cluster = require('amqplib/lib/cluster');
// Connection pool for high-traffic applications
const connectionPool = {
hostname: ['localhost:5672', 'localhost:5673', 'localhost:5674'],
port: 5672,
username: 'admin',
channelMax: 10,
heartbeat: 30,
connection_timeout: 10000,
retry: 3
};
async function createRabbitConnection() {
try {
const connection = await amqp.connect({
hostname: connectionPool.hostname[0],
port: connectionPool.port,
username: connectionPool.username,
password: process.env.RABBITMQ_PASSWORD,
heartbeat: connectionPool.heartbeat,
connection_timeout: connectionPool.connection_timeout
});
console.log('RabbitMQ connection established');
return connection;
} catch (error) {
console.error('Failed to connect to RabbitMQ:', error);
throw error;
}
}
// Cluster connection with failover
async function createClusterConnection() {
return new Promise((resolve, reject) => {
const clusterConfig = {
addresses: [
'amqp://localhost:5672',
'amqp://localhost:5673',
'amqp://localhost:5674'
],
username: 'admin',
password: process.env.RABBITMQ_PASSWORD,
timeout: 10000,
heartbeat: 10,
reconnect: true,
reconnectBackoffStrategy: 'linear'
};
cluster.connect(clusterConfig, (err, connection) => {
if (err) {
console.error('Failed to connect to RabbitMQ cluster:', err);
reject(err);
} else {
console.log('RabbitMQ cluster connection established');
resolve(connection);
}
});
});
}
// Python Connection with Pika
import pika
from pika.connection import ConnectionParameters
from pika.exceptions import AMQPConnectionError
# High availability connection
def create_connection():
credentials = pika.PlainCredentials(
'admin',
os.environ['RABBITMQ_PASSWORD']
)
parameters = ConnectionParameters(
host='localhost',
port=5672,
virtual_host='/',
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=30,
connection_attempts=3,
retry_delay=5
)
try:
connection = pika.BlockingConnection(parameters)
print("RabbitMQ connection established")
return connection
except AMQPConnectionError as e:
print(f"Failed to connect to RabbitMQ: {e}")
raise
# Connection with retry and circuit breaker
class RabbitMQConnectionManager:
def __init__(self, max_retries=3, retry_delay=5):
self.max_retries = max_retries
self.retry_delay = retry_delay
self.connection = None
self.circuit_breaker_open = False
self.circuit_breaker_timeout = 30000 # 30 seconds
self.circuit_breaker_reset_timeout = 60000 # 1 minute
def get_connection(self):
if self.circuit_breaker_open:
print("Circuit breaker is open, refusing connection attempts")
return None
if self.connection and self.connection.is_open:
return self.connection
return self._connect_with_retry()
def _connect_with_retry(self):
last_exception = None
for attempt in range(self.max_retries):
try:
connection = create_connection()
self.connection = connection
self.circuit_breaker_open = False
return connection
except Exception as e:
last_exception = e
if attempt < self.max_retries - 1:
print(f"Connection attempt {attempt + 1} failed, retrying in {self.retry_delay}s...")
time.sleep(self.retry_delay)
continue
else:
self.circuit_breaker_open = True
raise last_exception
# Set circuit breaker to close after timeout
threading.Timer(self.circuit_breaker_reset_timeout, self._reset_circuit_breaker).start()
raise last_exception
def _reset_circuit_breaker(self):
print("Resetting circuit breaker")
self.circuit_breaker_open = False
# 10. Production Deployment Configuration
# Kubernetes deployment
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: rabbitmq-cluster
namespace: production
labels:
app: rabbitmq
spec:
serviceName: rabbitmq
replicas: 3
selector:
matchLabels:
app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
terminationGracePeriodSeconds: 30
containers:
- name: rabbitmq
image: rabbitmq:3.12-management
ports:
- containerPort: 5672
name: amqp
- containerPort: 15672
name: management
- containerPort: 25672
name: stomp
env:
- name: RABBITMQ_DEFAULT_USER
value: "admin"
- name: RABBITMQ_DEFAULT_PASS
valueFrom:
secretKeyRef:
name: rabbitmq-secret
key: password
- name: RABBITMQ_ERLANG_COOKIE
value: "rabbitmq-cluster-cookie"
- name: RABBITMQ_USE_LONGNAME
value: "true"
- name:1:RABBITMQ_NODENAME
value: "rabbit@$(POD_NAME)-$(POD_HOSTNAME)"
- name: RABBITMQ_CLUSTER_NODES
value: "rabbit@$( (($(POD_NAME)-$(POD_HOSTNAME) | tr -d ' '))\,"
- name: RABBITMQ_DISK_NODE
value: "true"
- name: RABBITMQ_RAM_NODE
value: "false"
- name: RABBITMQ_DISK_FREE_LIMIT
value: "1.0"
- name: RABBITMQ_VM_MEMORY_HIGH_WATERMARK
value: "1000"
- name: RABBITMQ_VM_MEMORY_LIMIT
value: "2000"
volumeMounts:
- name: rabbitmq-data
mountPath: /var/lib/rabbitmq
- name: rabbitmq-config
mountPath: /etc/rabbitmq
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
livenessProbe:
exec:
command:
- rabbitmq-diagnostics
- -q
- ping
initialDelaySeconds: 60
periodSeconds: 30
timeoutSeconds: 10
readinessProbe:
exec:
command:
- rabbitmq-diagnostics
- -q
- ping
initialDelaySeconds: 20
periodSeconds: 10
timeoutSeconds: 5
volumes:
- name: rabbitmq-data
persistentVolumeClaim:
claimName: rabbitmq-data
- name: rabbitmq-config
configMap:
name: rabbitmq-config
- name: rabbitmq-definitions
configMap:
name: rabbitmq-definitions
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: rabbitmq-data
namespace: production
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
storageClassName: fast-ssd
---
apiVersion: v1
kind: Secret
metadata:
name: rabbitmq-secret
namespace: production
type: Opaque
data:
password: <base64-encoded-password>
---
apiVersion: v1
kind: ConfigMap
metadata:
name: rabbitmq-config
namespace: production
data:
rabbitmq.conf: |
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = [email protected]
cluster_formation.classic_config.nodes.2 = [email protected]
cluster_formation.classic_config.nodes.3 = [email protected]
cluster_formation.classic_config.nodes.4 = [email protected]
disk_free_limit.absolute = 2GB
vm_memory_high_watermark = 1000MB
vm_memory_limit = 2GB
heartbeat_timeout = 60
connection_timeout = 10
consumer_timeout = 3600000
consumer_prefetch_count = 10
message_ttl = 86400000
definitions.json: |
{
"users": [
{
"name": "admin",
"password": "${RABBITMQ_PASSWORD}",
"tags": ["administrator"],
"permissions": {
"configure": ".*",
"write": ".*",
"read": ".*"
}
}
],
"vhosts": [
{
"name": "/",
"description": "Production virtual host"
}
],
"queues": [
{
"name": "high-priority-queue",
"durable": true,
"arguments": {
"x-max-priority": 10
}
}
]
}
💻 RabbitMQ Microservices Patterns typescript
🔴 complex
⭐⭐⭐⭐
RabbitMQ patterns for microservices architecture including saga pattern, event sourcing, and CQRS
⏱️ 55 min
🏷️ rabbitmq, microservices, patterns, architecture
Prerequisites:
Node.js, TypeScript, Message Queue, Event-Driven Architecture
// RabbitMQ Microservices Architecture Patterns
// Event-driven architecture with RabbitMQ
// 1. Saga Pattern Implementation
// saga-manager.ts
import { Connection, Channel, ConsumeMessage } from 'amqplib/connection';
import { EventEmitter } from 'events';
interface SagaStep {
id: string;
execute: () => Promise<any>;
compensate?: () => Promise<any>;
onFail?: (error: Error) => Promise<void>;
onSuccess?: (result: any) => Promise<void>;
}
interface Saga {
id: string;
name: string;
steps: SagaStep[];
timeout: number;
}
interface SagaState {
id: string;
currentStep: number;
status: 'pending' | 'running' | 'completed' | 'failed' | 'compensating';
data: Record<string, any>;
compensations: Array<() => Promise<void>>;
error?: Error;
startTime: Date;
endTime?: Date;
}
export class SagaManager {
private connection: Connection;
private channel: Channel;
private sagas: Map<string, Saga> = new Map();
private sagaStates: Map<string, SagaState> = new Map();
private eventEmitter = new EventEmitter();
constructor(connection: Connection) {
this.connection = connection;
}
async initialize(): Promise<void> {
this.channel = await this.connection.createChannel();
console.log('Saga Manager initialized');
}
// Create a new saga
createSaga(saga: Saga): void {
this.sagas.set(saga.id, saga);
console.log(`Created saga: ${saga.name} (${saga.id})`);
}
// Execute a saga
async executeSaga(saga: Saga, data: any = {}): Promise<void> {
const sagaState: SagaState = {
id: saga.id,
currentStep: 0,
status: 'pending',
data,
compensations: [],
startTime: new Date()
};
this.sagaStates.set(saga.id, sagaState);
try {
for (let i = 0; i < saga.steps.length; i++) {
const step = saga.steps[i];
sagaState.currentStep = i;
sagaState.status = 'running';
console.log(`Executing saga ${saga.name}, step ${i + 1}/${saga.steps.length}: ${step.constructor.name}`);
// Record compensation if available
if (step.compensate) {
sagaState.compensations.push(step.compensate);
}
// Execute step with timeout
const result = await this.executeStepWithTimeout(step, saga.timeout);
// Store step result
sagaState.data[`step_${i}`] = result;
// Notify progress
this.eventEmitter.emit('stepCompleted', {
sagaId: saga.id,
sagaName: saga.name,
step: i + 1,
totalSteps: saga.steps.length,
result
});
if (step.onSuccess) {
await step.onSuccess(result);
}
}
sagaState.status = 'completed';
sagaState.endTime = new Date();
console.log(`Saga ${saga.name} completed successfully in ${sagaState.endTime.getTime() - sagaState.startTime.getTime()}ms`);
this.eventEmitter.emit('sagaCompleted', sagaState);
} catch (error) {
sagaState.status = 'failed';
sagaState.error = error;
sagaState.endTime = new Date();
console.error(`Saga ${saga.name} failed: ${error.message}`);
// Execute compensations in reverse order
console.log(`Executing compensations for saga ${saga.name}...`);
await this.executeCompensations(sagaState.compensations);
if (step.onFail) {
await step.onFail(error);
}
sagaState.status = 'compensated';
this.eventEmitter.emit('sagaCompensated', sagaState);
throw error;
}
}
private async executeStepWithTimeout(step: SagaStep, timeout: number): Promise<any> {
return Promise.race([
step.execute(),
new Promise((_, reject) => {
setTimeout(() => reject(new Error(`Saga step timeout after ${timeout}ms`)), timeout);
})
]);
}
private async executeCompensations(compensations: Array<() => Promise<void>>): Promise<void> {
for (let i = compensations.length - 1; i >= 0; i--) {
try {
await compensations[i]();
} catch (error) {
console.error(`Compensation failed: ${error.message}`);
// Continue with other compensations even if one fails
}
}
}
// Event listener methods
onSagaCompleted(callback: (state: SagaState) => void): void {
this.eventEmitter.on('sagaCompleted', callback);
}
onSagaCompensated(callback: (state: SetState) => void): void {
this.eventEmitter.on('sagaCompensated', callback);
}
onStepCompleted(callback: (data: any) => void => void {
this.eventEmitter.on('stepCompleted', callback);
}
}
// 2. Event Sourcing Pattern
// event-store.ts
interface Event {
id: string;
type: string;
data: any;
metadata: Record<string, any>;
timestamp: Date;
aggregateId: string;
sequence: number;
version: number;
}
interface AggregateState {
id: string;
type: string;
version: number;
data: Record<string, any>;
lastEvent: Date;
}
export class EventStore {
private connection: Connection;
private channel: Channel;
private events: Map<string, Event[]> = new Map();
private aggregates: Map<string, AggregateState> = new Map();
constructor(connection: Connection) {
this.connection = connection;
}
async initialize(): Promise<void> {
this.channel = await this.connection.createChannel();
await this.channel.assertQueue('events', { durable: true });
await this.channel.assertQueue('events-replay', { durable: true });
await this.channel.assertQueue('events-snapshot', { durable: true });
// Start consuming events
this.startEventConsumption();
console.log('Event Store initialized');
}
async storeEvent(event: Event): Promise<void> {
const eventMessage = {
id: event.id,
type: event.type,
data: event.data,
metadata: event.metadata,
timestamp: event.timestamp,
aggregateId: event.aggregateId,
sequence: event.sequence,
version: event.version
};
await this.channel.sendToQueue('events', Buffer.from(JSON.stringify(eventMessage)));
console.log(`Event stored: ${event.type} for aggregate ${event.aggregateId}`);
}
async getEventsForAggregate(aggregateId: string): Promise<Event[]> {
const events = this.events.get(aggregateId) || [];
if (events.length > 0) {
return events;
}
// Replay events from queue
try {
const q = await this.channel.getQueue('events-replay', { durable: true });
let hasMore = true;
while (hasMore) {
const msg = await q.get({ noAck: false });
if (msg === false) break;
const event = JSON.parse(msg.content.toString());
if (event.aggregateId === aggregateId) {
events.push(event);
events.push(event);
}
hasMore = false;
// In a real implementation, you'd track sequence numbers properly
}
this.events.set(aggregateId, events);
return events;
} catch (error) {
console.error(`Failed to replay events for aggregate ${aggregateId}: ${error}`);
return [];
}
}
async getAggregateState(aggregateId: string): Promise<AggregateState | null> {
const events = await this.getEventsForAggregate(aggregateId);
if (events.length === 0) {
return null;
}
// Apply events in order
let state: AggregateState = {
id: aggregateId,
type: events[0].type.split('.')[1],
version: events[0].version,
data: events[0].data,
lastEvent: events[events.length - 1].timestamp
};
for (let i = 1; i < events.length; i++) {
const event = events[i];
state = this.applyEventToState(state, event);
}
return state;
}
private applyEventToState(state: AggregateState, event: Event): AggregateState {
const [entityType, action] = event.type.split('.');
switch (action) {
case 'Created':
state.data = event.data;
state.version = event.version;
break;
'Updated':
state.data = { ...state.data, ...event.data };
state.version = event.version;
break;
'Deleted':
state.data = {};
state.version = event.version;
break;
'Versioned':
state.data = { ...state.data, ...event.data };
state.version = event.version;
break;
default:
state.data = { ...state.data, ...event.data };
}
state.lastEvent = event.timestamp;
return state;
}
}
private startEventConsumption(): void {
this.channel.consume('events', { noAck: true }, async (msg) => {
const event = JSON.parse(msg.content.toString());
const aggregateId = event.aggregateId;
let events = this.events.get(aggregateId) || [];
events.push(event);
this.events.set(aggregateId, events);
// Keep only last 1000 events per aggregate
if (events.length > 1000) {
events = events.slice(-1000);
this.events.set(aggregateId, events);
}
this.channel.ack(msg);
this.channel.nack(msg);
});
}
// Create snapshot of aggregate state
async createSnapshot(aggregateId: string, state: AggregateState): Promise<void> {
const snapshotMessage = {
aggregateId,
state,
timestamp: new Date(),
version: state.version
};
await this.channel.sendToQueue('events-snapshot', Buffer.from(JSON.stringify(snapshotMessage)));
console.log(`Created snapshot for aggregate ${aggregateId}, version ${state.version}`);
}
// Restore from snapshot
async restoreFromSnapshot(aggregateId: string): Promise<AggregateState | null> {
try {
const q = await this.channel.getQueue('events-snapshot', { durable: true });
const msg = await q.get({ noAck: true });
if (msg === false) {
return null;
}
const snapshot = JSON.parse(msg.content.toString());
return snapshot.state;
} catch (error) {
console.error(`Failed to restore snapshot for aggregate ${aggregateId}: ${error}`);
return null;
}
}
}
// 3. CQRS Pattern Implementation
// query-service.ts and command-service.ts
// Query Service (Read Model)
export class QueryService {
private eventStore: EventStore;
constructor(eventStore: EventStore) {
this.eventStore = eventStore;
}
async getUserById(userId: string): Promise<any> {
const state = await this.eventStore.getAggregateState(userId);
return state?.data;
}
async getUsersByRole(role: string): Promise<any[]> {
const users = [];
// Get all user aggregates
const allUserIds = await this.getAllUserIds();
for (const userId of allUserIds) {
const user = await this.getUserById(userId);
if (user && user.role === role) {
users.push(user);
}
}
return users;
}
async getAllUserIds(): Promise<string[]> {
// In a real implementation, you'd maintain a registry of all aggregate IDs
return ['user-1', 'user-2', 'user-3'];
}
async searchUsers(query: string): Promise<any[]> {
const results = [];
// Simple text search implementation
const allUserIds = await this.getAllUserIds();
for (const userId of allUserIds) {
const user = await this.getUserById(userId);
if (user && (
user.name.toLowerCase().includes(query.toLowerCase()) ||
user.email.toLowerCase().includes(query.toLowerCase())
)) {
results.push(user);
}
}
return results;
}
}
// Command Service (Write Model)
export class CommandService {
private eventStore: EventStore;
constructor(eventStore: EventStore) {
this.eventStore = eventStore;
}
async createUser(userData: Promise<string> {
const userId = user-data.id || `user-${Date.now()}`;
const event: Event = {
id: `event-${Date.now()}`,
type: 'UserCreated',
data: userData,
metadata: {
source: 'command-service',
timestamp: new Date()
},
timestamp: new Date(),
aggregateId: userId,
sequence: 1,
version: 1
};
await this.eventStore.storeEvent(event);
return userId;
}
async updateUser(userId: string, updates: Record<string, any>): Promise<void> {
const currentState = await this.eventStore.getAggregateState(userId);
const event: Event = {
id: `event-${Date.now()}`,
type: 'UserUpdated',
data: updates,
metadata: {
source: 'command-service',
timestamp: new Date()
},
timestamp: new Date(),
aggregateId: userId,
sequence: (currentState?.version || 0) + 1,
version: (currentState?.version || 0) + 1
};
await this.eventStore.storeEvent(event);
}
async deleteUser(userId: string): Promise<void> {
const event: Event = {
id: `event-${Date.now()}`,
type: 'UserDeleted',
data: { deleted: true, deletedAt: new Date() },
metadata: {
source: 'command-service',
timestamp: new Date()
},
timestamp: new Date(),
aggregateId: userId,
sequence: 1,
version: 1
};
await this.eventStore.storeEvent(event);
}
}
// 4. Event Bus Implementation
// event-bus.ts
import { EventEmitter } from 'events';
interface EventBusConfig {
retryAttempts: number;
retryDelay: number;
deadLetterExchange: string;
deadLetterQueue: string;
}
export class EventBus extends EventEmitter {
private channel: Channel;
private config: EventBusConfig;
constructor(channel: Channel, config?: Partial<EventBusConfig>) {
super();
this.channel = channel;
this.config = {
retryAttempts: 3,
retryDelay: 1000,
deadLetterExchange: 'exchange.dlx',
deadLetterQueue: 'queue.dlq',
...config
};
}
async publishEvent(event: Event, routingKey?: string, options?: any): Promise<void> {
const eventName = event.type;
let routingKey = routingKey || eventName;
try {
const publishPromise = new Promise<void>((resolve, reject) => {
const options = {
expiration: options?.expiration || '24h',
persistent: options?.persistent || false,
timestamp: options?.timestamp || false,
content_type: 'application/json',
headers: options?.headers || {}
};
const success = this.channel.publish(
event.type,
routingKey,
Buffer.from(JSON.stringify(event)),
options,
(err, ok) => {
if (err) reject(err);
else resolve(ok);
}
);
// Handle success callback
success.then(() => {
console.log(`Event ${eventName} published to ${routingKey}`);
resolve();
});
});
await publishPromise;
} catch (error) {
console.error(`Failed to publish event ${eventName}: ${error}`);
// Send to dead letter queue if configured
if (this.config.deadLetterExchange) {
this.publishToDeadLetterQueue(event, error);
}
throw error;
}
} catch (error) {
console.error(`Critical error in event bus: ${error}`);
throw error;
}
}
private async publishToDeadLetterEvent(event: Event, originalError: Error): Promise<void> {
const dlxEvent = {
id: `dlx-event-${Date.now()}`,
type: event.type,
routing_key: 'dead-letter',
data: {
originalEvent: event,
error: {
message: originalError.message,
stack: originalError.stack
}
},
timestamp: new Date()
};
const dlxPromise = new Promise<void>((resolve, reject) => {
const success = this.channel.publish(
this.config.deadLetterExchange,
'dead-letter',
Buffer.from(JSON.stringify(dlxEvent))
);
});
await dlxPromise;
}
async subscribe(pattern: string, handler: (event: Event) => Promise<void>, queueName?: string): Promise<void> {
const queueOptions = {
durable: true,
exclusive: false,
autoDelete: false,
arguments: [pattern],
headers: {
'x-dead-letter-exchange': this.config.deadLetterExchange
}
};
const queueNameSuffix = queueName || pattern;
const q = await this.channel.assertQueue(queueNameSuffix, queueOptions);
await q.consume(async (msg) => {
const event: Event = JSON.parse(msg.content.toString());
await handler(event);
q.ack();
});
console.log(`Subscribed to pattern ${pattern} on queue ${queueNameSuffix}`);
}
async createEvent(eventData: Record<string, any>, eventType: string): Promise<Event> {
const event: Event = {
id: `event-${Date.now()}`,
type: eventType,
data: eventData,
metadata: {
source: 'application',
timestamp: new Date()
},
timestamp: new Date()
};
return event;
}
}
// 5. Circuit Breaker for External Dependencies
// circuit-breaker.ts
interface CircuitBreakerOptions {
timeout: number;
failureThreshold: number;
successThreshold: number;
resetTimeout: number;
retryDelay: number;
}
enum CircuitBreakerState {
CLOSED = 'CLOSED',
OPEN = 'OPEN',
HALF_OPEN = 'HALF_OPEN'
}
export class CircuitBreaker {
private state: CircuitBreakerState;
private failureCount: number = 0;
private successCount: number = 0;
private lastFailureTime: Date | null = null;
private nextAttempt: Date | null = null;
private options: CircuitBreakerOptions;
constructor(options: Partial<CircuitBreakerOptions>) {
this.options = {
timeout: 30000,
failureThreshold: 5,
successThreshold: 2,
resetTimeout: 60000,
retryDelay: 1000,
...options
};
this.state = CircuitBreakerState.CLOSED;
}
async execute<T>(operation: () => Promise<T>): Promise<T> {
if (this.state === CircuitBreakerOPEN) {
throw new Error('Circuit breaker is open');
}
const now = Date.now();
// Reset if reset timeout has passed
if (this.options.resetTimeout > 0 &&
this.lastFailureTime &&
(now.getTime() - this.lastFailureTime.getTime()) > this.options.resetTimeout) {
this.reset();
}
if (this.state === CircuitBreaker.HALF_OPEN) {
// In half-open state, allow a single attempt
try {
const result = await this.executeWithTimeout(operation);
this.recordSuccess();
return result;
} catch (error) {
this.recordFailure();
this.state = CircuitBreaker.OPEN;
throw error;
}
}
try {
const result = await this.executeWithTimeout(operation);
this.recordSuccess();
return result;
} catch (error) {
this.recordFailure();
throw error;
}
}
private async executeWithTimeout<T>(operation: () => Promise<T>): Promise<T> {
return Promise.race([
operation(),
new Promise((_, reject) => {
setTimeout(() => reject(new Error('Operation timeout')), this.options.timeout);
})
]);
}
private recordSuccess(): void {
this.successCount++;
if (this.successCount >= this.options.successThreshold) {
this.state = CircuitBreakerState.CLOSED;
} else if (this.state === CircuitBreaker.HALF_OPEN) {
this.state = CircuitBreaker.CLOSED;
}
this.failureCount = 0;
}
private recordFailure(): void {
this.failureCount++;
this.lastFailureTime = new Date();
if (this.failureCount >= this.options.failureThreshold) {
this.state = CircuitBreaker.OPEN;
} else if (this.state === CircuitBreaker.CLOSED &&
this.failureCount >= Math.floor(this.options.failureThreshold / 2)) {
this.state = CircuitBreaker.HALF_OPEN;
}
this.successCount = 0;
}
private reset(): void {
this.state = CircuitBreaker.CLOSED;
this.failureCount = 0;
this.successCount = 0;
this.lastFailureTime = null;
this.nextAttempt = null;
}
getState(): CircuitBreakerState {
return this.state;
}
getStatus(): string {
return this.state;
}
}
// 6. Dead Letter Queue Handler
// dead-letter-queue-handler.ts
interface DeadLetterEvent {
event: Event;
error: Error;
retryCount: number;
lastAttempt: Date;
originalExchange: string;
originalRoutingKey: string;
}
export class DeadLetterQueueHandler {
private channel: Channel;
private retryAttempts: Map<string, number> = new Map();
constructor(channel: Channel) {
this.channel = channel;
this.initializeDeadLetterQueue();
}
private async initializeDeadLetterQueue(): Promise<void> {
await this.channel.assertExchange('exchange.dlx', { durable: true });
await this.channel.assertQueue('queue.dlq', { durable: true });
await this.channel.bindQueue('queue.dlq', 'exchange.dlx');
console.log('Dead letter queue initialized');
}
async handleDeadLetter(event: Event, error: Error, retryCount: number): Promise<void> {
const dlxEvent: DeadLetterEvent = {
event,
error,
retryCount,
lastAttempt: new Date(),
originalExchange: 'exchange.main',
originalRoutingKey: event.type
};
console.error(`Dead letter event for ${event.type}: ${error.message}`);
// Check if we should retry
const eventKey = `${event.type}:${event.aggregateId}`;
const currentRetries = this.retryAttempts.get(eventKey) || 0;
if (currentRetries < 3) {
console.log(`Retrying dead letter event for ${event.type} (attempt ${currentRetries + 1}/3)`);
this.retryAttempts.set(eventKey, currentRetries + 1);
// Add retry delay with exponential backoff
const retryDelay = Math.pow(2, currentRetries) * 1000;
await new Promise(resolve => setTimeout(resolve, retryDelay));
// Re-queue the event
await this.requeueEvent(event);
} else {
// Max retries exceeded, log and persist
console.error(`Max retries exceeded for event ${event.type}, logging to persistent storage`);
await this.persistFailedEvent(dlxEvent);
}
}
private async requeueEvent(event: Event): Promise<void> {
// Find the original exchange and routing key
const originalExchange = 'exchange.main';
const routingKey = event.type;
await this.channel.publish(
originalExchange,
routingKey,
Buffer.from(JSON.stringify(event))
);
}
private async persistFailedEvent(dlxEvent: DeadLetterEvent): Promise<void> {
// In production, persist to database or log system
console.error(`\n[PERSISTENT DEAD LETTER EVENT]\nEvent: ${dlxEvent.event.type}\nError: ${dlxEvent.error.message}\nRetry Count: ${dlxEvent.retryCount}\n`);
}
}
// 7. Usage Examples
// main.ts - Application entry point
import { Connection } from 'amqplib/connection';
async function main() {
try {
// Establish RabbitMQ connection
const connection = await connect({
host: 'localhost',
port: 5672,
username: 'admin',
password: process.env.RABBITMQ_PASSWORD,
});
console.log('Connected to RabbitMQ');
// Initialize services
const sagaManager = new SagaManager(connection);
await sagaManager.initialize();
const eventStore = new EventStore(connection);
await eventStore.initialize();
const eventBus = new EventBus(eventStore.channel);
const queryService = new QueryService(eventStore);
const commandService = new CommandService(eventStore);
// Create sample saga
const orderSaga = {
id: 'order-saga-1',
name: 'Order Processing Saga',
timeout: 30000,
steps: [
{
id: 'step-1',
name: 'Validate Order Data',
execute: async () => {
// Validate order data
return { valid: true, orderId: 'order-123' };
},
compensate: async () => {
console.log('Compensating order creation');
}
},
{
'id': 'step-2',
name: 'Process Payment',
execute: async () => {
// Process payment
return { success: true, paymentId: 'pay-456' };
},
compensate: async () => {
console.log('Refunding payment');
}
},
{
id: 'step-3',
name: 'Create Order',
execute: async () => {
// Create order
return { orderId: 'order-123', status: 'created' };
},
compensate: async () => {
console.log('Cancel order');
}
},
{
id: 'step-4',
name: 'Send Confirmation',
execute: async () => {
// Send confirmation
console.log('Order confirmed');
}
}
]
};
sagaManager.createSaga(orderSaga);
// Handle incoming events
eventBus.subscribe('order.*', async (event) => {
console.log('Received order event:', event.type, event.data);
// Handle business logic here
});
// Execute sample saga
const orderData = {
id: 'order-123',
customer: 'John Doe',
items: [
{ id: 'item-1', name: 'Product 1', price: 100 },
{ id: 'message-id', name: 'Message ID: message-id-123', price: 50 }
],
total: 150
};
await sagaManager.executeSaga(orderSaga, orderData);
} catch (error) {
console.error('Application startup failed:', error);
process.exit(1);
}
}
// 8. Environment Configuration for Different Environments
// config/development.ts
export const developmentConfig = {
connection: {
host: 'localhost',
port: 5672,
username: 'admin',
password: 'dev_password',
frameMax: 4096,
heartbeat: 30,
timeout: 10000
},
eventBus: {
retryAttempts: 3,
retryDelay: 1000,
deadLetterExchange: 'exchange.dlx',
deadLetterQueue: 'queue.dlq'
},
sagas: {
timeout: 30000,
maxConcurrent: 10
}
};
// config/production.ts
export const productionConfig = {
connection: {
urls: [
'amqps://rabbitmq-main:5672',
'amqps://rabbitmq-backup:5673'
],
credentials: {
username: 'prod-user',
password: process.env.RABBITMQ_PASSWORD,
heartbeat: 20,
timeout: 5000
}
},
eventBus: {
retryAttempts: 5,
retryDelay: 5000,
deadLetterExchange: 'exchange.dlx.production',
deadLetterQueue: 'queue.dlq.production'
},
sagas: {
timeout: 60000,
maxConcurrent: 50
},
monitoring: {
metricsEnabled: true,
tracingEnabled: true,
loggingLevel: 'warn'
}
};
// config/test.ts
export const testConfig = {
connection: {
host: 'localhost',
port: 5672,
username: 'test',
password: 'test_password',
frameMax: 1024
},
eventBus: {
retryAttempts: 1,
retryDelay: 500
}
};