🎯 Exemplos recomendados
Balanced sample collections from various categories for you to explore
Exemplos Avançados do RabbitMQ
Configuração avançada do RabbitMQ incluindo clustering, alta disponibilidade, otimização e padrões empresariais
⚙️ Configuração de Cluster RabbitMQ
🔴 complex
⭐⭐⭐⭐
Configuração de cluster RabbitMQ com alta disponibilidade e failover
⏱️ 60 min
🏷️ rabbitmq, message-queue, enterprise, clustering
Prerequisites:
Docker, PostgreSQL, Container orchestration, Messaging concepts
# RabbitMQ Advanced Cluster Configuration
# Enterprise-grade setup with clustering and high availability
# 1. Docker Compose for Cluster Setup
# docker-compose.yml
version: '3.8'

# Settings shared by the three broker nodes; each service below merges this
# mapping and adds its own hostname, published ports and volumes.
#
# Tuning that used to be passed through RABBITMQ_* environment variables
# (memory watermark, disk limit, heartbeat, node type, ...) is intentionally
# gone from here and belongs in the mounted rabbitmq.conf:
#   - RABBITMQ_VM_MEMORY_HIGH_WATERMARK is on the official image's list of
#     deprecated variables; the 3.9+ entrypoint refuses to start when set.
#   - RABBITMQ_USE_LONGNAME needs fully-qualified hostnames, which the
#     single-label names rabbitmq1..3 are not.
#   - RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS carried values such as "1024MB"
#     that are not valid Erlang terms.
#   - The other removed variables are not read by RabbitMQ.
# The node name defaults to rabbit@<hostname>, i.e. rabbit@rabbitmq1..3.
x-rabbitmq-node: &rabbitmq-node
  image: rabbitmq:3.12-management
  environment:
    RABBITMQ_DEFAULT_USER: admin
    RABBITMQ_DEFAULT_PASS: "${RABBITMQ_PASSWORD}"
    # Every node must present the same cookie to join the cluster.
    # NOTE(review): hard-coded secret; move it to .env before real use.
    RABBITMQ_ERLANG_COOKIE: rabbitmq-cluster-cookie
  networks:
    - rabbitmq-cluster
  healthcheck:
    test: ["CMD", "rabbitmq-diagnostics", "ping"]
    interval: 30s
    timeout: 10s
    retries: 3
    start_period: 60s
  restart: unless-stopped

services:
  rabbitmq1:
    <<: *rabbitmq-node
    hostname: rabbitmq1
    container_name: rabbitmq1
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI / HTTP API
      - "25672:25672"  # Inter-node and CLI tool communication
      - "61613:61613"  # STOMP (only served if rabbitmq_stomp is enabled)
    volumes:
      - rabbitmq1_data:/var/lib/rabbitmq
      - ./rabbitmq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./rabbitmq1/definitions.json:/etc/rabbitmq/definitions.json

  rabbitmq2:
    <<: *rabbitmq-node
    hostname: rabbitmq2
    container_name: rabbitmq2
    ports:
      - "5673:5672"
      - "15673:15672"
      - "25673:25672"
      - "61614:61613"  # was 61614:613, which maps to a port nothing listens on
    volumes:
      - rabbitmq2_data:/var/lib/rabbitmq
      - ./rabbitmq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./rabbitmq2/definitions.json:/etc/rabbitmq/definitions.json
    depends_on:
      - rabbitmq1

  rabbitmq3:
    <<: *rabbitmq-node
    hostname: rabbitmq3
    container_name: rabbitmq3
    ports:
      - "5674:5672"
      - "15674:15672"
      - "25674:25672"
      - "61615:61613"  # was 61615:613
    volumes:
      - rabbitmq3_data:/var/lib/rabbitmq
      - ./rabbitmq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./rabbitmq3/definitions.json:/etc/rabbitmq/definitions.json
    depends_on:
      - rabbitmq1

  # HAProxy for load balancing. The container ports must match the `bind`
  # lines in haproxy.cfg (5670 for AMQP, 15670 for the management API).
  haproxy:
    image: haproxy:2.8
    container_name: haproxy
    ports:
      - "5670:5670"    # Main AMQP port
      - "15670:15670"  # Management UI
    volumes:
      - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg
    depends_on:
      - rabbitmq1
      - rabbitmq2
      - rabbitmq3
    networks:
      - rabbitmq-cluster
    restart: unless-stopped

  # Prometheus for monitoring
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    networks:
      - rabbitmq-cluster
    restart: unless-stopped

  # Grafana for visualization
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources:/etc/grafana/provisioning/datasources
    networks:
      - rabbitmq-cluster
    depends_on:
      - prometheus
    restart: unless-stopped

  # RabbitMQ exporter (scraped by Prometheus on 9419)
  rabbitmq-exporter:
    # NOTE(review): the original said "kbudry/rabbitmq-exporter"; the
    # published image is believed to be kbudde/rabbitmq-exporter - confirm.
    image: kbudde/rabbitmq-exporter:latest
    container_name: rabbitmq-exporter
    ports:
      - "9419:9419"
    environment:
      # HAProxy serves the management API on 15670 inside the network.
      RABBIT_URL: http://haproxy:15670
      PUBLISH_PORT: "9419"
      RABBIT_USER: admin
      RABBIT_PASS: "${RABBITMQ_PASSWORD}"
      # Single quotes keep the backslash literal; "\." is not a valid
      # escape inside a double-quoted YAML scalar.
      EXCLUDE_QUEUES: '^amq\.default'
    networks:
      - rabbitmq-cluster
    depends_on:
      - haproxy
    restart: unless-stopped

volumes:
  rabbitmq1_data:
  rabbitmq2_data:
  rabbitmq3_data:
  grafana_data:

networks:
  rabbitmq-cluster:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16
# 2. RabbitMQ Configuration for Cluster
# rabbitmq1/rabbitmq.conf
# Cluster formation: every node lists all cluster members (itself included)
# so the same file can be mounted into rabbitmq1, rabbitmq2 and rabbitmq3.
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1
cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq2
cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq3
# The key is "cluster_name" (the original "cluster_cluster_name" is unknown
# to RabbitMQ, which refuses to boot on unknown keys).
cluster_name = rabbitmq-cluster
cluster_formation.node_cleanup.interval = 30
# Only log unreachable peers instead of removing them from the cluster.
cluster_formation.node_cleanup.only_log_warning = true
# NOTE(review): replaces the unknown key "peer_discovery_interval = 5";
# assumed to mean a 5 s retry interval (this key is in milliseconds).
cluster_formation.discovery_retry_interval = 5000
# Disk and Memory Configuration
disk_free_limit.absolute = 2GB
# The relative watermark is a fraction of available memory (0.8 = 80 %),
# not a percentage.
# NOTE(review): 0.8 keeps the original intent but is on the high side.
vm_memory_high_watermark.relative = 0.8
# NOTE(review): stands in for the unknown "vm_memory_limit.absolute = 2GB";
# caps the memory total that the watermark above is computed from.
total_memory_available_override_value = 2GB
# Performance Tuning
# Heartbeat interval in seconds (the key is "heartbeat").
heartbeat = 60
consumer_timeout = 3600000
# "consumer_prefetch_count" and "message_ttl" are not rabbitmq.conf keys:
# prefetch is set by each consumer (basic.qos) and TTL by a policy or the
# x-message-ttl queue argument.
# Security
# Keep the built-in guest user restricted to loopback connections.
loopback_users.guest = true
listeners.tcp.default = 5672
# TLS listener: enable only together with ssl_options.cacertfile,
# ssl_options.certfile and ssl_options.keyfile.
# listeners.ssl.default = 5671
# Management Plugin
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0
# HTTPS for the management UI needs management.ssl.certfile / keyfile.
# management.ssl.port = 15671
# management.ssl.ip = 0.0.0.0
# 3. HAProxy Configuration for Load Balancing
# haproxy.cfg
global
    # No "daemon" directive: inside a container HAProxy must stay in the
    # foreground, and there is no /dev/log socket, so log to stdout.
    maxconn 4096
    log stdout format raw local0

defaults
    mode tcp
    log global
    timeout connect 5000
    timeout client 50000
    timeout server 50000

# AMQP traffic (plain TCP)
frontend ft_rabbitmq_main
    bind *:5670
    default_backend backend_rabbitmq_servers
    maxconn 4096
    timeout client 50000
    option tcplog
    option logasap

# Management UI / HTTP API. The mode must match the HTTP backend below.
frontend ft_rabbitmq_management
    bind *:15670
    mode http
    default_backend backend_rabbitmq_management
    timeout client 50000
    option httplog
    option logasap

backend backend_rabbitmq_servers
    balance roundrobin
    mode tcp
    option tcp-check
    timeout server 50000
    # Address is <service-name>:<container-port>. Every node listens on 5672
    # inside the Docker network; 5673/5674 exist only on the host.
    server rabbitmq1 rabbitmq1:5672 check inter 5s rise 2 fall 3
    server rabbitmq2 rabbitmq2:5672 check inter 5s rise 2 fall 3
    server rabbitmq3 rabbitmq3:5672 check inter 5s rise 2 fall 3

backend backend_rabbitmq_management
    balance roundrobin
    mode http
    # "/" (the UI index page) answers without credentials. The original
    # "/api/healthcheck" path is not served by the management plugin, so
    # every server would have been marked down.
    option httpchk GET /
    timeout server 50000
    server rabbitmq1 rabbitmq1:15672 check inter 5s rise 2 fall 3
    server rabbitmq2 rabbitmq2:15672 check inter 5s rise 2 fall 3
    server rabbitmq3 rabbitmq3:15672 check inter 5s rise 2 fall 3
