🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples AWS EventBridge
Exemples AWS EventBridge incluant les bus d'événements, règles, cibles, registre de schémas, événements personnalisés et routage d'événements inter-comptes pour l'architecture serverless event-driven
💻 EventBridge Hello World - Bus d'Événements de Base javascript
Exemple simple AWS EventBridge avec bus d'événements personnalisé, règles d'événements et cibles Lambda
// AWS EventBridge Hello World - Basic Event Bus Example
// Install: npm install @aws-sdk/client-eventbridge @aws-sdk/client-lambda
const { EventBridgeClient, PutEventsCommand, CreateEventBusCommand } = require('@aws-sdk/client-eventbridge');
const { LambdaClient, AddPermissionCommand } = require('@aws-sdk/client-lambda');
// AWS Configuration
const eventbridge = new EventBridgeClient({ region: 'us-east-1' });
const lambda = new LambdaClient({ region: 'us-east-1' });
// Create custom event bus
async function createCustomEventBus() {
const params = {
Name: 'my-app-events',
Description: 'Event bus for my application events',
};
try {
const command = new CreateEventBusCommand(params);
const result = await eventbridge.send(command);
console.log('✅ Custom event bus created:', result.EventBusArn);
return result.EventBusArn;
} catch (error) {
if (error.name === 'ResourceAlreadyExistsException') {
console.log('ℹ️ Event bus already exists');
return 'arn:aws:events:us-east-1:123456789012:event-bus/my-app-events';
}
throw error;
}
}
// Lambda function handler (deployment separate from this code)
const lambdaHandler = `
exports.handler = async (event) => {
console.log('Event received:', JSON.stringify(event, null, 2));
for (const record of event.Records) {
console.log('Event detail:', record.detail);
console.log('Event source:', record.source);
console.log('Event type:', record['detail-type']);
// Process event based on type
switch (record['detail-type']) {
case 'UserCreated':
console.log(`Processing new user: ${record.detail.userId}`);
// Add user to database, send welcome email, etc.
break;
case 'OrderPlaced':
console.log(`Processing order: ${record.detail.orderId}`);
// Process order, update inventory, etc.
break;
}
}
return {
statusCode: 200,
body: JSON.stringify({ message: 'Events processed successfully' }),
};
};
`;
// Grant EventBridge permission to invoke Lambda
async function grantEventBridgePermission(lambdaArn) {
const params = {
FunctionName: lambdaArn,
StatementId: 'EventBridgeInvokePermission',
Action: 'lambda:InvokeFunction',
Principal: 'events.amazonaws.com',
};
try {
const command = new AddPermissionCommand(params);
await lambda.send(command);
console.log('✅ EventBridge permission granted to Lambda');
} catch (error) {
if (error.name === 'ResourceConflictException') {
console.log('ℹ️ Permission already exists');
} else {
throw error;
}
}
}
// Send custom events to EventBridge
async function sendCustomEvents() {
const events = [
{
Source: 'my.app.users',
DetailType: 'UserCreated',
Detail: JSON.stringify({
userId: 'user123',
email: '[email protected]',
name: 'John Doe',
timestamp: new Date().toISOString(),
}),
EventBusName: 'my-app-events',
},
{
Source: 'my.app.orders',
DetailType: 'OrderPlaced',
Detail: JSON.stringify({
orderId: 'order456',
userId: 'user123',
total: 99.99,
items: [
{ productId: 'prod001', quantity: 2, price: 49.99 },
],
timestamp: new Date().toISOString(),
}),
EventBusName: 'my-app-events',
},
{
Source: 'my.app.payments',
DetailType: 'PaymentProcessed',
Detail: JSON.stringify({
paymentId: 'pay789',
orderId: 'order456',
amount: 99.99,
status: 'completed',
method: 'credit_card',
timestamp: new Date().toISOString(),
}),
EventBusName: 'my-app-events',
},
];
console.log('📤 Sending events to EventBridge...');
for (const event of events) {
const params = {
Entries: [event],
};
try {
const command = new PutEventsCommand(params);
const result = await eventbridge.send(command);
if (result.SuccessfulEntries && result.SuccessfulEntries.length > 0) {
const entry = result.SuccessfulEntries[0];
console.log(`✅ Event sent: ${event.DetailType}`);
console.log(` Event ID: ${entry.EventId}`);
}
if (result.FailedEntries && result.FailedEntries.length > 0) {
console.error('❌ Failed to send event:', result.FailedEntries[0]);
}
} catch (error) {
console.error('Error sending event:', error);
}
}
}
// EventBridge Rule Configuration (CloudFormation/Terraform)
const eventRuleTemplate = `
# CloudFormation Template for EventBridge Rule
Resources:
UserEventsRule:
Type: AWS:: Events:: Rule
Properties:
Name: user- events - rule
Description: 'Process user-related events'
EventBusName: my - app - events
EventPattern:
source:
- 'my.app.users'
detail - type:
- 'UserCreated'
- 'UserUpdated'
- 'UserDeleted'
State: ENABLED
Targets:
- Arn: !GetAtt UserEventProcessorFunction.Arn
Id: 'UserEventProcessorTarget'
OrderEventsRule:
Type: AWS:: Events:: Rule
Properties:
Name: order - events - rule
Description: 'Process order-related events'
EventBusName: my - app - events
EventPattern:
source:
- 'my.app.orders'
detail - type:
- 'OrderPlaced'
- 'OrderShipped'
- 'OrderDelivered'
State: ENABLED
Targets:
- Arn: !GetAtt OrderEventProcessorFunction.Arn
Id: 'OrderEventProcessorTarget'
`;
// Main function to demonstrate EventBridge setup
async function main() {
console.log('=== AWS EventBridge Hello World ===');
try {
// Step 1: Create custom event bus
console.log('\n1. Creating custom event bus...');
const eventBusArn = await createCustomEventBus();
// Step 2: Note: Lambda functions and rules would be deployed via infrastructure as code
console.log('\n2. Lambda handler code:');
console.log(lambdaHandler);
console.log('\n3. CloudFormation template for rules:');
console.log(eventRuleTemplate);
// Step 3: Send custom events
console.log('\n4. Sending custom events...');
await sendCustomEvents();
console.log('\n✅ EventBridge Hello World completed!');
console.log('\nNext steps:');
console.log('- Deploy Lambda functions with the provided handler');
console.log('- Deploy EventBridge rules using the CloudFormation template');
console.log('- Set up monitoring and logging for event processing');
} catch(error) {
console.error('Error:', error);
}
}
// Example event patterns for filtering
const eventPatterns = {
// Filter by user events
userEvents: {
source: ['my.app.users'],
'detail-type': ['UserCreated', 'UserUpdated', 'UserDeleted'],
},
// Filter by high-value orders
highValueOrders: {
source: ['my.app.orders'],
'detail-type': ['OrderPlaced'],
detail: {
total: [{ numeric: ['>=', 1000] }],
},
},
// Filter by failed payments
failedPayments: {
source: ['my.app.payments'],
'detail-type': ['PaymentProcessed'],
detail: {
status: ['failed'],
},
},
// Filter by specific time range
recentEvents: {
time: [{ prefix: '2025-12' }],
},
// Complex pattern: premium users with high-value orders
premiumUserOrders: {
source: ['my.app.orders'],
'detail-type': ['OrderPlaced'],
detail: {
userId: [{ prefix: 'premium_' }],
total: [{ numeric: ['>=', 500] }],
},
},
};
// Console log event patterns for reference
console.log('\n📋 Event Pattern Examples:');
Object.entries(eventPatterns).forEach(([name, pattern]) => {
console.log(`\n${name}:`);
console.log(JSON.stringify(pattern, null, 2));
});
main().catch(err => {
console.error('Error:', err);
});
💻 Registre de Schémas EventBridge javascript
Utiliser le Registre de Schémas EventBridge pour découvrir, créer et gérer des schémas d'événements pour le traitement d'événements avec sécurité de types
// AWS EventBridge Schema Registry Example
// Install: npm install @aws-sdk/client-eventbridge @aws-sdk/client-schemas @aws-sdk/client-lambda
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
const { SchemasClient, CreateSchemaCommand, ListSchemasCommand, DescribeSchemaCommand, SearchSchemasCommand } = require('@aws-sdk/client-schemas');
const eventbridge = new EventBridgeClient({ region: 'us-east-1' });
const schemas = new SchemasClient({ region: 'us-east-1' });
// Schema definitions for different event types
const eventSchemas = {
UserCreated: {
Type: 'OpenApi3',
Schema: `{
"openapi": "3.0.0",
"info": {
"title": "UserCreated Event",
"version": "1.0.0",
"description": "Event emitted when a new user is created"
},
"components": {
"schemas": {
"UserCreated": {
"type": "object",
"required": ["userId", "email", "createdAt"],
"properties": {
"userId": {
"type": "string",
"description": "Unique identifier for the user"
},
"email": {
"type": "string",
"format": "email",
"description": "User's email address"
},
"name": {
"type": "string",
"description": "User's full name"
},
"role": {
"type": "string",
"enum": ["user", "admin", "moderator"],
"default": "user"
},
"createdAt": {
"type": "string",
"format": "date-time",
"description": "Timestamp when user was created"
},
"metadata": {
"type": "object",
"properties": {
"source": { "type": "string" },
"ipAddress": { "type": "string" },
"userAgent": { "type": "string" }
}
}
}
}
}
}
}`,
},
OrderPlaced: {
Type: 'OpenApi3',
Schema: `{
"openapi": "3.0.0",
"info": {
"title": "OrderPlaced Event",
"version": "1.0.0",
"description": "Event emitted when a customer places an order"
},
"components": {
"schemas": {
"OrderPlaced": {
"type": "object",
"required": ["orderId", "userId", "total", "items", "createdAt"],
"properties": {
"orderId": {
"type": "string",
"description": "Unique identifier for the order"
},
"userId": {
"type": "string",
"description": "Customer's user ID"
},
"total": {
"type": "number",
"format": "float",
"minimum": 0,
"description": "Total order amount"
},
"currency": {
"type": "string",
"pattern": "^[A-Z]{3}$",
"default": "USD"
},
"items": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"required": ["productId", "quantity", "price"],
"properties": {
"productId": { "type": "string" },
"quantity": { "type": "integer", "minimum": 1 },
"price": { "type": "number", "minimum": 0 }
}
}
},
"shippingAddress": {
"type": "object",
"properties": {
"street": { "type": "string" },
"city": { "type": "string" },
"state": { "type": "string" },
"zipCode": { "type": "string" },
"country": { "type": "string" }
}
},
"createdAt": {
"type": "string",
"format": "date-time"
}
}
}
}
}
}`,
},
PaymentProcessed: {
Type: 'OpenApi3',
Schema: `{
"openapi": "3.0.0",
"info": {
"title": "PaymentProcessed Event",
"version": "1.0.0",
"description": "Event emitted when a payment is processed"
},
"components": {
"schemas": {
"PaymentProcessed": {
"type": "object",
"required": ["paymentId", "orderId", "amount", "status", "processedAt"],
"properties": {
"paymentId": {
"type": "string",
"description": "Unique identifier for the payment"
},
"orderId": {
"type": "string",
"description": "Associated order ID"
},
"amount": {
"type": "number",
"format": "float",
"minimum": 0
},
"currency": {
"type": "string",
"pattern": "^[A-Z]{3}$"
},
"status": {
"type": "string",
"enum": ["completed", "failed", "pending", "refunded"]
},
"method": {
"type": "string",
"enum": ["credit_card", "debit_card", "paypal", "bank_transfer"]
},
"transactionId": {
"type": "string",
"description": "External transaction ID from payment processor"
},
"failureReason": {
"type": "string",
"description": "Reason for payment failure (if status is failed)"
},
"processedAt": {
"type": "string",
"format": "date-time"
}
}
}
}
}
}`,
},
};
// Create schema in EventBridge Schema Registry
async function createSchema(name, schemaDefinition) {
const params = {
RegistryName: 'default', // Use default registry
SchemaName: name,
Type: schemaDefinition.Type,
Content: schemaDefinition.Schema,
Description: `${name} event schema for my application`,
};
try {
const command = new CreateSchemaCommand(params);
const result = await schemas.send(command);
console.log(`✅ Schema '${name}' created successfully`);
console.log(` Schema ARN: ${result.SchemaArn}`);
console.log(` Schema Version: ${result.SchemaVersion}`);
return result;
} catch (error) {
if (error.name === 'ConflictException') {
console.log(`ℹ️ Schema '${name}' already exists`);
return { SchemaArn: `arn:aws:schemas:us-east-1:123456789012:schema/${name}` };
}
throw error;
}
}
// List all schemas in registry
async function listSchemas() {
const params = {
RegistryName: 'default',
Limit: 50,
};
try {
const command = new ListSchemasCommand(params);
const result = await schemas.send(command);
console.log('\n📋 Available Schemas:');
result.Schemas.forEach(schema => {
console.log(` - ${schema.SchemaName} (v${schema.Version}): ${schema.Description || 'No description'}`);
});
return result.Schemas;
} catch (error) {
console.error('Error listing schemas:', error);
return [];
}
}
// Get detailed schema information
async function describeSchema(schemaName) {
const params = {
RegistryName: 'default',
SchemaName: schemaName,
};
try {
const command = new DescribeSchemaCommand(params);
const result = await schemas.send(command);
console.log(`\n📖 Schema Details: ${schemaName}`);
console.log(` Type: ${result.Type}`);
console.log(` Version: ${result.SchemaVersion}`);
console.log(` Created: ${result.CreatedAt}`);
console.log(` Modified: ${result.LastModified}`);
console.log(` Content Length: ${result.Content.length} characters`);
return result;
} catch (error) {
console.error(`Error describing schema '${schemaName}':`, error);
return null;
}
}
// Search schemas by keywords
async function searchSchemas(keyword) {
const params = {
RegistryName: 'default',
Keywords: keyword,
Limit: 10,
};
try {
const command = new SearchSchemasCommand(params);
const result = await schemas.send(command);
console.log(`\n🔍 Search Results for '${keyword}':`);
result.Schemas.forEach(schema => {
console.log(` - ${schema.SchemaName}: ${schema.Description}`);
});
return result.Schemas;
} catch (error) {
console.error('Error searching schemas:', error);
return [];
}
}
// Validate event against schema
function validateEvent(eventData, schemaName) {
console.log(`\n🔍 Validating event against '${schemaName}' schema`);
// In a real application, you would use a JSON schema validator
// Here we're demonstrating the concept
try {
const event = JSON.parse(eventData);
// Basic validation example
if (schemaName === 'UserCreated') {
const required = ['userId', 'email', 'createdAt'];
const missing = required.filter(field => !event[field]);
if (missing.length > 0) {
console.log(`❌ Validation failed: Missing required fields: ${missing.join(', ')}`);
return false;
}
// Email format validation
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
if (!emailRegex.test(event.email)) {
console.log('❌ Validation failed: Invalid email format');
return false;
}
}
console.log('✅ Event validation passed');
return true;
} catch (error) {
console.log(`❌ Validation failed: ${error.message}`);
return false;
}
}
// Generate TypeScript types from schema
function generateTypeScriptTypes() {
const types = `// Auto-generated TypeScript types for EventBridge events
export interface UserCreated {
userId: string;
email: string;
name?: string;
role?: 'user' | 'admin' | 'moderator';
createdAt: string;
metadata?: {
source?: string;
ipAddress?: string;
userAgent?: string;
};
}
export interface OrderItem {
productId: string;
quantity: number;
price: number;
}
export interface OrderPlaced {
orderId: string;
userId: string;
total: number;
currency?: string;
items: OrderItem[];
shippingAddress?: {
street?: string;
city?: string;
state?: string;
zipCode?: string;
country?: string;
};
createdAt: string;
}
export interface PaymentProcessed {
paymentId: string;
orderId: string;
amount: number;
currency: string;
status: 'completed' | 'failed' | 'pending' | 'refunded';
method?: 'credit_card' | 'debit_card' | 'paypal' | 'bank_transfer';
transactionId?: string;
failureReason?: string;
processedAt: string;
}
// Union type for all events
export type AppEvent = UserCreated | OrderPlaced | PaymentProcessed;
// Event type discriminator
export type EventType = 'UserCreated' | 'OrderPlaced' | 'PaymentProcessed';
`;
console.log('\n📝 Generated TypeScript Types:');
console.log(types);
return types;
}
// Code generator for Lambda handlers with type safety
function generateTypedLambdaHandler() {
const handler = `// Type-safe Lambda handler with schema validation
import { AppEvent, UserCreated, OrderPlaced, PaymentProcessed } from './event-types';
exports.handler = async (event: any) => {
console.log('Processing EventBridge events...');
for (const record of event.Records) {
const eventType = record['detail-type'];
const eventData = record.detail;
try {
switch (eventType) {
case 'UserCreated':
await handleUserCreated(eventData as UserCreated);
break;
case 'OrderPlaced':
await handleOrderPlaced(eventData as OrderPlaced);
break;
case 'PaymentProcessed':
await handlePaymentProcessed(eventData as PaymentProcessed);
break;
default:
console.warn(`Unknown event type: ${eventType}`);
}
} catch (error) {
console.error(`Error processing event ${eventType}:`, error);
// Consider implementing dead-letter queue for failed events
}
}
};
async function handleUserCreated(user: UserCreated) {
console.log(`Processing new user: ${user.userId}`);
// Type-safe access to user properties
console.log(`Email: ${user.email}`);
if (user.metadata?.source) {
console.log(`Source: ${user.metadata.source}`);
}
// Add user to database, send welcome email, etc.
}
async function handleOrderPlaced(order: OrderPlaced) {
console.log(`Processing order: ${order.orderId}`);
console.log(`Total: ${order.total} ${order.currency || 'USD'}`);
// Process order, update inventory, calculate shipping, etc.
}
async function handlePaymentProcessed(payment: PaymentProcessed) {
console.log(`Processing payment: ${payment.paymentId}`);
console.log(`Status: ${payment.status}`);
// Update order status, send receipt, handle failures, etc.
}
`;
console.log('\n🔧 Generated Type-Safe Lambda Handler:');
console.log(handler);
return handler;
}
// Main function to demonstrate schema registry
async function main() {
console.log('=== AWS EventBridge Schema Registry Example ===');
try {
// Step 1: Create schemas
console.log('\n1. Creating event schemas...');
for (const [schemaName, schemaDef] of Object.entries(eventSchemas)) {
await createSchema(schemaName, schemaDef);
}
// Step 2: List all schemas
console.log('\n2. Listing available schemas...');
await listSchemas();
// Step 3: Describe specific schema
console.log('\n3. Getting schema details...');
await describeSchema('UserCreated');
// Step 4: Search schemas
console.log('\n4. Searching schemas...');
await searchSchemas('User');
await searchSchemas('Payment');
// Step 5: Validate events
console.log('\n5. Validating events...');
const validUserEvent = JSON.stringify({
userId: 'user123',
email: '[email protected]',
name: 'John Doe',
createdAt: new Date().toISOString(),
});
const invalidUserEvent = JSON.stringify({
userId: 'user123',
// Missing required email field
createdAt: new Date().toISOString(),
});
validateEvent(validUserEvent, 'UserCreated');
validateEvent(invalidUserEvent, 'UserCreated');
// Step 6: Generate TypeScript types
console.log('\n6. Generating TypeScript types...');
generateTypeScriptTypes();
// Step 7: Generate typed Lambda handler
console.log('\n7. Generating type-safe Lambda handler...');
generateTypedLambdaHandler();
console.log('\n✅ Schema Registry example completed!');
console.log('\nBenefits of Schema Registry:');
console.log('- Type safety for event producers and consumers');
console.log('- Automatic code generation');
console.log('- Schema discovery and documentation');
console.log('- Validation and enforcement of event contracts');
} catch (error) {
console.error('Error:', error);
}
}
main().catch(err => {
console.error('Error:', err);
});
💻 Pipes EventBridge javascript
Construire des pipelines event-driven avec EventBridge Pipes pour l'intégration point-à-point avec filtrage, enrichissement et transformation
// AWS EventBridge Pipes Example
// Install: npm install @aws-sdk/client-eventbridge-pipes @aws-sdk/client-sqs @aws-sdk/client-lambda
const { EventBridgePipesClient, CreatePipeCommand, UpdatePipeCommand, DeletePipeCommand, ListPipesCommand, DescribePipeCommand } = require('@aws-sdk/client-eventbridge-pipes');
const { SQSClient, CreateQueueCommand, GetQueueAttributesCommand } = require('@aws-sdk/client-sqs');
const { LambdaClient, CreateFunctionCommand } = require('@aws-sdk/client-lambda');
const pipesClient = new EventBridgePipesClient({ region: 'us-east-1' });
const sqs = new SQSClient({ region: 'us-east-1' });
const lambda = new LambdaClient({ region: 'us-east-1' });
// Example 1: Order Processing Pipeline
// SQS -> EventBridge Pipe -> Lambda -> EventBridge
// Create source SQS queue for order events
async function createOrderQueue() {
const params = {
QueueName: 'order-events-queue',
Attributes: {
VisibilityTimeout: '300', // 5 minutes
MessageRetentionPeriod: '1209600', // 14 days
DelaySeconds: '0',
ReceiveMessageWaitTimeSeconds: '20', // Long polling
},
};
try {
const command = new CreateQueueCommand(params);
const result = await sqs.send(command);
console.log('✅ Order queue created:', result.QueueUrl);
return result.QueueUrl;
} catch (error) {
if (error.name === 'QueueAlreadyExists') {
console.log('ℹ️ Order queue already exists');
return 'https://sqs.us-east-1.amazonaws.com/123456789012/order-events-queue';
}
throw error;
}
}
// Lambda function for order processing
const orderProcessorLambda = {
FunctionName: 'order-processor',
Runtime: 'nodejs18.x',
Role: 'arn:aws:iam::123456789012:role/lambda-execution-role',
Handler: 'index.handler',
Code: {
ZipFile: Buffer.from(`
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
const eventbridge = new EventBridgeClient();
exports.handler = async (event) => {
console.log('Processing order events from SQS:', event);
for (const record of event.Records) {
const orderEvent = JSON.parse(record.body);
console.log('Processing order:', orderEvent);
// Transform and enrich order data
const enrichedEvent = {
orderId: orderEvent.orderId,
userId: orderEvent.userId,
total: orderEvent.total,
items: orderEvent.items,
processedAt: new Date().toISOString(),
processorId: 'order-service-v1',
region: process.env.AWS_REGION,
// Add enrichment data
customerTier: await getCustomerTier(orderEvent.userId),
inventoryStatus: await checkInventory(orderEvent.items),
};
// Send to EventBridge for downstream processing
await eventbridge.send(new PutEventsCommand({
Entries: [{
Source: 'order.processing',
DetailType: 'OrderProcessed',
Detail: JSON.stringify(enrichedEvent),
EventBusName: 'default'
}]
}));
}
return {
statusCode: 200,
body: JSON.stringify({ processed: event.Records.length });
};
};
async function getCustomerTier(userId) {
// Mock customer tier lookup
const tiers = ['bronze', 'silver', 'gold', 'platinum'];
return tiers[Math.floor(Math.random() * tiers.length)];
}
async function checkInventory(items) {
// Mock inventory check
return 'available';
}
`).toString('base64'),
},
Environment: {
Variables: {
LOG_LEVEL: 'INFO',
EVENT_BUS: 'default',
},
},
};
// Create EventBridge Pipe for order processing
async function createOrderProcessingPipe(sourceQueueUrl, targetLambdaArn) {
const pipeName = 'order-processing-pipe';
const params = {
Name: pipeName,
Description: 'Process order events from SQS and enrich with customer data',
RoleArn: 'arn:aws:iam::123456789012:role/EventBridgePipesExecutionRole',
Source: sourceQueueUrl,
Target: targetLambdaArn,
SourceParameters: {
FilterCriteria: {
Filters: [
{
Pattern: JSON.stringify({
eventType: ['order.created', 'order.updated'],
}),
},
],
},
SqsQueueParameters: {
BatchSize: 10,
MaximumBatchingWindowInSeconds: 30,
},
},
TargetParameters: {
InputTemplate: ` + '`' + `{ "order": <aws.events.json.jsonParse($.body)>, "source": "sqs", "timestamp": <aws.events.schedulerScheduledTime> }` + '`' + `,
EventBridgeEventParameters: {
DetailType: 'OrderProcessingStarted',
Source: 'eventbridge.pipes',
},
},
LogConfiguration: {
CloudwatchLogsLogDestination: {
LogGroupArn: 'arn:aws:logs:us-east-1:123456789012:log-group:/aws/eventbridge/pipes/order-processing-pipe',
},
LogLevel: 'INFO',
IncludeExecutionData: ['ALL'],
},
Enrichment: 'arn:aws:lambda:us-east-1:123456789012:function:customer-enrichment',
};
try {
const command = new CreatePipeCommand(params);
const result = await pipesClient.send(command);
console.log(`✅ Pipe '${pipeName}' created successfully`);
console.log(` ARN: ${result.Arn}`);
console.log(` Created: ${result.CreateTime}`);
return result;
} catch (error) {
if (error.name === 'ConflictException') {
console.log(`ℹ️ Pipe '${pipeName}' already exists`);
return { Arn: `arn:aws:pipes:us-east-1:123456789012:pipe/${pipeName}` };
}
throw error;
}
}
// Example 2: Real-time Data Pipeline
// Kinesis -> EventBridge Pipe -> Transformation -> EventBridge -> Multiple Targets
// Create EventBridge Pipe for real-time analytics
async function createAnalyticsPipe() {
const pipeName = 'real-time-analytics-pipe';
const params = {
Name: pipeName,
Description: 'Process real-time analytics events from Kinesis',
RoleArn: 'arn:aws:iam::123456789012:role/EventBridgePipesExecutionRole',
Source: 'arn:aws:kinesis:us-east-1:123456789012:stream/analytics-events',
Target: 'arn:aws:events:us-east-1:123456789012:event-bus/analytics-bus',
SourceParameters: {
FilterCriteria: {
Filters: [
{
Pattern: JSON.stringify({
eventType: ['user.action', 'page.view', 'click.event'],
}),
},
],
},
KinesisStreamParameters: {
BatchSize: 100,
MaximumBatchingWindowInSeconds: 5,
StartingPosition: 'LATEST',
DeadLetterConfig: {
Arn: 'arn:aws:sqs:us-east-1:123456789012:queue/analytics-dlq',
},
},
},
TargetParameters: {
InputTransformer: {
InputPathsMap: {
eventData: '$.detail',
timestamp: '$.time',
region: '$.region',
},
InputTemplate: ` + '`' + `{
"analyticEvent": <aws.events.json.jsonParse($.eventData)>,
"processedAt": <aws.events.schedulerScheduledTime>,
"sourceRegion": <aws.events.json.toString($.region)>,
"pipelineVersion": "v1.0"
}` + '`' + `,
},
EventBridgeEventParameters: {
DetailType: 'AnalyticsEventProcessed',
Source: 'analytics.pipeline',
},
},
LogConfiguration: {
CloudwatchLogsLogDestination: {
LogGroupArn: 'arn:aws:logs:us-east-1:123456789012:log-group:/aws/eventbridge/pipes/analytics-pipe',
},
LogLevel: 'DEBUG',
},
};
try {
const command = new CreatePipeCommand(params);
const result = await pipesClient.send(command);
console.log(`✅ Analytics pipe '${pipeName}' created successfully`);
return result;
} catch (error) {
console.error(`Error creating analytics pipe: ${error}`);
return null;
}
}
// List all pipes
async function listPipes() {
const params = {
Limit: 50,
};
try {
const command = new ListPipesCommand(params);
const result = await pipesClient.send(command);
console.log('\n📋 Available EventBridge Pipes:');
result.Pipes.forEach(pipe => {
console.log(` - ${pipe.Name}: ${pipe.Description || 'No description'}`);
console.log(` State: ${pipe.CurrentState}`);
console.log(` Created: ${pipe.CreateTime}`);
if (pipe.LastModifiedTime) {
console.log(` Modified: ${pipe.LastModifiedTime}`);
}
console.log('');
});
return result.Pipes;
} catch (error) {
console.error('Error listing pipes:', error);
return [];
}
}
// Get detailed pipe information
async function describePipe(pipeName) {
const params = {
Name: pipeName,
};
try {
const command = new DescribePipeCommand(params);
const result = await pipesClient.send(command);
console.log(`\n📖 Pipe Details: ${pipeName}`);
console.log(` ARN: ${result.Arn}`);
console.log(` Description: ${result.Description}`);
console.log(` State: ${result.CurrentState}`);
console.log(` Source: ${result.Source}`);
console.log(` Target: ${result.Target}`);
console.log(` Role: ${result.RoleArn}`);
if (result.DesiredState && result.DesiredState !== result.CurrentState) {
console.log(` Desired State: ${result.DesiredState}`);
}
return result;
} catch (error) {
console.error(`Error describing pipe '${pipeName}':`, error);
return null;
}
}
// Update pipe configuration
async function updatePipe(pipeName, updates) {
const params = {
Name: pipeName,
...updates,
};
try {
const command = new UpdatePipeCommand(params);
const result = await pipesClient.send(command);
console.log(`✅ Pipe '${pipeName}' updated successfully`);
console.log(` New desired state: ${result.DesiredState}`);
return result;
} catch (error) {
console.error(`Error updating pipe '${pipeName}':`, error);
return null;
}
}
// Monitor pipe performance
function createPipeMonitoring() {
const monitoring = `// EventBridge Pipe Monitoring with CloudWatch
const { CloudWatchClient, PutMetricDataCommand } = require('@aws-sdk/client-cloudwatch');
const cloudwatch = new CloudWatchClient();
// Custom metrics for pipe monitoring
async function publishPipeMetrics(pipeName, metrics) {
const params = {
Namespace: 'EventBridge/Pipes',
MetricData: [
{
MetricName: 'EventsProcessed',
Dimensions: [
{ Name: 'PipeName', Value: pipeName },
],
Value: metrics.eventsProcessed,
Unit: 'Count',
},
{
MetricName: 'ProcessingLatency',
Dimensions: [
{ Name: 'PipeName', Value: pipeName },
],
Value: metrics.processingLatency,
Unit: 'Milliseconds',
},
{
MetricName: 'Errors',
Dimensions: [
{ Name: 'PipeName', Value: pipeName },
],
Value: metrics.errors,
Unit: 'Count',
},
],
};
await cloudwatch.send(new PutMetricDataCommand(params));
console.log(`Published metrics for pipe: ${pipeName}`);
}
// Lambda function to monitor pipe health
exports.pipeMonitorHandler = async (event) => {
// Analyze pipe performance and alert on issues
for (const record of event.Records) {
const { pipeName, metrics } = JSON.parse(record.body);
if (metrics.errors > 10) {
// Send alert for high error rate
await sendAlert(pipeName, 'High error rate detected');
}
if (metrics.processingLatency > 5000) {
// Send alert for high latency
await sendAlert(pipeName, 'High processing latency detected');
}
}
};
async function sendAlert(pipeName, message) {
console.log(`ALERT [${pipeName}]: ${message}`);
// Integrate with SNS, PagerDuty, Slack, etc.
}`;
console.log('\n📊 Pipe Monitoring Code:');
console.log(monitoring);
return monitoring;
}
// CloudFormation template for EventBridge Pipes
const pipesCloudFormation = `Resources:
# EventBridge Pipe Execution Role
PipesExecutionRole:
Type: AWS:: IAM:: Role
Properties:
RoleName: EventBridgePipesExecutionRole
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: pipes.amazonaws.com
Action: sts: AssumeRole
Policies:
- PolicyName: PipesExecutionPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- sqs: ReceiveMessage
- sqs: DeleteMessage
- sqs: GetQueueAttributes
Resource: '*'
- Effect: Allow
Action:
- lambda: InvokeFunction
Resource: '*'
- Effect: Allow
Action:
- events: PutEvents
Resource: '*'
# Order Processing Pipe
OrderProcessingPipe:
Type: AWS:: Events:: Pipe
Properties:
Name: order - processing - pipe
Description: Process order events with enrichment
RoleArn: !GetAtt PipesExecutionRole.Arn
Source: !GetAtt OrderQueue.Arn
Target: !GetAtt OrderProcessorFunction.Arn
SourceParameters:
FilterCriteria:
Filters:
- Pattern: '{ "eventType": ["order.created"] }'
SqsQueueParameters:
BatchSize: 10
MaximumBatchingWindowInSeconds: 30
TargetParameters:
InputTemplate: '{ "order": <aws.events.json.jsonParse($.body)>, "processedAt": <aws.events.schedulerScheduledTime> }'
EventBridgeEventParameters:
DetailType: 'OrderProcessed'
Source: 'order.service'
LogConfiguration:
LogLevel: INFO
CloudwatchLogsLogDestination:
LogGroupArn: !GetAtt PipesLogGroup.Arn
# Source SQS Queue
OrderQueue:
Type: AWS:: SQS:: Queue
Properties:
QueueName: order - events - queue
VisibilityTimeout: 300
MessageRetentionPeriod: 1209600
# Target Lambda Function
OrderProcessorFunction:
Type: AWS:: Lambda:: Function
Properties:
FunctionName: order - processor
Runtime: nodejs18.x
Handler: index.handler
Role: !GetAtt LambdaExecutionRole.Arn
Code:
ZipFile: |
exports.handler = async (event) => {
console.log('Processing orders:', event);
// Process orders here
return { statusCode: 200 };
};
# Log Group for Pipes
PipesLogGroup:
Type: AWS:: Logs:: LogGroup
Properties:
LogGroupName: /aws/eventbridge / pipes / order - processing - pipe
RetentionInDays: 14`;
console.log('\n🏗️ CloudFormation Template for EventBridge Pipes:');
console.log(pipesCloudFormation);
return pipesCloudFormation;
}
// Main function to demonstrate EventBridge Pipes
async function main() {
console.log('=== AWS EventBridge Pipes Example ===');
try {
// Step 1: Create source queue
console.log('\n1. Creating source SQS queue...');
const queueUrl = await createOrderQueue();
// Step 2: Note: Lambda functions would be deployed separately
console.log('\n2. Lambda function code for order processor:');
console.log(orderProcessorLambda);
// Step 3: Create EventBridge Pipe
console.log('\n3. Creating EventBridge Pipe...');
// await createOrderProcessingPipe(queueUrl, 'arn:aws:lambda:us-east-1:123456789012:function:order-processor');
// Step 4: Create analytics pipe
console.log('\n4. Creating analytics pipe...');
await createAnalyticsPipe();
// Step 5: List pipes
console.log('\n5. Listing all pipes...');
await listPipes();
// Step 6: Create monitoring setup
console.log('\n6. Setting up monitoring...');
createPipeMonitoring();
// Step 7: CloudFormation template
console.log('\n7. Infrastructure as Code template...');
createPipeMonitoring();
console.log('\n✅ EventBridge Pipes example completed!');
console.log('\nKey Features of EventBridge Pipes:');
console.log('- Point-to-point integration without custom code');
console.log('- Built-in filtering and transformation');
console.log('- Enrichment with Lambda functions');
console.log('- Integrated monitoring and logging');
console.log('- Support for various sources and targets');
} catch (error) {
console.error('Error:', error);
}
}
main().catch(err => {
console.error('Error:', err);
});
💻 Archive et Relecture EventBridge javascript
Archiver les événements pour la conformité et le débogage, puis relire les événements pour les scénarios de test et de récupération
// AWS EventBridge Archive and Replay Example
// Install: npm install @aws-sdk/client-eventbridge @aws-sdk/client-lambda
const { EventBridgeClient, CreateArchiveCommand, DescribeArchiveCommand, UpdateArchiveCommand, ListArchivesCommand, StartReplayCommand, DescribeReplayCommand, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
const eventbridge = new EventBridgeClient({ region: 'us-east-1' });
// Sample business events to archive
const businessEvents = [
{
Source: 'com.myapp.orders',
DetailType: 'OrderCreated',
Detail: JSON.stringify({
orderId: 'ORD-001',
customerId: 'CUST-001',
total: 299.99,
currency: 'USD',
items: [
{ productId: 'PROD-001', quantity: 2, price: 149.99 },
],
createdAt: '2025-12-15T10:00:00Z',
}),
},
{
Source: 'com.myapp.orders',
DetailType: 'OrderUpdated',
Detail: JSON.stringify({
orderId: 'ORD-001',
customerId: 'CUST-001',
status: 'confirmed',
updatedAt: '2025-12-15T10:05:00Z',
}),
},
{
Source: 'com.myapp.payments',
DetailType: 'PaymentProcessed',
Detail: JSON.stringify({
paymentId: 'PAY-001',
orderId: 'ORD-001',
amount: 299.99,
status: 'completed',
processedAt: '2025-12-15T10:10:00Z',
}),
},
{
Source: 'com.myapp.shipping',
DetailType: 'OrderShipped',
Detail: JSON.stringify({
orderId: 'ORD-001',
trackingNumber: 'TRK-123456789',
carrier: 'FedEx',
shippedAt: '2025-12-15T14:00:00Z',
}),
},
{
Source: 'com.myapp.orders',
DetailType: 'OrderDelivered',
Detail: JSON.stringify({
orderId: 'ORD-001',
customerId: 'CUST-001',
deliveredAt: '2025-12-16T16:30:00Z',
}),
},
];
// Send business events to EventBridge
async function sendBusinessEvents() {
console.log('📤 Sending business events to EventBridge...');
for (let i = 0; i < businessEvents.length; i++) {
const event = businessEvents[i];
const params = {
Entries: [event],
};
try {
const command = new PutEventsCommand(params);
const result = await eventbridge.send(command);
if (result.SuccessfulEntries && result.SuccessfulEntries.length > 0) {
const entry = result.SuccessfulEntries[0];
console.log(`✅ Event ${i + 1}/${businessEvents.length} sent: ${event.DetailType}`);
console.log(` Event ID: ${entry.EventId}`);
// Add small delay between events
await new Promise(resolve => setTimeout(resolve, 100));
}
} catch (error) {
console.error(`❌ Failed to send event ${i + 1}:`, error);
}
}
}
// Create an event archive
async function createEventArchive(archiveName, description, eventPattern) {
const params = {
ArchiveName: archiveName,
Description: description,
EventSourceArn: 'arn:aws:events:us-east-1:123456789012:event-bus/default',
EventPattern: eventPattern ? JSON.stringify(eventPattern) : undefined,
RetentionDays: 30, // Archive for 30 days
};
try {
const command = new CreateArchiveCommand(params);
const result = await eventbridge.send(command);
console.log(`✅ Archive '${archiveName}' created successfully`);
console.log(` ARN: ${result.ArchiveArn}`);
console.log(` State: ${result.State}`);
console.log(` Creation Time: ${result.CreationTime}`);
console.log(` Retention Days: ${result.RetentionDays}`);
return result;
} catch (error) {
if (error.name === 'ResourceAlreadyExistsException') {
console.log(`ℹ️ Archive '${archiveName}' already exists`);
return { ArchiveArn: `arn:aws:events:us-east-1:123456789012:archive/${archiveName}` };
}
throw error;
}
}
// Example: Create different types of archives
async function setupArchives() {
console.log('\n📦 Setting up event archives...');
// Archive 1: All order events for compliance
const orderEventsPattern = {
source: ['com.myapp.orders'],
};
await createEventArchive(
'order-events-archive',
'Archive all order events for compliance and audit',
orderEventsPattern
);
// Archive 2: High-value transactions for fraud analysis
const highValuePattern = {
source: ['com.myapp.orders', 'com.myapp.payments'],
detail: {
total: [{ numeric: ['>=', 1000] }],
},
};
await createEventArchive(
'high-value-transactions-archive',
'Archive high-value transactions for fraud analysis',
highValuePattern
);
// Archive 3: Payment events for reconciliation
const paymentEventsPattern = {
source: ['com.myapp.payments'],
};
await createEventArchive(
'payment-events-archive',
'Archive payment events for financial reconciliation',
paymentEventsPattern
);
// Archive 4: All events for disaster recovery
await createEventArchive(
'disaster-recovery-archive',
'Archive all events for disaster recovery purposes'
);
}
// List all archives
async function listArchives() {
const params = {
NamePrefix: '', // List all archives
Limit: 50,
};
try {
const command = new ListArchivesCommand(params);
const result = await eventbridge.send(command);
console.log('\n📋 Available Event Archives:');
result.Archives.forEach(archive => {
console.log(` - ${archive.ArchiveName}`);
console.log(` State: ${archive.State}`);
console.log(` Event Source: ${archive.EventSourceArn}`);
console.log(` Creation Time: ${archive.CreationTime}`);
console.log(` Retention Days: ${archive.RetentionDays}`);
if (archive.SizeBytes) {
console.log(` Size: ${(archive.SizeBytes / 1024 / 1024).toFixed(2)} MB`);
}
if (archive.EventCount) {
console.log(` Event Count: ${archive.EventCount}`);
}
console.log('');
});
return result.Archives;
} catch (error) {
console.error('Error listing archives:', error);
return [];
}
}
// Get detailed archive information
async function describeArchive(archiveName) {
const params = {
ArchiveName: archiveName,
};
try {
const command = new DescribeArchiveCommand(params);
const result = await eventbridge.send(command);
console.log(`\n📖 Archive Details: ${archiveName}`);
console.log(` ARN: ${result.ArchiveArn}`);
console.log(` Description: ${result.Description}`);
console.log(` State: ${result.State}`);
console.log(` Event Source: ${result.EventSourceArn}`);
console.log(` Creation Time: ${result.CreationTime}`);
console.log(` Retention Days: ${result.RetentionDays}`);
console.log(` Size: ${result.SizeBytes ? (result.SizeBytes / 1024 / 1024).toFixed(2) + ' MB' : 'Unknown'}`);
console.log(` Event Count: ${result.EventCount || 'Unknown'}`);
if (result.EventPattern) {
console.log(` Event Pattern: ${JSON.stringify(result.EventPattern)}`);
}
return result;
} catch (error) {
console.error(`Error describing archive '${archiveName}':`, error);
return null;
}
}
// Start replaying archived events
async function startReplay(archiveName, replayName, startTime, endTime, destination) {
const params = {
ArchiveName: archiveName,
ReplayName: replayName,
Description: `Replay events from ${archiveName} for testing`,
EventSourceArn: 'arn:aws:events:us-east-1:123456789012:event-bus/default',
StartTime: startTime,
EndTime: endTime,
Destination: {
Arn: destination,
},
};
try {
const command = new StartReplayCommand(params);
const result = await eventbridge.send(command);
console.log(`✅ Replay '${replayName}' started successfully`);
console.log(` Replay ARN: ${result.ReplayArn}`);
console.log(` State: ${result.State}`);
console.log(` Start Time: ${result.ReplayStartTime}`);
return result;
} catch (error) {
if (error.name === 'ResourceAlreadyExistsException') {
console.log(`ℹ️ Replay '${replayName}' already exists`);
return { ReplayArn: `arn:aws:events:us-east-1:123456789012:replay/${replayName}` };
}
throw error;
}
}
// Example replay scenarios
async function demonstrateReplayScenarios() {
console.log('\n🔄 Demonstrating replay scenarios...');
// Scenario 1: Testing new Lambda function with historical data
console.log('\n1. Testing new function with historical order events...');
const orderReplayStart = new Date(Date.now() - 24 * 60 * 60 * 1000); // 24 hours ago
const orderReplayEnd = new Date();
await startReplay(
'order-events-archive',
'test-new-processor',
orderReplayStart,
orderReplayEnd,
'arn:aws:lambda:us-east-1:123456789012:function:test-order-processor'
);
// Scenario 2: Debugging production issues
console.log('\n2. Debugging payment processing issues...');
const paymentReplayStart = new Date(Date.now() - 2 * 60 * 60 * 1000); // 2 hours ago
const paymentReplayEnd = new Date();
await startReplay(
'payment-events-archive',
'debug-payment-issues',
paymentReplayStart,
paymentReplayEnd,
'arn:aws:lambda:us-east-1:123456789012:function:payment-debugger'
);
// Scenario 3: Rebuilding data warehouse
console.log('\n3. Rebuilding data warehouse with all events...');
const warehouseReplayStart = new Date('2025-12-01T00:00:00Z'); // Start of month
const warehouseReplayEnd = new Date();
await startReplay(
'disaster-recovery-archive',
'rebuild-data-warehouse',
warehouseReplayStart,
warehouseReplayEnd,
'arn:aws:events:us-east-1:123456789012:event-bus/data-warehouse-ingest'
);
}
// Get replay status
async function describeReplay(replayName) {
const params = {
ReplayName: replayName,
};
try {
const command = new DescribeReplayCommand(params);
const result = await eventbridge.send(command);
console.log(`\n📊 Replay Status: ${replayName}`);
console.log(` State: ${result.State}`);
console.log(` Start Time: ${result.ReplayStartTime}`);
console.log(` End Time: ${result.ReplayEndTime || 'In progress'}`);
console.log(` Events Replayable: ${result.EventSourceArn}`);
if (result.ReplayStartTime && result.ReplayEndTime) {
const duration = (new Date(result.ReplayEndTime) - new Date(result.ReplayStartTime)) / 1000;
console.log(` Duration: ${duration} seconds`);
}
return result;
} catch (error) {
console.error(`Error describing replay '${replayName}':`, error);
return null;
}
}
// Archive compliance and retention management
function createArchiveManagement() {
const management = `// Event Archive Management for Compliance
const { EventBridgeClient, UpdateArchiveCommand, DeleteArchiveCommand } = require('@aws-sdk/client-eventbridge');
const { IAMClient, GetRolePolicyCommand } = require('@aws-sdk/client-iam');
const eventbridge = new EventBridgeClient();
// Update archive retention based on compliance requirements
async function updateArchiveRetention(archiveName, retentionDays, reason) {
const params = {
ArchiveName: archiveName,
RetentionDays: retentionDays,
Description: `Updated retention for ${reason}`,
};
try {
const command = new UpdateArchiveCommand(params);
const result = await eventbridge.send(command);
console.log(`✅ Archive '${archiveName}' retention updated to ${retentionDays} days`);
console.log(` Reason: ${reason}`);
return result;
} catch (error) {
console.error(`Error updating archive retention: ${error}`);
return null;
}
}
// Check archive compliance
async function checkArchiveCompliance(archiveName) {
const requiredRetention = 2555; // 7 years for financial data
// Get archive details
const archive = await describeArchive(archiveName);
if (archive && archive.RetentionDays < requiredRetention) {
console.log(`⚠️ COMPLIANCE WARNING: Archive '${archiveName}' retention (${archive.RetentionDays} days) is below required (${requiredRetention} days)`);
// Auto-update for compliance
await updateArchiveRetention(archiveName, requiredRetention, 'Automated compliance update');
} else {
console.log(`✅ Archive '${archiveName}' is compliant`);
}
}
// Archive lifecycle management
const ARCHIVE_POLICIES = {
'order-events-archive': {
retentionDays: 2555, // 7 years for financial records
autoExtend: true,
complianceLevel: 'critical',
},
'payment-events-archive': {
retentionDays: 2555, // 7 years for payment records
autoExtend: true,
complianceLevel: 'critical',
},
'high-value-transactions-archive': {
retentionDays: 1825, // 5 years for fraud analysis
autoExtend: true,
complianceLevel: 'high',
},
'debug-events-archive': {
retentionDays: 90, // 90 days for debugging
autoExtend: false,
complianceLevel: 'low',
},
};
// Implement archive lifecycle
async function manageArchiveLifecycle() {
for (const [archiveName, policy] of Object.entries(ARCHIVE_POLICIES)) {
console.log(`\n🔄 Managing lifecycle for ${archiveName}`);
if (policy.complianceLevel === 'critical') {
await checkArchiveCompliance(archiveName);
}
// Check if archive needs to be extended
if (policy.autoExtend) {
console.log(` Auto-extend policy: Active`);
}
}
}
// CloudWatch Events for archive monitoring
const archiveMonitoring = {
// Event pattern for archive state changes
ArchiveStateChange: {
source: ['aws.events'],
'detail-type': ['EventBridge Archive State Change'],
detail: {
state: ['ENABLED', 'DISABLED', 'CREATING', 'UPDATING', 'DELETING'],
},
},
// Event pattern for replay state changes
ReplayStateChange: {
source: ['aws.events'],
'detail-type': ['EventBridge Archive Replay State Change'],
detail: {
state: ['STARTING', 'RUNNING', 'COMPLETED', 'FAILED'],
},
},
};
console.log('Archive Monitoring Event Patterns:', JSON.stringify(archiveMonitoring, null, 2));`;
console.log('\n🔒 Archive Management Code:');
console.log(management);
return management;
}
// Lambda function for archive automation
const archiveAutomationLambda = {
FunctionName: 'eventbridge-archive-manager',
Runtime: 'nodejs18.x',
Handler: 'index.handler',
Code: {
ZipFile: Buffer.from(`
const { EventBridgeClient, CreateArchiveCommand, ListArchivesCommand } = require('@aws-sdk/client-eventbridge');
const eventbridge = new EventBridgeClient();
exports.handler = async (event) => {
console.log('Archive manager triggered:', event);
// Daily archive creation for different event types
const today = new Date().toISOString().split('T')[0];
// Create daily archive for order events
await createDailyArchive(`order-events-${today}`, {
source: ['com.myapp.orders'],
});
// Create daily archive for payment events
await createDailyArchive(`payment-events-${today}`, {
source: ['com.myapp.payments'],
});
// Clean up old archives (keep last 30 days)
await cleanupOldArchives();
return {
statusCode: 200,
body: JSON.stringify({ message: 'Archive management completed' });
};
};
async function createDailyArchive(archiveName, eventPattern) {
try {
const command = new CreateArchiveCommand({
ArchiveName: archiveName,
Description: `Daily archive for ${archiveName}`,
EventSourceArn: 'arn:aws:events:us-east-1:123456789012:event-bus/default',
EventPattern: JSON.stringify(eventPattern),
RetentionDays: 30,
});
await eventbridge.send(command);
console.log(`Created daily archive: ${archiveName}`);
} catch (error) {
if (error.name !== 'ResourceAlreadyExistsException') {
console.error(`Error creating archive ${archiveName}:`, error);
}
}
}
async function cleanupOldArchives() {
// Implementation for cleaning up old archives
console.log('Cleaning up old archives...');
}
`).toString('base64'),
},
};
console.log('\n🤖 Archive Automation Lambda:');
console.log(archiveAutomationLambda);
// Main function
async function main() {
console.log('=== AWS EventBridge Archive and Replay Example ===');
try {
// Step 1: Send sample events
console.log('\n1. Sending business events...');
await sendBusinessEvents();
// Step 2: Wait for events to be processed
console.log('\n2. Waiting for events to be processed...');
await new Promise(resolve => setTimeout(resolve, 5000));
// Step 3: Set up archives
console.log('\n3. Setting up event archives...');
await setupArchives();
// Step 4: List archives
console.log('\n4. Listing archives...');
await listArchives();
// Step 5: Describe specific archive
console.log('\n5. Getting archive details...');
await describeArchive('order-events-archive');
// Step 6: Demonstrate replay scenarios
console.log('\n6. Setting up replay scenarios...');
await demonstrateReplayScenarios();
// Step 7: Archive management
console.log('\n7. Setting up archive management...');
createArchiveManagement();
// Step 8: Lambda automation
console.log('\n8. Archive automation Lambda configuration...');
console.log(archiveAutomationLambda);
console.log('\n✅ EventBridge Archive and Replay example completed!');
console.log('\nKey Benefits:');
console.log('- Event archival for compliance and audit');
console.log('- Historical data replay for testing');
console.log('- Disaster recovery capabilities');
console.log('- Debugging with real production events');
console.log('- Data warehouse rebuilding');
} catch (error) {
console.error('Error:', error);
}
}
main().catch(err => {
console.error('Error:', err);
});