# 4. Prometheus Configuration for RabbitMQ
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'rabbitmq'
    metrics_path: /metrics
    scrape_interval: 5s
    # Prometheus rejects a configuration whose scrape_timeout is longer than
    # its scrape_interval (the original used 10s against a 5s interval).
    scrape_timeout: 4s
    static_configs:
      - targets:
          - 'rabbitmq-exporter:9419'
# 5. RabbitMQ Definitions and Policies
# rabbitmq1/definitions.json
{
"users": [
{
"name": "admin",
"password": "${RABBITMQ_PASSWORD}",
"tags": ["administrator"],
"hashing_algorithm": "rabbit_password_hashing_sha256",
"permissions": {
"configure": ".*",
"write": ".*",
"read": ".*"
}
},
{
"name": "service_user",
"password": "service_password",
"tags": ["service"],
"hashing_algorithm": "rabbit_password_hashing_sha256",
"permissions": {
"configure": ".*",
"write": "service.*",
"read": "service.*"
}
},
{
"name": "monitoring_user",
"password": "monitoring_password",
"tags": ["monitoring"],
"hashing_algorithm": "rabbit_password_hashing_sha256",
"permissions": {
"configure": "metrics.*",
"read": "metrics.*"
}
}
],
"vhosts": [
{
"name": "/",
"description": "Default virtual host"
},
{
"name": "production",
"description": "Production environment"
},
{
"name": "staging",
"description": "Staging environment"
},
{
"name": "development",
"description": "Development environment"
}
],
"permissions": [
{
"user": "service_user",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
},
{
"user": "monitoring_user",
"vhost": "/",
"configure": "metrics.*",
"write": "",
"read": ".*"
}
],
"queues": [
{
"name": "user_events_queue",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {
"x-max-length": 10000,
"x-overflow": "drop-head"
}
},
{
"name": "order_processing_queue",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {
"x-max-priority": 10,
"x-queue-mode": "lazy"
}
},
{
"name": "notifications_queue",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {
"x-message-ttl": 86400000,
"x-dead-letter-exchange": "notifications-dlx"
}
},
{
"name": "priority_queue",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {
"x-max-priority": 10
}
}
],
"exchanges": [
{
"name": "user_events_exchange",
"vhost": "/",
"type": "topic",
"durable": true,
"auto_delete": false
},
{
"name": "orders_exchange",
"vhost": "/",
"type": "direct",
"durable": true,
"auto_delete": false
},
{
"name": "notifications_exchange",
"vhost": "/",
"type": "fanout",
"durable": true,
"auto_delete": false
}
],
"bindings": [
{
"source": "user_events_exchange",
"vhost": "/",
"destination": "user_events_queue",
"destination_type": "queue",
"routing_key": "user.*.events",
"arguments": {}
},
{
"source": "orders_exchange",
"vhost": "/",
"destination": "order_processing_queue",
"destination_type": "queue",
"routing_key": "order.created",
"arguments": {}
},
{
"source": "notifications_exchange",
"vhost": "/",
"destination": "notifications_queue",
"destination_type": "queue",
"routing_key": "*",
"arguments": {}
}
]
}
# 6. High Availability Scripts
# setup-cluster.sh
#!/bin/bash
# Waits for the three broker containers, prints the cluster status, then
# configures a federation upstream and queue policies through the management
# HTTP API on localhost:15672.
#
# Requires RABBITMQ_PASSWORD in the environment (password of user "admin").
set -euo pipefail

: "${RABBITMQ_PASSWORD:?RABBITMQ_PASSWORD must be set}"

readonly API_URL="http://localhost:15672/api"
readonly NODES=(rabbitmq1 rabbitmq2 rabbitmq3)

# PUT a JSON document to the management API.
#   $1 - path below /api (the default vhost "/" is URL-encoded as %2F)
#   $2 - JSON request body
# --fail turns HTTP errors into a non-zero exit so `set -e` stops the script.
api_put() {
  local path=$1
  local body=$2
  curl --fail --silent --show-error \
    -u "admin:${RABBITMQ_PASSWORD}" \
    -X PUT -H "content-type: application/json" \
    -d "$body" \
    "${API_URL}/${path}"
}

echo "Setting up RabbitMQ cluster..."

# Wait for RabbitMQ containers to be ready
echo "Waiting for RabbitMQ containers to start..."
for node in "${NODES[@]}"; do
  # -q (quiet) on every node; rabbitmq3 previously used "-s".
  until docker exec "$node" rabbitmq-diagnostics -q ping; do
    echo "Waiting for ${node}..."
    sleep 5
  done
done
echo "All RabbitMQ nodes are ready!"

# Verify cluster status
echo "Verifying cluster status..."
docker exec rabbitmq1 rabbitmqctl cluster_status

# The federation API calls below are rejected unless the plugin is enabled.
echo "Enabling federation plugin..."
for node in "${NODES[@]}"; do
  docker exec "$node" rabbitmq-plugins enable rabbitmq_federation rabbitmq_federation_management
done

# Create federation upstream for external connectivity.
# Path format: parameters/<component>/<vhost>/<name>.
echo "Setting up federation upstream..."
api_put "parameters/federation-upstream/%2F/external-rabbitmq" '{
  "value": {
    "uri": "amqps://external-rabbitmq:5671",
    "expires": 3600000,
    "reconnect-delay": 5
  }
}'
# uri: amqps is AMQP over TLS, which listens on 5671 (5672 is plain AMQP).
# reconnect-delay is expressed in seconds, expires in milliseconds.

# Create policies. Path format: policies/<vhost>/<name>; "priority" is a
# top-level field, and the dot is escaped as "\\." because "\." is not a
# valid JSON escape.
echo "Creating policies..."
# NOTE(review): the original definition used "type" and "max-count", which
# are not policy keys and fail validation; RabbitMQ has no rate-limit
# policy, so "max-length" is used as the closest supported limit - confirm.
api_put "policies/%2F/hourly-rate-limit" '{
  "pattern": "^amq\\.default",
  "definition": {
    "max-length": 10000
  },
  "priority": 100
}'

api_put "policies/%2F/queue-size-limit" '{
  "pattern": "^amq\\.default",
  "definition": {
    "max-length": 5000
  },
  "priority": 50
}'

# Enable federation: upstreams are activated by a policy that references
# them; there is no "set-policy" endpoint on a parameter. The pattern skips
# the built-in amq.* exchanges.
api_put "policies/%2F/federate-all" '{
  "pattern": "^(?!amq\\.).+",
  "definition": {
    "federation-upstream-set": "all"
  },
  "apply-to": "exchanges"
}'

echo "RabbitMQ cluster setup completed!"
# 7. Performance Monitoring Script
# monitor-cluster.sh
#!/bin/bash
# Prints a snapshot of the cluster through the management HTTP API and runs
# HTTP health checks against each node and the HAProxy front end.
#
# Runs on the Docker host, so every endpoint is reached through the ports
# published on localhost. Requires RABBITMQ_PASSWORD (user "admin") and jq.
set -uo pipefail

: "${RABBITMQ_PASSWORD:?RABBITMQ_PASSWORD must be set}"

readonly API_URL="http://localhost:15672/api"

# GET a management API path (relative to /api) with admin credentials.
api_get() {
  curl --silent --show-error -u "admin:${RABBITMQ_PASSWORD}" "${API_URL}/$1"
}

echo "Monitoring RabbitMQ cluster..."

# Get cluster status (/api/overview; there is no /api/cluster_status)
echo "=== Cluster Status ==="
api_get overview | jq '.'

# Get node details
printf '\n=== Node Details ===\n'
for i in {1..3}; do
  printf '\n=== RabbitMQ%s ===\n' "$i"
  api_get "nodes/rabbit@rabbitmq$i" | jq '{mem_used, mem_limit, fd_used, fd_total, proc_used}'
done

# Get queue statistics
printf '\n=== Queue Statistics ===\n'
api_get queues | jq '.[] | {name, messages, memory, consumers}'

# Get connection statistics
printf '\n=== Connection Statistics ===\n'
api_get connections | jq '.[] | {name, state, channels, peer_host, client_properties}'

# Get channel statistics (compact output: head keeps whole objects)
printf '\n=== Channel Statistics ===\n'
api_get channels \
  | jq -c '.[] | {name, state, connection_name: .connection_details.name, messages_unacknowledged, consumer_count}' \
  | head -20

# Get exchange statistics
printf '\n=== Exchange Statistics ===\n'
api_get exchanges \
  | jq -c '.[] | {name, type, publish_in: .message_stats.publish_in, publish_out: .message_stats.publish_out}' \
  | head -10

# Health check function.
#   $1 - management HTTP port
#   $2 - host (default: localhost)
#   $3 - path (default: the alarms health check, which needs credentials)
# Returns 0 when the endpoint answers 200, 1 otherwise.
check_health() {
  local port=$1
  local host=${2:-localhost}
  local path=${3:-/api/health/checks/alarms}
  local response
  echo "Performing health check on $host:$port..."
  # Assigned separately from `local` so curl's exit status is not masked.
  response=$(curl -s -o /dev/null -w "%{http_code}" \
    -u "admin:${RABBITMQ_PASSWORD}" "http://$host:$port$path")
  if [ "$response" = "200" ]; then
    echo "✓ Health check passed"
    return 0
  else
    echo "✗ Health check failed with status $response"
    return 1
  fi
}

# Health checks: management ports of the three nodes, then HAProxy. Port
# 5672 speaks AMQP, not HTTP, so it cannot be probed with curl.
failures=0
check_health 15672 || failures=$((failures + 1))
check_health 15673 || failures=$((failures + 1))
check_health 15674 || failures=$((failures + 1))
check_health 15670 || failures=$((failures + 1))
echo "Health checks completed!"
[ "$failures" -eq 0 ]
# 8. Environment Variables
# .env
# Secrets referenced by docker-compose.yml as ${RABBITMQ_PASSWORD} and
# ${GRAFANA_PASSWORD}. Replace the placeholders and keep this file out of
# version control.
RABBITMQ_PASSWORD=secure_password_here
GRAFANA_PASSWORD=admin_password_here
# NOTE(review): none of the variables below are referenced by the compose
# file shown with this example; they only take effect if something else
# reads them.
# Node-specific configurations
RABBITMQ1_NAME=rabbit1
RABBITMQ2_NAME=rabbit2
RABBITMQ3_NAME=rabbit3
# Network configuration
RABBITMQ_CLUSTER_COOKIE=very_secure_cluster_cookie
RABBITMQ_DEFAULT_USER=admin
RABBITMQ_DEFAULT_VHOST=/
# Env files are not shell scripts: "$(hostname)" would be stored literally.
# RabbitMQ already defaults the node name to rabbit@<hostname>, so the
# variable is left unset.
# RABBITMQ_NODENAME=rabbit@<hostname>
# Performance tuning
RABBITMQ_VM_MEMORY_HIGH_WATERMARK=1000
RABBITMQ_VM_MEMORY_LIMIT=2000
RABBITMQ_DISK_FREE_LIMIT=1GB
RABBITMQ_FLOW_ACTIVE_TIMEOUT=60000000
RABBITMQ_FLOW_PASSIVE_TIMEOUT=90000000
# Monitoring and Logging
RABBITMQ_LOG_LEVEL=INFO
RABBITMQ_PROMETHEUS_ENABLED=true
RABBITMQ_PROMETHEUS_CONFIG_FILE=/etc/rabbitmq/prometheus.json
RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_prometheus,rabbitmq_shovel
# Security
RABBITMQ_SSL_ENABLED=false
RABBITMQ_SSL_VERIFY=verify_peer
RABBITMQ_SSL_FAIL_IF_NO_PEER_CERT=true
RABBITMQ_SSL_CERT_FILE=/etc/ssl/rabbitmq.crt
RABBITMQ_KEY_FILE=/etc/ssl/rabbitmq.key
RABBITMQ_CA_FILE=/etc/ssl/ca_bundle.crt
# 9. Application Connection Examples
# Node.js Connection with Connection Pool
const amqp = require('amqplib');

// Connection settings for high-traffic applications.
// `hostname` holds "host:port" endpoints (the ports published by the compose
// file); `port` is the fallback for entries that carry no port.
const connectionPool = {
  hostname: ['localhost:5672', 'localhost:5673', 'localhost:5674'],
  port: 5672,
  username: 'admin',
  channelMax: 10,
  heartbeat: 30,
  connection_timeout: 10000,
  retry: 3
};

/**
 * Splits a "host:port" endpoint into the separate fields amqplib expects.
 * @param {string} endpoint    "host" or "host:port"
 * @param {number} defaultPort port used when the endpoint has none
 * @returns {{hostname: string, port: number}}
 */
function parseEndpoint(endpoint, defaultPort) {
  const [hostname, port] = endpoint.split(':');
  return { hostname, port: port ? Number(port) : defaultPort };
}

/**
 * Opens a connection to the first endpoint of `connectionPool`.
 * @returns {Promise<object>} the amqplib connection
 * @throws whatever amqplib rejects with when the broker is unreachable
 */
async function createRabbitConnection() {
  // Passing "localhost:5672" as `hostname` would be resolved as a host name
  // and fail, so the endpoint is split first.
  const { hostname, port } = parseEndpoint(connectionPool.hostname[0], connectionPool.port);
  try {
    const connection = await amqp.connect(
      {
        protocol: 'amqp',
        hostname,
        port,
        username: connectionPool.username,
        password: process.env.RABBITMQ_PASSWORD,
        heartbeat: connectionPool.heartbeat,
        channelMax: connectionPool.channelMax
      },
      // The connect timeout is a socket option (second argument), not part
      // of the connection parameters.
      { timeout: connectionPool.connection_timeout }
    );
    console.log('RabbitMQ connection established');
    return connection;
  } catch (error) {
    console.error('Failed to connect to RabbitMQ:', error);
    throw error;
  }
}

/**
 * Cluster connection with failover: tries each node in order and resolves
 * with the first connection that succeeds.
 *
 * amqplib ships no cluster client ('amqplib/lib/cluster' does not exist) and
 * does not reconnect on its own; callers that need reconnection should call
 * this function again from the connection's 'close' handler.
 *
 * @returns {Promise<object>} the amqplib connection
 * @throws the error of the last node tried when every node is unreachable
 */
async function createClusterConnection() {
  const clusterConfig = {
    addresses: [
      'amqp://localhost:5672',
      'amqp://localhost:5673',
      'amqp://localhost:5674'
    ],
    username: 'admin',
    password: process.env.RABBITMQ_PASSWORD,
    timeout: 10000,
    heartbeat: 10
  };

  let lastError;
  for (const address of clusterConfig.addresses) {
    const url = new URL(address);
    try {
      const connection = await amqp.connect(
        {
          protocol: 'amqp',
          hostname: url.hostname,
          port: Number(url.port),
          username: clusterConfig.username,
          password: clusterConfig.password,
          heartbeat: clusterConfig.heartbeat
        },
        { timeout: clusterConfig.timeout }
      );
      console.log('RabbitMQ cluster connection established');
      return connection;
    } catch (err) {
      // Remember the failure and fall through to the next node.
      lastError = err;
      console.error(`Failed to connect to RabbitMQ node ${address}:`, err);
    }
  }
  console.error('Failed to connect to RabbitMQ cluster:', lastError);
  throw lastError;
}
// Python Connection with Pika
import os
import threading
import time

import pika
from pika.connection import ConnectionParameters
from pika.exceptions import AMQPConnectionError
# High availability connection
def create_connection():
    """Open a blocking connection to the first reachable cluster node.

    The password of user ``admin`` is read from the ``RABBITMQ_PASSWORD``
    environment variable.

    Returns:
        pika.BlockingConnection: an open connection.

    Raises:
        KeyError: ``RABBITMQ_PASSWORD`` is not set.
        AMQPConnectionError: no node accepted the connection.
    """
    credentials = pika.PlainCredentials(
        'admin',
        os.environ['RABBITMQ_PASSWORD']
    )
    # One parameter set per node port published on localhost.
    # BlockingConnection accepts a sequence and tries the entries in order,
    # which is what makes this connection highly available.
    endpoints = [
        ConnectionParameters(
            host='localhost',
            port=port,
            virtual_host='/',
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=30,
            connection_attempts=3,
            retry_delay=5
        )
        for port in (5672, 5673, 5674)
    ]
    try:
        connection = pika.BlockingConnection(endpoints)
        print("RabbitMQ connection established")
        return connection
    except AMQPConnectionError as e:
        print(f"Failed to connect to RabbitMQ: {e}")
        raise
# Connection with retry and circuit breaker
class RabbitMQConnectionManager:
    """Hands out a shared connection, with retries and a circuit breaker.

    After ``max_retries`` consecutive failures the breaker opens:
    ``get_connection`` returns ``None`` without touching the network until
    ``circuit_breaker_reset_timeout`` milliseconds have passed.
    """

    def __init__(self, max_retries=3, retry_delay=5, connection_factory=None):
        """
        Args:
            max_retries: connection attempts before the breaker opens.
            retry_delay: seconds to sleep between attempts.
            connection_factory: optional zero-argument callable returning a
                connection; defaults to the module-level ``create_connection``.
        """
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.connection = None
        self.circuit_breaker_open = False
        self.circuit_breaker_timeout = 30000  # 30 seconds, in milliseconds
        self.circuit_breaker_reset_timeout = 60000  # 1 minute, in milliseconds
        self._connection_factory = connection_factory
        self._reset_timer = None

    def get_connection(self):
        """Return an open connection, or ``None`` while the breaker is open.

        Raises:
            Exception: the last connection error once all retries failed.
        """
        if self.circuit_breaker_open:
            print("Circuit breaker is open, refusing connection attempts")
            return None
        if self.connection and self.connection.is_open:
            return self.connection
        return self._connect_with_retry()

    def _connect_with_retry(self):
        """Try to connect up to ``max_retries`` times.

        On final failure the breaker is opened and its reset is scheduled
        before the last error is re-raised.
        """
        # Resolved lazily so the default is only looked up when needed.
        connect = self._connection_factory or create_connection
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                connection = connect()
                self.connection = connection
                self.circuit_breaker_open = False
                return connection
            except Exception as e:
                last_exception = e
                if attempt < self.max_retries - 1:
                    print(f"Connection attempt {attempt + 1} failed, retrying in {self.retry_delay}s...")
                    time.sleep(self.retry_delay)
        self.circuit_breaker_open = True
        # The reset must be scheduled BEFORE raising: previously it sat after
        # the raise and never ran, leaving the breaker open forever.
        # threading.Timer takes seconds; the attribute is in milliseconds.
        self._reset_timer = threading.Timer(
            self.circuit_breaker_reset_timeout / 1000.0,
            self._reset_circuit_breaker
        )
        # Daemon thread: a pending reset must not keep the process alive.
        self._reset_timer.daemon = True
        self._reset_timer.start()
        raise last_exception

    def _reset_circuit_breaker(self):
        """Close the breaker so new connection attempts are allowed."""
        print("Resetting circuit breaker")
        self.circuit_breaker_open = False
# 10. Production Deployment Configuration
# Kubernetes deployment
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq-cluster
  namespace: production
  labels:
    app: rabbitmq
spec:
  # Name of the governing headless Service. It gives each pod the DNS name
  # <pod>.rabbitmq.production.svc.cluster.local used for the node names.
  # NOTE(review): that Service is not defined in this example.
  serviceName: rabbitmq
  replicas: 3
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      terminationGracePeriodSeconds: 30
      containers:
        - name: rabbitmq
          image: rabbitmq:3.12-management
          ports:
            - containerPort: 5672
              name: amqp
            - containerPort: 15672
              name: management
            # 25672 is inter-node / CLI communication, not STOMP.
            - containerPort: 25672
              name: clustering
          env:
            # Must be declared before the variables that expand $(POD_NAME).
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: RABBITMQ_DEFAULT_USER
              value: "admin"
            - name: RABBITMQ_DEFAULT_PASS
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: password
            # NOTE(review): hard-coded secret; better sourced from a Secret.
            - name: RABBITMQ_ERLANG_COOKIE
              value: "rabbitmq-cluster-cookie"
            # Long names are required because the node name is an FQDN.
            - name: RABBITMQ_USE_LONGNAME
              value: "true"
            - name: RABBITMQ_NODENAME
              value: "rabbit@$(POD_NAME).rabbitmq.production.svc.cluster.local"
            # Peers, memory and disk limits are configured in rabbitmq.conf
            # (ConfigMap rabbitmq-config), not through environment variables.
          volumeMounts:
            - name: rabbitmq-data
              mountPath: /var/lib/rabbitmq
            # subPath mounts place single files; mounting the ConfigMap over
            # /etc/rabbitmq would hide the directory shipped in the image.
            - name: rabbitmq-config
              mountPath: /etc/rabbitmq/rabbitmq.conf
              subPath: rabbitmq.conf
            - name: rabbitmq-config
              mountPath: /etc/rabbitmq/definitions.json
              subPath: definitions.json
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            exec:
              command:
                - rabbitmq-diagnostics
                - -q
                - ping
            initialDelaySeconds: 60
            periodSeconds: 30
            timeoutSeconds: 10
          readinessProbe:
            exec:
              command:
                - rabbitmq-diagnostics
                - -q
                - ping
            initialDelaySeconds: 20
            periodSeconds: 10
            timeoutSeconds: 5
      volumes:
        # Both rabbitmq.conf and definitions.json live in this ConfigMap; the
        # former "rabbitmq-definitions" volume pointed at a ConfigMap that is
        # not defined here, which would block pod start-up.
        - name: rabbitmq-config
          configMap:
            name: rabbitmq-config
  # One claim per replica. A single ReadWriteOnce claim cannot back three
  # pods that may be scheduled on different nodes.
  volumeClaimTemplates:
    - metadata:
        name: rabbitmq-data
      spec:
        accessModes:
          - ReadWriteOnce
        storageClassName: fast-ssd
        resources:
          requests:
            storage: 10Gi
---
# Standalone 10Gi claim named rabbitmq-data in the production namespace,
# requesting the fast-ssd storage class.
# NOTE(review): ReadWriteOnce allows read-write mounting from a single node
# only, so one claim cannot serve replicas scheduled on different nodes.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: rabbitmq-data
namespace: production
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
storageClassName: fast-ssd
---
# Holds the broker password that the StatefulSet reads through
# secretKeyRef (name: rabbitmq-secret, key: password).
# The value under `data` must be base64-encoded; the angle-bracket
# placeholder below has to be replaced before this manifest is applied.
apiVersion: v1
kind: Secret
metadata:
name: rabbitmq-secret
namespace: production
type: Opaque
data:
password: <base64-encoded-password>
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
  namespace: production
data:
  # Node names follow <pod>.<serviceName>.<namespace>.svc.cluster.local for
  # StatefulSet "rabbitmq-cluster" (3 replicas) behind Service "rabbitmq".
  # The original "[email protected]" entries were e-mail-obfuscation
  # artefacts, and a fourth node was listed for a three-replica set.
  rabbitmq.conf: |
    cluster_formation.peer_discovery_backend = classic_config
    cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq-cluster-0.rabbitmq.production.svc.cluster.local
    cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq-cluster-1.rabbitmq.production.svc.cluster.local
    cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq-cluster-2.rabbitmq.production.svc.cluster.local
    disk_free_limit.absolute = 2GB
    # An absolute watermark uses the ".absolute" sub-key.
    vm_memory_high_watermark.absolute = 1000MB
    # Heartbeat interval in seconds (the key is "heartbeat").
    heartbeat = 60
    consumer_timeout = 3600000
    # vm_memory_limit, connection_timeout, consumer_prefetch_count and
    # message_ttl are not rabbitmq.conf keys and were removed: RabbitMQ
    # refuses to boot on unknown keys. Prefetch is set per consumer
    # (basic.qos) and TTL per queue or policy.
  # NOTE(review): "${RABBITMQ_PASSWORD}" is NOT substituted inside a
  # ConfigMap; as written it becomes the literal password. Prefer a
  # "password_hash" entry or create the user from the Secret.
  definitions.json: |
    {
      "users": [
        {
          "name": "admin",
          "password": "${RABBITMQ_PASSWORD}",
          "tags": ["administrator"]
        }
      ],
      "vhosts": [
        {
          "name": "/",
          "description": "Production virtual host"
        }
      ],
      "permissions": [
        {
          "user": "admin",
          "vhost": "/",
          "configure": ".*",
          "write": ".*",
          "read": ".*"
        }
      ],
      "queues": [
        {
          "name": "high-priority-queue",
          "vhost": "/",
          "durable": true,
          "auto_delete": false,
          "arguments": {
            "x-max-priority": 10
          }
        }
      ]
    }
💻 Padrões de Microserviços RabbitMQ typescript
🔴 complex
⭐⭐⭐⭐
Arquitetura de microsserviços com RabbitMQ, incluindo o padrão Saga, event sourcing e CQRS
⏱️ 55 min
🏷️ rabbitmq, microservices, patterns, architecture
Prerequisites:
Node.js, TypeScript, Message Queue, Event-Driven Architecture
// RabbitMQ Microservices Architecture Patterns
// Event-driven architecture with RabbitMQ
// 1. Saga Pattern Implementation
// saga-manager.ts
import { Connection, Channel, ConsumeMessage } from 'amqplib/connection';
import { EventEmitter } from 'events';
/** One unit of work in a saga, with optional undo and notification hooks. */
interface SagaStep {
  id: string;
  /** Performs the step; its resolved value is stored in the saga data. */
  execute: () => Promise<any>;
  /** Undoes the step; invoked in reverse order when a later step fails. */
  compensate?: () => Promise<any>;
  /** Invoked on the step that failed, after compensations have run. */
  onFail?: (error: Error) => Promise<void>;
  /** Invoked with the step result right after the step succeeds. */
  onSuccess?: (result: any) => Promise<void>;
}

/** An ordered list of steps sharing one per-step timeout (milliseconds). */
interface Saga {
  id: string;
  name: string;
  steps: SagaStep[];
  timeout: number;
}

/** Mutable bookkeeping for one saga execution. */
interface SagaState {
  id: string;
  currentStep: number;
  // 'compensated' is the terminal state executeSaga assigns after a rollback.
  status: 'pending' | 'running' | 'completed' | 'failed' | 'compensating' | 'compensated';
  data: Record<string, any>;
  compensations: Array<() => Promise<void>>;
  error?: Error;
  startTime: Date;
  endTime?: Date;
}

/**
 * Runs sagas step by step and rolls back completed steps when one fails.
 *
 * Emits 'stepCompleted', 'sagaCompleted' and 'sagaCompensated' through the
 * on* registration methods.
 */
export class SagaManager {
  private connection: Connection;
  // Assigned in initialize(), hence the definite-assignment assertion.
  private channel!: Channel;
  private sagas: Map<string, Saga> = new Map();
  private sagaStates: Map<string, SagaState> = new Map();
  private eventEmitter = new EventEmitter();

  constructor(connection: Connection) {
    this.connection = connection;
  }

  /** Opens the channel used by the manager. */
  async initialize(): Promise<void> {
    this.channel = await this.connection.createChannel();
    console.log('Saga Manager initialized');
  }

  /** Registers a saga definition under its id. */
  createSaga(saga: Saga): void {
    this.sagas.set(saga.id, saga);
    console.log(`Created saga: ${saga.name} (${saga.id})`);
  }

  /**
   * Executes every step of `saga` in order.
   *
   * On failure the compensations of the steps that already completed run in
   * reverse order, the failing step's `onFail` hook is called, and the
   * original error is rethrown.
   *
   * @param saga saga to run
   * @param data initial saga data; step results are added as `step_<index>`
   * @throws the error raised by the failing step (or its timeout)
   */
  async executeSaga(saga: Saga, data: any = {}): Promise<void> {
    const sagaState: SagaState = {
      id: saga.id,
      currentStep: 0,
      status: 'pending',
      data,
      compensations: [],
      startTime: new Date()
    };
    this.sagaStates.set(saga.id, sagaState);

    // Tracked outside the loop so the catch block can reach the failing
    // step; the loop-scoped `step` is not visible there.
    let activeStep: SagaStep | undefined;

    try {
      for (let i = 0; i < saga.steps.length; i++) {
        const step = saga.steps[i];
        activeStep = step;
        sagaState.currentStep = i;
        sagaState.status = 'running';
        // step.id identifies the step; step.constructor.name is always
        // "Object" for plain step objects.
        console.log(`Executing saga ${saga.name}, step ${i + 1}/${saga.steps.length}: ${step.id}`);

        // Execute step with timeout
        const result = await this.executeStepWithTimeout(step, saga.timeout);

        // Record the compensation only once the step has completed, so a
        // step that never took effect is not "undone".
        if (step.compensate) {
          sagaState.compensations.push(step.compensate);
        }

        // Store step result
        sagaState.data[`step_${i}`] = result;

        // Notify progress
        this.eventEmitter.emit('stepCompleted', {
          sagaId: saga.id,
          sagaName: saga.name,
          step: i + 1,
          totalSteps: saga.steps.length,
          result
        });

        if (step.onSuccess) {
          await step.onSuccess(result);
        }
      }

      sagaState.status = 'completed';
      sagaState.endTime = new Date();
      console.log(`Saga ${saga.name} completed successfully in ${sagaState.endTime.getTime() - sagaState.startTime.getTime()}ms`);
      this.eventEmitter.emit('sagaCompleted', sagaState);
    } catch (caught) {
      // A thrown value is not guaranteed to be an Error instance.
      const error = caught instanceof Error ? caught : new Error(String(caught));
      sagaState.status = 'failed';
      sagaState.error = error;
      sagaState.endTime = new Date();
      console.error(`Saga ${saga.name} failed: ${error.message}`);

      // Execute compensations in reverse order
      console.log(`Executing compensations for saga ${saga.name}...`);
      sagaState.status = 'compensating';
      await this.executeCompensations(sagaState.compensations);

      if (activeStep?.onFail) {
        try {
          await activeStep.onFail(error);
        } catch (hookError) {
          // A failing hook must not mask the error that aborted the saga.
          console.error(`onFail hook failed: ${hookError}`);
        }
      }

      sagaState.status = 'compensated';
      this.eventEmitter.emit('sagaCompensated', sagaState);
      throw error;
    }
  }

  /**
   * Races the step against a timer and rejects if the timer fires first.
   * The timer is cleared afterwards so finished steps leave nothing pending.
   */
  private async executeStepWithTimeout(step: SagaStep, timeout: number): Promise<any> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const deadline = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(`Saga step timeout after ${timeout}ms`)), timeout);
    });
    try {
      return await Promise.race([step.execute(), deadline]);
    } finally {
      if (timer !== undefined) {
        clearTimeout(timer);
      }
    }
  }

  /** Runs compensations last-to-first; a failing one is logged and skipped. */
  private async executeCompensations(compensations: Array<() => Promise<void>>): Promise<void> {
    for (let i = compensations.length - 1; i >= 0; i--) {
      try {
        await compensations[i]();
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        console.error(`Compensation failed: ${message}`);
        // Continue with other compensations even if one fails
      }
    }
  }

  // Event listener methods
  onSagaCompleted(callback: (state: SagaState) => void): void {
    this.eventEmitter.on('sagaCompleted', callback);
  }

  onSagaCompensated(callback: (state: SagaState) => void): void {
    this.eventEmitter.on('sagaCompensated', callback);
  }

  onStepCompleted(callback: (data: any) => void): void {
    this.eventEmitter.on('stepCompleted', callback);
  }
}
// 2. Event Sourcing Pattern
// event-store.ts
// Envelope for one domain event as published to the 'events' queue.
// NOTE(review): `timestamp` is a Date here but arrives as a string after
// the JSON round-trip through the queue.
interface Event {
id: string;
type: string;
data: any;
metadata: Record<string, any>;
timestamp: Date;
// Identifier of the aggregate this event belongs to; events are cached and
// replayed per aggregateId.
aggregateId: string;
sequence: number;
version: number;
}
// State of one aggregate, rebuilt by applying its events in order.
interface AggregateState {
id: string;
type: string;
// Version carried by the most recently applied versioned event.
version: number;
data: Record<string, any>;
// Timestamp of the last event applied to this state.
lastEvent: Date;
}
/**
 * Queue-backed event store with an in-memory, per-aggregate event cache.
 *
 * Events are published to the 'events' queue and cached as they are
 * consumed; 'events-replay' and 'events-snapshot' are scanned on demand
 * without removing their messages.
 */
export class EventStore {
  /** Maximum number of cached events kept per aggregate. */
  private static readonly MAX_CACHED_EVENTS = 1000;

  private connection: Connection;
  // Assigned in initialize(), hence the definite-assignment assertion.
  private channel!: Channel;
  private events: Map<string, Event[]> = new Map();
  private aggregates: Map<string, AggregateState> = new Map();

  constructor(connection: Connection) {
    this.connection = connection;
  }

  /** Opens the channel, declares the three queues and starts consuming. */
  async initialize(): Promise<void> {
    this.channel = await this.connection.createChannel();
    await this.channel.assertQueue('events', { durable: true });
    await this.channel.assertQueue('events-replay', { durable: true });
    await this.channel.assertQueue('events-snapshot', { durable: true });
    // Start consuming events
    this.startEventConsumption();
    console.log('Event Store initialized');
  }

  /** Publishes `event` to the 'events' queue as a persistent message. */
  async storeEvent(event: Event): Promise<void> {
    const eventMessage = {
      id: event.id,
      type: event.type,
      data: event.data,
      metadata: event.metadata,
      timestamp: event.timestamp,
      aggregateId: event.aggregateId,
      sequence: event.sequence,
      version: event.version
    };
    // The queue is durable, so mark the message persistent too; otherwise
    // it is lost when the broker restarts.
    this.channel.sendToQueue('events', Buffer.from(JSON.stringify(eventMessage)), { persistent: true });
    console.log(`Event stored: ${event.type} for aggregate ${event.aggregateId}`);
  }

  /**
   * Returns the events of one aggregate, from the cache when present and
   * otherwise from a scan of the 'events-replay' queue.
   * Returns an empty array when the replay fails.
   */
  async getEventsForAggregate(aggregateId: string): Promise<Event[]> {
    const cached = this.events.get(aggregateId) || [];
    if (cached.length > 0) {
      return cached;
    }
    // Replay events from queue
    try {
      const replayed = await this.scanQueue<Event>('events-replay');
      const events = replayed
        .filter((event) => event.aggregateId === aggregateId)
        .sort((a, b) => a.sequence - b.sequence);
      this.events.set(aggregateId, events);
      return events;
    } catch (error) {
      console.error(`Failed to replay events for aggregate ${aggregateId}: ${error}`);
      return [];
    }
  }

  /** Rebuilds the aggregate by folding its events; null when it has none. */
  async getAggregateState(aggregateId: string): Promise<AggregateState | null> {
    const events = await this.getEventsForAggregate(aggregateId);
    if (events.length === 0) {
      return null;
    }
    // Apply events in order, starting from an empty state.
    let state: AggregateState = {
      id: aggregateId,
      type: EventStore.parseEventType(events[0].type).entity,
      version: 0,
      data: {},
      lastEvent: new Date(events[0].timestamp)
    };
    for (const event of events) {
      state = this.applyEventToState(state, event);
    }
    return state;
  }

  /**
   * Splits an event type into entity and action. Accepts both the dotted
   * form ("User.Created") and the concatenated form ("UserCreated").
   * `action` is undefined when the type ends in no known action.
   */
  private static parseEventType(type: string): { entity: string; action?: string } {
    const match = /^(.*?)\.?(Created|Updated|Deleted|Versioned)$/.exec(type);
    return match ? { entity: match[1], action: match[2] } : { entity: type };
  }

  private applyEventToState(state: AggregateState, event: Event): AggregateState {
    const { action } = EventStore.parseEventType(event.type);
    switch (action) {
      case 'Created':
        state.data = event.data;
        state.version = event.version;
        break;
      case 'Updated':
        state.data = { ...state.data, ...event.data };
        state.version = event.version;
        break;
      case 'Deleted':
        state.data = {};
        state.version = event.version;
        break;
      case 'Versioned':
        state.data = { ...state.data, ...event.data };
        state.version = event.version;
        break;
      default:
        state.data = { ...state.data, ...event.data };
    }
    // Timestamps arrive as strings after the JSON round-trip.
    state.lastEvent = new Date(event.timestamp);
    return state;
  }

  /**
   * Reads every message currently in `queue` and puts them all back.
   *
   * Messages are fetched unacknowledged (so each get() advances to the next
   * one) and requeued at the end, which makes the scan non-destructive.
   * Messages that are not valid JSON are skipped.
   */
  private async scanQueue<T>(queue: string): Promise<T[]> {
    const pending: Array<Parameters<Channel['nack']>[0]> = [];
    const items: T[] = [];
    try {
      for (;;) {
        const msg = await this.channel.get(queue, { noAck: false });
        if (msg === false) break; // queue drained
        pending.push(msg);
        try {
          items.push(JSON.parse(msg.content.toString()));
        } catch (error) {
          console.error(`Skipping malformed message in ${queue}: ${error}`);
        }
      }
    } finally {
      for (const msg of pending) {
        this.channel.nack(msg, false, true);
      }
    }
    return items;
  }

  /** Caches every event delivered on 'events', acknowledging each once. */
  private startEventConsumption(): void {
    this.channel
      .consume(
        'events',
        (msg) => {
          // amqplib delivers null when the broker cancels the consumer.
          if (msg === null) return;
          let event: Event;
          try {
            event = JSON.parse(msg.content.toString());
          } catch (error) {
            console.error(`Discarding malformed event: ${error}`);
            // Do not requeue: the payload will never become parseable.
            this.channel.nack(msg, false, false);
            return;
          }
          const cached = this.events.get(event.aggregateId) || [];
          cached.push(event);
          // Keep only the most recent events per aggregate
          this.events.set(
            event.aggregateId,
            cached.length > EventStore.MAX_CACHED_EVENTS
              ? cached.slice(-EventStore.MAX_CACHED_EVENTS)
              : cached
          );
          this.channel.ack(msg);
        },
        // Manual acknowledgement: with noAck the ack() above is an error.
        { noAck: false }
      )
      .catch((error) => console.error(`Failed to start event consumption: ${error}`));
  }

  // Create snapshot of aggregate state
  async createSnapshot(aggregateId: string, state: AggregateState): Promise<void> {
    const snapshotMessage = {
      aggregateId,
      state,
      timestamp: new Date(),
      version: state.version
    };
    this.channel.sendToQueue('events-snapshot', Buffer.from(JSON.stringify(snapshotMessage)), { persistent: true });
    console.log(`Created snapshot for aggregate ${aggregateId}, version ${state.version}`);
  }

  /**
   * Returns the highest-version snapshot stored for `aggregateId`, or null
   * when there is none or the queue cannot be read.
   */
  async restoreFromSnapshot(aggregateId: string): Promise<AggregateState | null> {
    try {
      const snapshots = await this.scanQueue<{ aggregateId: string; state: AggregateState; version: number }>('events-snapshot');
      let latest: { state: AggregateState; version: number } | null = null;
      for (const snapshot of snapshots) {
        if (snapshot.aggregateId !== aggregateId) continue;
        if (latest === null || snapshot.version >= latest.version) {
          latest = snapshot;
        }
      }
      return latest ? latest.state : null;
    } catch (error) {
      console.error(`Failed to restore snapshot for aggregate ${aggregateId}: ${error}`);
      return null;
    }
  }
}
// 3. CQRS Pattern Implementation
// query-service.ts and command-service.ts
// Query Service (Read Model)
/** Read model: answers queries from state rebuilt by the event store. */
export class QueryService {
  private eventStore: EventStore;

  constructor(eventStore: EventStore) {
    this.eventStore = eventStore;
  }

  /** Returns the user's data, or undefined when the aggregate is unknown. */
  async getUserById(userId: string): Promise<any> {
    const state = await this.eventStore.getAggregateState(userId);
    return state?.data;
  }

  /** Returns every user whose `role` equals `role`. */
  async getUsersByRole(role: string): Promise<any[]> {
    const users: any[] = [];
    // Get all user aggregates
    for (const userId of await this.getAllUserIds()) {
      const user = await this.getUserById(userId);
      if (user && user.role === role) {
        users.push(user);
      }
    }
    return users;
  }

  async getAllUserIds(): Promise<string[]> {
    // In a real implementation, you'd maintain a registry of all aggregate IDs
    return ['user-1', 'user-2', 'user-3'];
  }

  /**
   * Case-insensitive substring search over user names and e-mail addresses.
   * Users lacking a string `name` or `email` are matched on the field they
   * do have instead of raising a TypeError.
   */
  async searchUsers(query: string): Promise<any[]> {
    const results: any[] = [];
    const needle = query.toLowerCase();
    const matches = (value: unknown): boolean =>
      typeof value === 'string' && value.toLowerCase().includes(needle);

    // Simple text search implementation
    for (const userId of await this.getAllUserIds()) {
      const user = await this.getUserById(userId);
      if (user && (matches(user.name) || matches(user.email))) {
        results.push(user);
      }
    }
    return results;
  }
}
// Command Service (Write Model)
/**
 * Write model: turns commands into events stored through the event store.
 *
 * Event types use the dotted "<Entity>.<Action>" form because the event
 * store derives the action by splitting the type on ".".
 */
export class CommandService {
  private eventStore: EventStore;

  constructor(eventStore: EventStore) {
    this.eventStore = eventStore;
  }

  /**
   * Stores a User.Created event.
   * @param userData user fields; `id` is used as the aggregate id when set
   * @returns the id of the created user
   */
  async createUser(userData: Record<string, any>): Promise<string> {
    const userId = userData.id || `user-${Date.now()}`;
    const event: Event = {
      id: `event-${Date.now()}`,
      type: 'User.Created',
      data: userData,
      metadata: {
        source: 'command-service',
        timestamp: new Date()
      },
      timestamp: new Date(),
      aggregateId: userId,
      sequence: 1,
      version: 1
    };
    await this.eventStore.storeEvent(event);
    return userId;
  }

  /** Stores a User.Updated event carrying the next version of the user. */
  async updateUser(userId: string, updates: Record<string, any>): Promise<void> {
    const nextVersion = await this.nextVersion(userId);
    const event: Event = {
      id: `event-${Date.now()}`,
      type: 'User.Updated',
      data: updates,
      metadata: {
        source: 'command-service',
        timestamp: new Date()
      },
      timestamp: new Date(),
      aggregateId: userId,
      sequence: nextVersion,
      version: nextVersion
    };
    await this.eventStore.storeEvent(event);
  }

  /** Stores a User.Deleted event carrying the next version of the user. */
  async deleteUser(userId: string): Promise<void> {
    // A hard-coded version of 1 would move an existing aggregate's version
    // backwards; continue from the current one instead.
    const nextVersion = await this.nextVersion(userId);
    const event: Event = {
      id: `event-${Date.now()}`,
      type: 'User.Deleted',
      data: { deleted: true, deletedAt: new Date() },
      metadata: {
        source: 'command-service',
        timestamp: new Date()
      },
      timestamp: new Date(),
      aggregateId: userId,
      sequence: nextVersion,
      version: nextVersion
    };
    await this.eventStore.storeEvent(event);
  }

  /** Current version of the aggregate plus one; 1 when it has no state. */
  private async nextVersion(userId: string): Promise<number> {
    const currentState = await this.eventStore.getAggregateState(userId);
    return (currentState?.version || 0) + 1;
  }
}
// 4. Event Bus Implementation
// event-bus.ts
import { EventEmitter } from 'events';
// Settings accepted (partially) by the EventBus constructor, which fills in
// defaults for any field that is omitted.
interface EventBusConfig {
// Defaults to 3. Presumably the maximum number of publish retries — TODO confirm;
// EventBus stores it but does not read it in this section.
retryAttempts: number;
// Defaults to 1000. Presumably a delay in milliseconds between retries — TODO confirm;
// EventBus stores it but does not read it in this section.
retryDelay: number;
// Exchange that failed events are published to and that subscriber queues
// reference as their dead-letter exchange. Defaults to 'exchange.dlx'.
deadLetterExchange: string;
// Defaults to 'queue.dlq'. Presumably the queue bound to deadLetterExchange —
// TODO confirm; EventBus stores it but does not read it in this section.
deadLetterQueue: string;
}
/**
 * Publish/subscribe helper on top of an AMQP channel.
 *
 * Events are serialized as JSON. Publishing failures are copied to the
 * configured dead-letter exchange, and subscriber queues are declared with
 * that exchange as their `x-dead-letter-exchange`.
 */
export class EventBus extends EventEmitter {
  private channel: Channel;
  private config: EventBusConfig;

  /**
   * @param channel Open AMQP channel used for all broker operations.
   * @param config  Optional overrides; omitted fields fall back to the
   *                defaults listed below.
   */
  constructor(channel: Channel, config?: Partial<EventBusConfig>) {
    super();
    this.channel = channel;
    this.config = {
      retryAttempts: 3,
      retryDelay: 1000,
      deadLetterExchange: 'exchange.dlx',
      deadLetterQueue: 'queue.dlq',
      ...config
    };
  }

  /**
   * Publishes `event` as a JSON message.
   *
   * The message is sent to the exchange named after `event.type`.
   * NOTE(review): using the event type as the exchange name is kept from the
   * original code — confirm such exchanges exist on the broker.
   *
   * @param event      Event to publish.
   * @param routingKey Routing key; defaults to `event.type`.
   * @param options    Optional `expiration`, `persistent`, `timestamp` and
   *                   `headers` overrides for the message properties.
   * @throws Re-throws whatever the channel throws while publishing, after a
   *         best-effort copy to the dead-letter exchange.
   */
  async publishEvent(event: Event, routingKey?: string, options?: any): Promise<void> {
    const eventName = event.type;
    const resolvedRoutingKey = routingKey || eventName;

    const publishOptions: Record<string, any> = {
      // Per-message TTL is a string of milliseconds; '24h' is not a valid value.
      expiration: options?.expiration ?? String(24 * 60 * 60 * 1000),
      persistent: options?.persistent ?? false,
      contentType: 'application/json',
      headers: options?.headers ?? {}
    };
    // Only send a timestamp when the caller supplied one.
    if (options?.timestamp) {
      publishOptions.timestamp = options.timestamp;
    }

    try {
      // publish() returns a boolean, not a promise: false means the write
      // buffer is full and the caller should wait for 'drain'.
      const accepted = this.channel.publish(
        event.type,
        resolvedRoutingKey,
        Buffer.from(JSON.stringify(event)),
        publishOptions
      );
      if (!accepted) {
        await new Promise<void>((resolve) => {
          this.channel.once('drain', () => resolve());
        });
      }
      console.log(`Event ${eventName} published to ${resolvedRoutingKey}`);
    } catch (error) {
      console.error(`Failed to publish event ${eventName}: ${error}`);
      // Send to dead letter queue if configured
      if (this.config.deadLetterExchange) {
        const cause = error instanceof Error ? error : new Error(String(error));
        try {
          await this.publishToDeadLetterEvent(event, cause);
        } catch (dlxError) {
          // Never let a dead-letter failure hide the original publish error.
          console.error(`Critical error in event bus: ${dlxError}`);
        }
      }
      throw error;
    }
  }

  /**
   * Wraps a failed event together with the error that caused the failure and
   * publishes it to the dead-letter exchange with routing key 'dead-letter'.
   */
  private async publishToDeadLetterEvent(event: Event, originalError: Error): Promise<void> {
    const dlxEvent = {
      id: `dlx-event-${Date.now()}`,
      type: event.type,
      routing_key: 'dead-letter',
      data: {
        originalEvent: event,
        error: {
          message: originalError.message,
          stack: originalError.stack
        }
      },
      timestamp: new Date()
    };
    this.channel.publish(
      this.config.deadLetterExchange,
      'dead-letter',
      Buffer.from(JSON.stringify(dlxEvent))
    );
  }

  /**
   * Declares a durable queue and starts consuming it.
   *
   * A message is acknowledged after `handler` resolves. If parsing or the
   * handler fails, the message is rejected without requeue so the broker
   * routes it to the queue's dead-letter exchange.
   *
   * NOTE(review): `pattern` is only used as the default queue name and in the
   * log line; no exchange binding is created here.
   *
   * @param pattern   Subscription pattern (also the default queue name).
   * @param handler   Callback invoked with each decoded event.
   * @param queueName Explicit queue name; defaults to `pattern`.
   */
  async subscribe(pattern: string, handler: (event: Event) => Promise<void>, queueName?: string): Promise<void> {
    const queueOptions = {
      durable: true,
      exclusive: false,
      autoDelete: false,
      // Queue arguments are a map; dead-lettering is configured here, not via headers.
      arguments: {
        'x-dead-letter-exchange': this.config.deadLetterExchange
      }
    };
    const queueNameSuffix = queueName || pattern;
    const { queue } = await this.channel.assertQueue(queueNameSuffix, queueOptions);

    await this.channel.consume(queue, async (msg) => {
      // A null message signals that the broker cancelled the consumer.
      if (!msg) {
        return;
      }
      try {
        const event: Event = JSON.parse(msg.content.toString());
        await handler(event);
        this.channel.ack(msg);
      } catch (error) {
        console.error(`Failed to handle message on queue ${queueNameSuffix}: ${error}`);
        this.channel.nack(msg, false, false);
      }
    });
    console.log(`Subscribed to pattern ${pattern} on queue ${queueNameSuffix}`);
  }

  /**
   * Builds an event envelope with a time-based id and source 'application'.
   *
   * @param eventData Payload stored under `data`.
   * @param eventType Value stored under `type`.
   */
  async createEvent(eventData: Record<string, any>, eventType: string): Promise<Event> {
    const event: Event = {
      id: `event-${Date.now()}`,
      type: eventType,
      data: eventData,
      metadata: {
        source: 'application',
        timestamp: new Date()
      },
      timestamp: new Date()
    };
    return event;
  }
}
// 5. Circuit Breaker for External Dependencies
// circuit-breaker.ts
// Tuning knobs for CircuitBreaker; its constructor supplies a default for
// every field that is omitted.
interface CircuitBreakerOptions {
// Milliseconds an operation may run before it is rejected with
// 'Operation timeout' (passed to setTimeout). Defaults to 30000.
timeout: number;
// Failure count at which the breaker switches to OPEN. Defaults to 5.
failureThreshold: number;
// Success count at which the breaker switches back to CLOSED. Defaults to 2.
successThreshold: number;
// Milliseconds since the last failure after which the breaker may reset;
// values <= 0 disable the time-based reset. Defaults to 60000.
resetTimeout: number;
// Defaults to 1000. Presumably a delay in milliseconds between retries — TODO confirm;
// CircuitBreaker does not read it in this section.
retryDelay: number;
}
/** States of the circuit breaker's state machine. */
enum CircuitBreakerState {
  /** Calls pass through; failures are counted. */
  CLOSED = 'CLOSED',
  /** Calls are rejected immediately. */
  OPEN = 'OPEN',
  /** Trial calls are let through to probe whether the dependency recovered. */
  HALF_OPEN = 'HALF_OPEN'
}

/**
 * Circuit breaker guarding calls to an external dependency.
 *
 * State machine:
 * - CLOSED: calls run normally. After `failureThreshold` failures without an
 *   intervening success the breaker trips to OPEN.
 * - OPEN: calls fail fast with 'Circuit breaker is open' until `resetTimeout`
 *   milliseconds have passed since the last failure, then the next call moves
 *   the breaker to HALF_OPEN.
 * - HALF_OPEN: a failure trips the breaker again; `successThreshold`
 *   consecutive successes close it.
 */
export class CircuitBreaker {
  private state: CircuitBreakerState;
  private failureCount: number = 0;
  private successCount: number = 0;
  private lastFailureTime: Date | null = null;
  private nextAttempt: Date | null = null;
  private options: CircuitBreakerOptions;

  /**
   * @param options Overrides for the defaults below (all durations in
   *                milliseconds). May be omitted entirely.
   */
  constructor(options: Partial<CircuitBreakerOptions> = {}) {
    this.options = {
      timeout: 30000,
      failureThreshold: 5,
      successThreshold: 2,
      resetTimeout: 60000,
      retryDelay: 1000,
      ...options
    };
    this.state = CircuitBreakerState.CLOSED;
  }

  /**
   * Runs `operation` under the breaker.
   *
   * @param operation Async call to protect.
   * @returns The value `operation` resolves with.
   * @throws Error('Circuit breaker is open') while the breaker is open and the
   *         reset timeout has not elapsed; Error('Operation timeout') when the
   *         operation exceeds `timeout`; otherwise whatever `operation` rejects with.
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    const now = Date.now();

    if (this.state === CircuitBreakerState.OPEN) {
      // The recovery check must come before the fail-fast throw, otherwise an
      // open breaker could never be left again.
      const mayProbe = this.nextAttempt !== null && now >= this.nextAttempt.getTime();
      if (!mayProbe) {
        throw new Error('Circuit breaker is open');
      }
      this.state = CircuitBreakerState.HALF_OPEN;
      this.successCount = 0;
    } else if (this.state === CircuitBreakerState.CLOSED && this.isFailureWindowExpired(now)) {
      // Old failures no longer count towards the threshold.
      this.reset();
    }

    try {
      const result = await this.executeWithTimeout(operation);
      this.recordSuccess();
      return result;
    } catch (error) {
      this.recordFailure();
      throw error;
    }
  }

  /** True when time-based reset is enabled and the last failure is older than `resetTimeout`. */
  private isFailureWindowExpired(now: number): boolean {
    return this.options.resetTimeout > 0 &&
      this.lastFailureTime !== null &&
      (now - this.lastFailureTime.getTime()) > this.options.resetTimeout;
  }

  /** Races `operation` against a timer and always clears the timer afterwards. */
  private async executeWithTimeout<T>(operation: () => Promise<T>): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const deadline = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('Operation timeout')), this.options.timeout);
    });
    try {
      return await Promise.race([operation(), deadline]);
    } finally {
      // Without this the pending timer outlives every call by up to `timeout` ms.
      clearTimeout(timer);
    }
  }

  /** Clears the failure streak; in HALF_OPEN, counts towards closing the breaker. */
  private recordSuccess(): void {
    this.failureCount = 0;
    if (this.state !== CircuitBreakerState.HALF_OPEN) {
      return;
    }
    this.successCount++;
    if (this.successCount >= this.options.successThreshold) {
      this.reset();
    }
  }

  /** Counts a failure and trips the breaker when probing failed or the threshold is reached. */
  private recordFailure(): void {
    this.failureCount++;
    this.successCount = 0;
    this.lastFailureTime = new Date();

    const probeFailed = this.state === CircuitBreakerState.HALF_OPEN;
    if (probeFailed || this.failureCount >= this.options.failureThreshold) {
      this.state = CircuitBreakerState.OPEN;
      // A non-positive resetTimeout disables automatic recovery.
      this.nextAttempt = this.options.resetTimeout > 0
        ? new Date(this.lastFailureTime.getTime() + this.options.resetTimeout)
        : null;
    }
  }

  /** Returns the breaker to a pristine CLOSED state. */
  private reset(): void {
    this.state = CircuitBreakerState.CLOSED;
    this.failureCount = 0;
    this.successCount = 0;
    this.lastFailureTime = null;
    this.nextAttempt = null;
  }

  /** Current state as an enum member. */
  getState(): CircuitBreakerState {
    return this.state;
  }

  /** Current state as a plain string ('CLOSED', 'OPEN' or 'HALF_OPEN'). */
  getStatus(): string {
    return this.state;
  }
}
// 6. Dead Letter Queue Handler
// dead-letter-queue-handler.ts
// Record built by DeadLetterQueueHandler.handleDeadLetter for an event that
// could not be processed.
interface DeadLetterEvent {
// The event that failed.
event: Event;
// The error reported for the failure.
error: Error;
// Retry count as passed in by the caller of handleDeadLetter.
retryCount: number;
// Time at which the record was created.
lastAttempt: Date;
// Exchange the event is attributed to; handleDeadLetter always sets 'exchange.main'.
originalExchange: string;
// Routing key the event is attributed to; handleDeadLetter sets it to the event type.
originalRoutingKey: string;
}
/**
 * Retries dead-lettered events with exponential backoff and records the ones
 * that keep failing.
 *
 * Retry attempts are tracked in memory per `<event type>:<aggregate id>`.
 */
export class DeadLetterQueueHandler {
  /** Number of re-publish attempts before an event is given up on. */
  private static readonly MAX_RETRIES = 3;
  /** Delay before the first retry in milliseconds; doubled for each further retry. */
  private static readonly BASE_RETRY_DELAY_MS = 1000;

  private channel: Channel;
  private retryAttempts: Map<string, number> = new Map();

  /**
   * Stores the channel and starts declaring the dead-letter topology in the
   * background. A constructor cannot await, so a failure is logged here
   * instead of surfacing as an unhandled promise rejection.
   *
   * @param channel Open AMQP channel.
   */
  constructor(channel: Channel) {
    this.channel = channel;
    this.initializeDeadLetterQueue().catch((error) => {
      console.error(`Failed to initialize dead letter queue: ${error}`);
    });
  }

  /**
   * Declares the durable exchange 'exchange.dlx' and queue 'queue.dlq', and
   * binds them together.
   *
   * NOTE(review): assertExchange requires an exchange type, which the original
   * call omitted. 'topic' with a catch-all '#' binding is assumed here —
   * confirm it matches the exchange's existing declaration on the broker.
   */
  private async initializeDeadLetterQueue(): Promise<void> {
    await this.channel.assertExchange('exchange.dlx', 'topic', { durable: true });
    await this.channel.assertQueue('queue.dlq', { durable: true });
    await this.channel.bindQueue('queue.dlq', 'exchange.dlx', '#');
    console.log('Dead letter queue initialized');
  }

  /**
   * Handles one dead-lettered event.
   *
   * While fewer than MAX_RETRIES attempts have been made for the event's key,
   * waits 2^attempts * BASE_RETRY_DELAY_MS and re-publishes the event.
   * Otherwise the failure is persisted and the retry counter for the key is
   * dropped so the map does not grow without bound.
   *
   * @param event      The failed event.
   * @param error      The error that caused the failure.
   * @param retryCount Retry count reported by the caller; stored in the
   *                   persisted record only (retry decisions use the internal map).
   */
  async handleDeadLetter(event: Event, error: Error, retryCount: number): Promise<void> {
    const maxRetries = DeadLetterQueueHandler.MAX_RETRIES;
    const dlxEvent: DeadLetterEvent = {
      event,
      error,
      retryCount,
      lastAttempt: new Date(),
      originalExchange: 'exchange.main',
      originalRoutingKey: event.type
    };
    console.error(`Dead letter event for ${event.type}: ${error.message}`);

    // Check if we should retry
    const eventKey = `${event.type}:${event.aggregateId}`;
    const currentRetries = this.retryAttempts.get(eventKey) || 0;

    if (currentRetries < maxRetries) {
      console.log(`Retrying dead letter event for ${event.type} (attempt ${currentRetries + 1}/${maxRetries})`);
      this.retryAttempts.set(eventKey, currentRetries + 1);
      // Add retry delay with exponential backoff
      const retryDelay = Math.pow(2, currentRetries) * DeadLetterQueueHandler.BASE_RETRY_DELAY_MS;
      await new Promise(resolve => setTimeout(resolve, retryDelay));
      // Re-queue the event
      await this.requeueEvent(event);
    } else {
      // Max retries exceeded, log and persist
      console.error(`Max retries exceeded for event ${event.type}, logging to persistent storage`);
      await this.persistFailedEvent(dlxEvent);
      this.retryAttempts.delete(eventKey);
    }
  }

  /** Re-publishes the event to 'exchange.main' using its type as the routing key. */
  private async requeueEvent(event: Event): Promise<void> {
    const originalExchange = 'exchange.main';
    const routingKey = event.type;
    // publish() is synchronous (returns a boolean), so there is nothing to await.
    this.channel.publish(
      originalExchange,
      routingKey,
      Buffer.from(JSON.stringify(event))
    );
  }

  /**
   * Records an event that exhausted its retries.
   * Currently only writes to the error log; in production, persist to a
   * database or log system.
   */
  private async persistFailedEvent(dlxEvent: DeadLetterEvent): Promise<void> {
    const report = [
      '',
      '[PERSISTENT DEAD LETTER EVENT]',
      `Event: ${dlxEvent.event.type}`,
      `Error: ${dlxEvent.error.message}`,
      `Retry Count: ${dlxEvent.retryCount}`,
      ''
    ].join('\n');
    console.error(report);
  }
}
// 7. Usage Examples
// main.ts - Application entry point
import { connect, Connection } from 'amqplib';

/**
 * Application entry point for the usage example.
 *
 * Connects to RabbitMQ, wires up the saga manager, event store, event bus and
 * the CQRS services, registers a sample order saga, subscribes to order
 * events and runs the saga once. Any failure is logged and terminates the
 * process with exit code 1.
 *
 * NOTE(review): this section only defines `main`; nothing here invokes it.
 */
async function main() {
  try {
    // Establish RabbitMQ connection.
    // amqplib's connection object uses `hostname` (not `host`).
    const connection = await connect({
      hostname: 'localhost',
      port: 5672,
      username: 'admin',
      password: process.env.RABBITMQ_PASSWORD,
    });
    console.log('Connected to RabbitMQ');

    // Initialize services
    const sagaManager = new SagaManager(connection);
    await sagaManager.initialize();
    const eventStore = new EventStore(connection);
    await eventStore.initialize();
    const eventBus = new EventBus(eventStore.channel);
    const queryService = new QueryService(eventStore);
    const commandService = new CommandService(eventStore);

    // Create sample saga: each step has an action and, except for the last
    // one, a compensation.
    const orderSaga = {
      id: 'order-saga-1',
      name: 'Order Processing Saga',
      timeout: 30000,
      steps: [
        {
          id: 'step-1',
          name: 'Validate Order Data',
          execute: async () => {
            // Validate order data
            return { valid: true, orderId: 'order-123' };
          },
          compensate: async () => {
            console.log('Compensating order creation');
          }
        },
        {
          id: 'step-2',
          name: 'Process Payment',
          execute: async () => {
            // Process payment
            return { success: true, paymentId: 'pay-456' };
          },
          compensate: async () => {
            console.log('Refunding payment');
          }
        },
        {
          id: 'step-3',
          name: 'Create Order',
          execute: async () => {
            // Create order
            return { orderId: 'order-123', status: 'created' };
          },
          compensate: async () => {
            console.log('Cancel order');
          }
        },
        {
          id: 'step-4',
          name: 'Send Confirmation',
          execute: async () => {
            // Send confirmation
            console.log('Order confirmed');
          }
        }
      ]
    };
    // `await` is harmless if createSaga turns out to be synchronous.
    await sagaManager.createSaga(orderSaga);

    // Handle incoming events. Awaited so a failed subscription is caught by
    // the surrounding try/catch instead of becoming an unhandled rejection.
    await eventBus.subscribe('order.*', async (event) => {
      console.log('Received order event:', event.type, event.data);
      // Handle business logic here
    });

    // Execute sample saga
    const orderData = {
      id: 'order-123',
      customer: 'John Doe',
      items: [
        { id: 'item-1', name: 'Product 1', price: 100 },
        { id: 'message-id', name: 'Message ID: message-id-123', price: 50 }
      ],
      total: 150
    };
    await sagaManager.executeSaga(orderSaga, orderData);
  } catch (error) {
    console.error('Application startup failed:', error);
    process.exit(1);
  }
}
// 8. Environment Configuration for Different Environments
// config/development.ts
/**
 * Settings for local development: a single broker on localhost with
 * default-sized retry, dead-letter and saga limits.
 */
export const developmentConfig = {
  // Broker connection parameters.
  connection: {
    host: 'localhost',
    port: 5672,
    username: 'admin',
    // Development-only credential; never reuse outside a local environment.
    password: 'dev_password',
    frameMax: 4096,
    heartbeat: 30,
    timeout: 10000
  },
  // Same field set as the EventBus configuration.
  eventBus: {
    retryAttempts: 3,
    retryDelay: 1000,
    deadLetterExchange: 'exchange.dlx',
    deadLetterQueue: 'queue.dlq'
  },
  // Saga execution limits.
  sagas: {
    timeout: 30000,
    maxConcurrent: 10
  }
};
// config/production.ts
// Broker endpoints, listed primary first.
// NOTE(review): these URLs use the amqps (TLS) scheme on ports 5672/5673;
// RabbitMQ's conventional TLS port is 5671 — confirm the listeners.
const productionBrokerUrls = [
  'amqps://rabbitmq-main:5672',
  'amqps://rabbitmq-backup:5673'
];

// Login data; the password is read from the environment at load time.
// NOTE(review): heartbeat and timeout sit inside `credentials` here, whereas
// the development config keeps them directly under `connection`.
const productionCredentials = {
  username: 'prod-user',
  password: process.env.RABBITMQ_PASSWORD,
  heartbeat: 20,
  timeout: 5000
};

/**
 * Settings for production: two broker URLs, environment-supplied password,
 * larger retry/saga limits and monitoring switched on.
 */
export const productionConfig = {
  connection: {
    urls: productionBrokerUrls,
    credentials: productionCredentials
  },
  eventBus: {
    retryAttempts: 5,
    retryDelay: 5000,
    deadLetterExchange: 'exchange.dlx.production',
    deadLetterQueue: 'queue.dlq.production'
  },
  sagas: { timeout: 60000, maxConcurrent: 50 },
  monitoring: {
    metricsEnabled: true,
    tracingEnabled: true,
    loggingLevel: 'warn'
  }
};
// config/test.ts
// Broker used by the test suite: local instance with a dedicated test account.
const testConnection = {
  host: 'localhost',
  port: 5672,
  username: 'test',
  password: 'test_password',
  frameMax: 1024
};

// A single attempt with a 500 retry delay; no dead-letter names are set here.
const testEventBus = { retryAttempts: 1, retryDelay: 500 };

/**
 * Settings for automated tests: local broker connection plus a minimal
 * event-bus retry policy.
 */
export const testConfig = {
  connection: testConnection,
  eventBus: testEventBus
};