🎯 Рекомендуемые коллекции
Сбалансированные коллекции примеров кода из различных категорий, которые вы можете исследовать
Примеры AWS EventBridge
Примеры AWS EventBridge включая шины событий, правила, цели, реестр схем, пользовательские события и межаккаунтную маршрутизацию событий для бессерверной событийно-ориентированной архитектуры
💻 EventBridge Hello World - Базовая Шина Событий javascript
Простой пример AWS EventBridge с пользовательской шиной событий, правилами событий и целями Lambda
// AWS EventBridge Hello World - Basic Event Bus Example
// Install: npm install @aws-sdk/client-eventbridge @aws-sdk/client-lambda
const { EventBridgeClient, PutEventsCommand, CreateEventBusCommand } = require('@aws-sdk/client-eventbridge');
const { LambdaClient, AddPermissionCommand } = require('@aws-sdk/client-lambda');
// AWS Configuration
// Module-level SDK v3 clients, both created for the us-east-1 region and
// referenced by the functions of this example.
const eventbridge = new EventBridgeClient({ region: 'us-east-1' });
const lambda = new LambdaClient({ region: 'us-east-1' });
// Create custom event bus
/**
 * Creates the custom event bus `my-app-events`.
 *
 * @returns {Promise<string>} The ARN returned by EventBridge, or a hard-coded
 *   placeholder ARN when the bus already exists.
 * @throws Rethrows every error other than `ResourceAlreadyExistsException`.
 */
async function createCustomEventBus() {
  const busDefinition = {
    Name: 'my-app-events',
    Description: 'Event bus for my application events',
  };

  let response;
  try {
    response = await eventbridge.send(new CreateEventBusCommand(busDefinition));
  } catch (error) {
    // Anything but "already exists" is unexpected and propagates to the caller.
    if (error.name !== 'ResourceAlreadyExistsException') {
      throw error;
    }
    console.log('ℹ️ Event bus already exists');
    return 'arn:aws:events:us-east-1:123456789012:event-bus/my-app-events';
  }

  console.log('✅ Custom event bus created:', response.EventBusArn);
  return response.EventBusArn;
}
// Lambda function handler (deployment separate from this code)
const lambdaHandler = `
exports.handler = async (event) => {
console.log('Event received:', JSON.stringify(event, null, 2));
for (const record of event.Records) {
console.log('Event detail:', record.detail);
console.log('Event source:', record.source);
console.log('Event type:', record['detail-type']);
// Process event based on type
switch (record['detail-type']) {
case 'UserCreated':
console.log(`Processing new user: ${record.detail.userId}`);
// Add user to database, send welcome email, etc.
break;
case 'OrderPlaced':
console.log(`Processing order: ${record.detail.orderId}`);
// Process order, update inventory, etc.
break;
}
}
return {
statusCode: 200,
body: JSON.stringify({ message: 'Events processed successfully' }),
};
};
`;
// Grant EventBridge permission to invoke Lambda
/**
 * Adds a resource-policy statement that lets EventBridge invoke a Lambda function.
 *
 * @param {string} lambdaArn - Name or ARN of the function to open up.
 * @param {string} [sourceArn] - Optional ARN (e.g. of an EventBridge rule) that
 *   the permission is restricted to. When omitted the statement is unscoped,
 *   exactly as before.
 * @returns {Promise<void>}
 * @throws Rethrows every error other than `ResourceConflictException`, which
 *   is treated as "statement already present".
 */
async function grantEventBridgePermission(lambdaArn, sourceArn) {
  const params = {
    FunctionName: lambdaArn,
    StatementId: 'EventBridgeInvokePermission',
    Action: 'lambda:InvokeFunction',
    Principal: 'events.amazonaws.com',
    // Only add the condition when a source ARN was supplied.
    ...(sourceArn ? { SourceArn: sourceArn } : {}),
  };

  try {
    await lambda.send(new AddPermissionCommand(params));
    console.log('✅ EventBridge permission granted to Lambda');
  } catch (error) {
    if (error.name !== 'ResourceConflictException') {
      throw error;
    }
    console.log('ℹ️ Permission already exists');
  }
}
// Send custom events to EventBridge
/**
 * Publishes three sample events (UserCreated, OrderPlaced, PaymentProcessed)
 * to the `my-app-events` bus in a single PutEvents call and reports the
 * outcome of each entry.
 *
 * PutEvents answers with `Entries` (one result per request entry, in request
 * order) and `FailedEntryCount`; an entry succeeded when it carries an `EventId`.
 *
 * @returns {Promise<void>} Resolves after the outcome has been logged. Errors
 *   thrown by the SDK call are logged, not rethrown.
 */
async function sendCustomEvents() {
  const events = [
    {
      Source: 'my.app.users',
      DetailType: 'UserCreated',
      Detail: JSON.stringify({
        userId: 'user123',
        email: 'john.doe@example.com',
        name: 'John Doe',
        timestamp: new Date().toISOString(),
      }),
      EventBusName: 'my-app-events',
    },
    {
      Source: 'my.app.orders',
      DetailType: 'OrderPlaced',
      Detail: JSON.stringify({
        orderId: 'order456',
        userId: 'user123',
        total: 99.99,
        items: [
          { productId: 'prod001', quantity: 2, price: 49.99 },
        ],
        timestamp: new Date().toISOString(),
      }),
      EventBusName: 'my-app-events',
    },
    {
      Source: 'my.app.payments',
      DetailType: 'PaymentProcessed',
      Detail: JSON.stringify({
        paymentId: 'pay789',
        orderId: 'order456',
        amount: 99.99,
        status: 'completed',
        method: 'credit_card',
        timestamp: new Date().toISOString(),
      }),
      EventBusName: 'my-app-events',
    },
  ];

  console.log('📤 Sending events to EventBridge...');
  try {
    // Three entries fit into one PutEvents request, so one round-trip is enough.
    const result = await eventbridge.send(new PutEventsCommand({ Entries: events }));
    const outcomes = result.Entries || [];
    outcomes.forEach((outcome, index) => {
      const { DetailType } = events[index];
      if (outcome.EventId) {
        console.log(`✅ Event sent: ${DetailType}`);
        console.log(` Event ID: ${outcome.EventId}`);
      } else {
        console.error(`❌ Failed to send event ${DetailType}:`, outcome.ErrorCode, outcome.ErrorMessage);
      }
    });
  } catch (error) {
    console.error('Error sending event:', error);
  }
}
// EventBridge Rule Configuration (CloudFormation/Terraform)
// CloudFormation (YAML) snippet declaring one rule per event family on the
// custom bus. YAML is indentation-sensitive and identifiers such as
// `AWS::Events::Rule`, `detail-type` and `my-app-events` must not contain
// spaces, so the text is kept exactly in this shape.
// The two `!GetAtt ...Function.Arn` targets refer to Lambda resources that are
// not declared in this snippet.
const eventRuleTemplate = `
# CloudFormation Template for EventBridge Rule
Resources:
  UserEventsRule:
    Type: AWS::Events::Rule
    Properties:
      Name: user-events-rule
      Description: 'Process user-related events'
      EventBusName: my-app-events
      EventPattern:
        source:
          - 'my.app.users'
        detail-type:
          - 'UserCreated'
          - 'UserUpdated'
          - 'UserDeleted'
      State: ENABLED
      Targets:
        - Arn: !GetAtt UserEventProcessorFunction.Arn
          Id: 'UserEventProcessorTarget'
  OrderEventsRule:
    Type: AWS::Events::Rule
    Properties:
      Name: order-events-rule
      Description: 'Process order-related events'
      EventBusName: my-app-events
      EventPattern:
        source:
          - 'my.app.orders'
        detail-type:
          - 'OrderPlaced'
          - 'OrderShipped'
          - 'OrderDelivered'
      State: ENABLED
      Targets:
        - Arn: !GetAtt OrderEventProcessorFunction.Arn
          Id: 'OrderEventProcessorTarget'
`;
// Main function to demonstrate EventBridge setup
/**
 * Runs the hello-world walkthrough: creates the bus, prints the handler source
 * and the rule template, publishes the sample events and lists follow-up steps.
 * Any error raised by a step is logged rather than propagated.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const followUps = [
    '- Deploy Lambda functions with the provided handler',
    '- Deploy EventBridge rules using the CloudFormation template',
    '- Set up monitoring and logging for event processing',
  ];

  console.log('=== AWS EventBridge Hello World ===');
  try {
    console.log('\n1. Creating custom event bus...');
    await createCustomEventBus();

    // Lambda functions and rules are deployed separately; only print their sources.
    console.log('\n2. Lambda handler code:');
    console.log(lambdaHandler);
    console.log('\n3. CloudFormation template for rules:');
    console.log(eventRuleTemplate);

    console.log('\n4. Sending custom events...');
    await sendCustomEvents();

    console.log('\n✅ EventBridge Hello World completed!');
    console.log('\nNext steps:');
    for (const step of followUps) {
      console.log(step);
    }
  } catch (error) {
    console.error('Error:', error);
  }
}
// Example event patterns for filtering
// Sample EventBridge event patterns, keyed by a descriptive name.
// Small builders keep the repeated matcher shapes in one place.
const eventPatterns = (() => {
  const atLeast = (threshold) => [{ numeric: ['>=', threshold] }];
  const startingWith = (text) => [{ prefix: text }];
  const orderPlacedWhere = (detail) => ({
    source: ['my.app.orders'],
    'detail-type': ['OrderPlaced'],
    detail,
  });

  return {
    // Every user lifecycle event
    userEvents: {
      source: ['my.app.users'],
      'detail-type': ['UserCreated', 'UserUpdated', 'UserDeleted'],
    },
    // Orders with a total of 1000 or more
    highValueOrders: orderPlacedWhere({ total: atLeast(1000) }),
    // Payments whose status is "failed"
    failedPayments: {
      source: ['my.app.payments'],
      'detail-type': ['PaymentProcessed'],
      detail: {
        status: ['failed'],
      },
    },
    // Events whose time field starts with "2025-12"
    recentEvents: {
      time: startingWith('2025-12'),
    },
    // Orders of 500 or more placed by users whose id starts with "premium_"
    premiumUserOrders: orderPlacedWhere({
      userId: startingWith('premium_'),
      total: atLeast(500),
    }),
  };
})();
// Console log event patterns for reference
// Dump each sample pattern as pretty-printed JSON.
console.log('\n📋 Event Pattern Examples:');
for (const [patternName, patternBody] of Object.entries(eventPatterns)) {
  console.log(`\n${patternName}:`);
  console.log(JSON.stringify(patternBody, null, 2));
}

// Start the walkthrough and report a rejection that escapes main().
main().catch((err) => {
  console.error('Error:', err);
});
💻 Реестр Схем EventBridge javascript
Используйте реестр схем EventBridge для обнаружения, создания и управления схемами событий, а также для типобезопасной обработки событий
// AWS EventBridge Schema Registry Example
// Install: npm install @aws-sdk/client-eventbridge @aws-sdk/client-schemas @aws-sdk/client-lambda
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
const { SchemasClient, CreateSchemaCommand, ListSchemasCommand, DescribeSchemaCommand, SearchSchemasCommand } = require('@aws-sdk/client-schemas');
// Module-level SDK v3 clients for this example, both created for us-east-1.
// `schemas` is the client the schema-registry functions send their commands through.
const eventbridge = new EventBridgeClient({ region: 'us-east-1' });
const schemas = new SchemasClient({ region: 'us-east-1' });
// Schema definitions for different event types
// Schema definitions keyed by event name. Each entry carries:
//   Type   - the schema format identifier ('OpenApi3' for all three entries)
//   Schema - the OpenAPI 3.0.0 document as a JSON string
// createSchema() forwards `Type` unchanged and sends `Schema` as the request's
// `Content`, so the text inside the template literals is data, not code.
const eventSchemas = {
// Emitted when a new user is created; requires userId, email and createdAt.
UserCreated: {
Type: 'OpenApi3',
Schema: `{
"openapi": "3.0.0",
"info": {
"title": "UserCreated Event",
"version": "1.0.0",
"description": "Event emitted when a new user is created"
},
"components": {
"schemas": {
"UserCreated": {
"type": "object",
"required": ["userId", "email", "createdAt"],
"properties": {
"userId": {
"type": "string",
"description": "Unique identifier for the user"
},
"email": {
"type": "string",
"format": "email",
"description": "User's email address"
},
"name": {
"type": "string",
"description": "User's full name"
},
"role": {
"type": "string",
"enum": ["user", "admin", "moderator"],
"default": "user"
},
"createdAt": {
"type": "string",
"format": "date-time",
"description": "Timestamp when user was created"
},
"metadata": {
"type": "object",
"properties": {
"source": { "type": "string" },
"ipAddress": { "type": "string" },
"userAgent": { "type": "string" }
}
}
}
}
}
}
}`,
},
// Emitted when a customer places an order; requires orderId, userId, total,
// items (at least one) and createdAt.
OrderPlaced: {
Type: 'OpenApi3',
Schema: `{
"openapi": "3.0.0",
"info": {
"title": "OrderPlaced Event",
"version": "1.0.0",
"description": "Event emitted when a customer places an order"
},
"components": {
"schemas": {
"OrderPlaced": {
"type": "object",
"required": ["orderId", "userId", "total", "items", "createdAt"],
"properties": {
"orderId": {
"type": "string",
"description": "Unique identifier for the order"
},
"userId": {
"type": "string",
"description": "Customer's user ID"
},
"total": {
"type": "number",
"format": "float",
"minimum": 0,
"description": "Total order amount"
},
"currency": {
"type": "string",
"pattern": "^[A-Z]{3}$",
"default": "USD"
},
"items": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"required": ["productId", "quantity", "price"],
"properties": {
"productId": { "type": "string" },
"quantity": { "type": "integer", "minimum": 1 },
"price": { "type": "number", "minimum": 0 }
}
}
},
"shippingAddress": {
"type": "object",
"properties": {
"street": { "type": "string" },
"city": { "type": "string" },
"state": { "type": "string" },
"zipCode": { "type": "string" },
"country": { "type": "string" }
}
},
"createdAt": {
"type": "string",
"format": "date-time"
}
}
}
}
}
}`,
},
// Emitted when a payment is processed; requires paymentId, orderId, amount,
// status and processedAt (currency is optional in this schema).
PaymentProcessed: {
Type: 'OpenApi3',
Schema: `{
"openapi": "3.0.0",
"info": {
"title": "PaymentProcessed Event",
"version": "1.0.0",
"description": "Event emitted when a payment is processed"
},
"components": {
"schemas": {
"PaymentProcessed": {
"type": "object",
"required": ["paymentId", "orderId", "amount", "status", "processedAt"],
"properties": {
"paymentId": {
"type": "string",
"description": "Unique identifier for the payment"
},
"orderId": {
"type": "string",
"description": "Associated order ID"
},
"amount": {
"type": "number",
"format": "float",
"minimum": 0
},
"currency": {
"type": "string",
"pattern": "^[A-Z]{3}$"
},
"status": {
"type": "string",
"enum": ["completed", "failed", "pending", "refunded"]
},
"method": {
"type": "string",
"enum": ["credit_card", "debit_card", "paypal", "bank_transfer"]
},
"transactionId": {
"type": "string",
"description": "External transaction ID from payment processor"
},
"failureReason": {
"type": "string",
"description": "Reason for payment failure (if status is failed)"
},
"processedAt": {
"type": "string",
"format": "date-time"
}
}
}
}
}
}`,
},
};
// Create schema in EventBridge Schema Registry
/**
 * Registers one schema in the `default` registry.
 *
 * @param {string} name - Schema name; also used in the description text.
 * @param {{Type: string, Schema: string}} schemaDefinition - Format identifier
 *   and schema document to upload.
 * @returns {Promise<object>} The CreateSchema response, or an object holding a
 *   placeholder `SchemaArn` when the schema already exists.
 * @throws Rethrows every error other than `ConflictException`.
 */
async function createSchema(name, schemaDefinition) {
  const { Type, Schema: Content } = schemaDefinition;
  const request = {
    RegistryName: 'default', // Use default registry
    SchemaName: name,
    Type,
    Content,
    Description: `${name} event schema for my application`,
  };

  try {
    const created = await schemas.send(new CreateSchemaCommand(request));
    console.log(`✅ Schema '${name}' created successfully`);
    console.log(` Schema ARN: ${created.SchemaArn}`);
    console.log(` Schema Version: ${created.SchemaVersion}`);
    return created;
  } catch (error) {
    if (error.name !== 'ConflictException') {
      throw error;
    }
    console.log(`ℹ️ Schema '${name}' already exists`);
    return { SchemaArn: `arn:aws:schemas:us-east-1:123456789012:schema/${name}` };
  }
}
// List all schemas in registry
/**
 * Lists up to 50 schemas of the `default` registry and prints one line each.
 *
 * ListSchemas summaries expose `SchemaName` and `VersionCount` (there is no
 * `Version` or `Description` on them), so those are the fields reported.
 *
 * @returns {Promise<object[]>} The schema summaries; an empty array when the
 *   response has none or when the request fails (the error is logged).
 */
async function listSchemas() {
  const params = {
    RegistryName: 'default',
    Limit: 50,
  };

  try {
    const result = await schemas.send(new ListSchemasCommand(params));
    // A response without a Schemas member is treated as "no schemas".
    const summaries = result.Schemas || [];
    console.log('\n📋 Available Schemas:');
    summaries.forEach(({ SchemaName, VersionCount }) => {
      const versions = VersionCount == null ? 'unknown' : VersionCount;
      console.log(` - ${SchemaName} (versions: ${versions})`);
    });
    return summaries;
  } catch (error) {
    console.error('Error listing schemas:', error);
    return [];
  }
}
// Get detailed schema information
/**
 * Fetches one schema of the `default` registry and prints its key attributes.
 *
 * The creation timestamp is read from `VersionCreatedDate`, the field the
 * DescribeSchema response provides (it has no `CreatedAt`).
 *
 * @param {string} schemaName - Name of the schema to describe.
 * @returns {Promise<object|null>} The DescribeSchema response, or `null` when
 *   the request fails (the error is logged).
 */
async function describeSchema(schemaName) {
  const params = {
    RegistryName: 'default',
    SchemaName: schemaName,
  };

  try {
    const result = await schemas.send(new DescribeSchemaCommand(params));
    // Guard against a response without Content so the summary never throws.
    const content = result.Content || '';
    console.log(`\n📖 Schema Details: ${schemaName}`);
    console.log(` Type: ${result.Type}`);
    console.log(` Version: ${result.SchemaVersion}`);
    console.log(` Created: ${result.VersionCreatedDate}`);
    console.log(` Modified: ${result.LastModified}`);
    console.log(` Content Length: ${content.length} characters`);
    return result;
  } catch (error) {
    console.error(`Error describing schema '${schemaName}':`, error);
    return null;
  }
}
// Search schemas by keywords
/**
 * Searches the `default` registry by keyword and prints the matches.
 *
 * SearchSchemas results expose `SchemaName` and `SchemaVersions` (no
 * `Description`), so the matching version numbers are printed instead.
 *
 * @param {string} keyword - Keywords to search for.
 * @returns {Promise<object[]>} The matches; an empty array when there are none
 *   or when the request fails (the error is logged).
 */
async function searchSchemas(keyword) {
  const params = {
    RegistryName: 'default',
    Keywords: keyword,
    Limit: 10,
  };

  try {
    const result = await schemas.send(new SearchSchemasCommand(params));
    const matches = result.Schemas || [];
    console.log(`\n🔍 Search Results for '${keyword}':`);
    matches.forEach(({ SchemaName, SchemaVersions }) => {
      const versions = (SchemaVersions || []).map((v) => v.SchemaVersion).join(', ');
      console.log(versions ? ` - ${SchemaName} (versions: ${versions})` : ` - ${SchemaName}`);
    });
    return matches;
  } catch (error) {
    console.error('Error searching schemas:', error);
    return [];
  }
}
// Validate event against schema
/**
 * Performs a lightweight structural check of a serialized event.
 *
 * This is a demonstration, not a full JSON-Schema validation: it verifies that
 * the payload is a JSON object, that the required fields of the named schema
 * are present (not null/undefined/empty string), and, for `UserCreated`, that
 * the e-mail looks like an address. Schema names without a known field list
 * have nothing to check and therefore pass.
 *
 * @param {string} eventData - JSON text of the event detail.
 * @param {string} schemaName - 'UserCreated', 'OrderPlaced' or 'PaymentProcessed'.
 * @returns {boolean} `true` when every check passes, otherwise `false`.
 */
function validateEvent(eventData, schemaName) {
  console.log(`\n🔍 Validating event against '${schemaName}' schema`);

  // Required fields per schema, matching the "required" lists of the schema documents.
  const requiredFields = {
    UserCreated: ['userId', 'email', 'createdAt'],
    OrderPlaced: ['orderId', 'userId', 'total', 'items', 'createdAt'],
    PaymentProcessed: ['paymentId', 'orderId', 'amount', 'status', 'processedAt'],
  };

  try {
    const event = JSON.parse(eventData);
    if (event === null || typeof event !== 'object' || Array.isArray(event)) {
      console.log('❌ Validation failed: Event payload must be a JSON object');
      return false;
    }

    const required = Object.prototype.hasOwnProperty.call(requiredFields, schemaName)
      ? requiredFields[schemaName]
      : [];
    // Presence check only: a legitimate 0 (e.g. a zero total) must not count as missing.
    const missing = required.filter((field) => event[field] == null || event[field] === '');
    if (missing.length > 0) {
      console.log(`❌ Validation failed: Missing required fields: ${missing.join(', ')}`);
      return false;
    }

    if (schemaName === 'UserCreated') {
      // Email format validation
      const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
      if (!emailRegex.test(event.email)) {
        console.log('❌ Validation failed: Invalid email format');
        return false;
      }
    }

    console.log('✅ Event validation passed');
    return true;
  } catch (error) {
    // JSON.parse failures end up here.
    console.log(`❌ Validation failed: ${error.message}`);
    return false;
  }
}
// Generate TypeScript types from schema
/**
 * Prints and returns TypeScript declarations for the three sample events.
 *
 * The text is a fixed string (it is not derived from `eventSchemas` at
 * runtime). `PaymentProcessed.currency` is optional because the
 * PaymentProcessed schema does not list `currency` among its required fields.
 *
 * @returns {string} The TypeScript source text.
 */
function generateTypeScriptTypes() {
  const types = `// Auto-generated TypeScript types for EventBridge events
export interface UserCreated {
userId: string;
email: string;
name?: string;
role?: 'user' | 'admin' | 'moderator';
createdAt: string;
metadata?: {
source?: string;
ipAddress?: string;
userAgent?: string;
};
}
export interface OrderItem {
productId: string;
quantity: number;
price: number;
}
export interface OrderPlaced {
orderId: string;
userId: string;
total: number;
currency?: string;
items: OrderItem[];
shippingAddress?: {
street?: string;
city?: string;
state?: string;
zipCode?: string;
country?: string;
};
createdAt: string;
}
export interface PaymentProcessed {
paymentId: string;
orderId: string;
amount: number;
currency?: string;
status: 'completed' | 'failed' | 'pending' | 'refunded';
method?: 'credit_card' | 'debit_card' | 'paypal' | 'bank_transfer';
transactionId?: string;
failureReason?: string;
processedAt: string;
}
// Union type for all events
export type AppEvent = UserCreated | OrderPlaced | PaymentProcessed;
// Event type discriminator
export type EventType = 'UserCreated' | 'OrderPlaced' | 'PaymentProcessed';
`;
  console.log('\n📝 Generated TypeScript Types:');
  console.log(types);
  return types;
}
// Code generator for Lambda handlers with type safety
/**
 * Prints and returns the source text of a typed Lambda handler that dispatches
 * on the event's `detail-type`.
 *
 * Backticks and `${` belonging to the generated code are escaped so they are
 * emitted verbatim instead of ending / interpolating this template literal.
 * The generated handler treats the invocation payload itself as the event
 * (EventBridge passes one event object per invocation) and still accepts a
 * `Records` array.
 *
 * @returns {string} The handler source text.
 */
function generateTypedLambdaHandler() {
  const handler = `// Type-safe Lambda handler with schema validation
import { AppEvent, UserCreated, OrderPlaced, PaymentProcessed } from './event-types';
exports.handler = async (event: any) => {
console.log('Processing EventBridge events...');
// EventBridge delivers a single event object; accept a Records batch as well.
const records = Array.isArray(event.Records) ? event.Records : [event];
for (const record of records) {
const eventType = record['detail-type'];
const eventData = record.detail;
try {
switch (eventType) {
case 'UserCreated':
await handleUserCreated(eventData as UserCreated);
break;
case 'OrderPlaced':
await handleOrderPlaced(eventData as OrderPlaced);
break;
case 'PaymentProcessed':
await handlePaymentProcessed(eventData as PaymentProcessed);
break;
default:
console.warn(\`Unknown event type: \${eventType}\`);
}
} catch (error) {
console.error(\`Error processing event \${eventType}:\`, error);
// Consider implementing dead-letter queue for failed events
}
}
};
async function handleUserCreated(user: UserCreated) {
console.log(\`Processing new user: \${user.userId}\`);
// Type-safe access to user properties
console.log(\`Email: \${user.email}\`);
if (user.metadata?.source) {
console.log(\`Source: \${user.metadata.source}\`);
}
// Add user to database, send welcome email, etc.
}
async function handleOrderPlaced(order: OrderPlaced) {
console.log(\`Processing order: \${order.orderId}\`);
console.log(\`Total: \${order.total} \${order.currency || 'USD'}\`);
// Process order, update inventory, calculate shipping, etc.
}
async function handlePaymentProcessed(payment: PaymentProcessed) {
console.log(\`Processing payment: \${payment.paymentId}\`);
console.log(\`Status: \${payment.status}\`);
// Update order status, send receipt, handle failures, etc.
}
`;
  console.log('\n🔧 Generated Type-Safe Lambda Handler:');
  console.log(handler);
  return handler;
}
// Main function to demonstrate schema registry
/**
 * Runs the schema-registry walkthrough: creates the schemas, lists, describes
 * and searches them, validates one well-formed and one incomplete sample
 * event, and prints the generated TypeScript types and handler.
 * Any error raised by a step is logged rather than propagated.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== AWS EventBridge Schema Registry Example ===');
  try {
    // Step 1: Create schemas (sequentially, one request per schema)
    console.log('\n1. Creating event schemas...');
    for (const [schemaName, schemaDef] of Object.entries(eventSchemas)) {
      await createSchema(schemaName, schemaDef);
    }

    // Step 2: List all schemas
    console.log('\n2. Listing available schemas...');
    await listSchemas();

    // Step 3: Describe specific schema
    console.log('\n3. Getting schema details...');
    await describeSchema('UserCreated');

    // Step 4: Search schemas
    console.log('\n4. Searching schemas...');
    await searchSchemas('User');
    await searchSchemas('Payment');

    // Step 5: Validate events
    console.log('\n5. Validating events...');
    const validUserEvent = JSON.stringify({
      userId: 'user123',
      // Must be a syntactically valid address, otherwise the "valid" sample
      // is rejected by the e-mail check in validateEvent().
      email: 'john.doe@example.com',
      name: 'John Doe',
      createdAt: new Date().toISOString(),
    });
    const invalidUserEvent = JSON.stringify({
      userId: 'user123',
      // Missing required email field
      createdAt: new Date().toISOString(),
    });
    validateEvent(validUserEvent, 'UserCreated');
    validateEvent(invalidUserEvent, 'UserCreated');

    // Step 6: Generate TypeScript types
    console.log('\n6. Generating TypeScript types...');
    generateTypeScriptTypes();

    // Step 7: Generate typed Lambda handler
    console.log('\n7. Generating type-safe Lambda handler...');
    generateTypedLambdaHandler();

    console.log('\n✅ Schema Registry example completed!');
    console.log('\nBenefits of Schema Registry:');
    console.log('- Type safety for event producers and consumers');
    console.log('- Automatic code generation');
    console.log('- Schema discovery and documentation');
    console.log('- Validation and enforcement of event contracts');
  } catch (error) {
    console.error('Error:', error);
  }
}
// Script entry point: start the walkthrough. main() logs errors raised by its
// own steps, so this handler only reports a rejection that escapes it.
main().catch(err => {
console.error('Error:', err);
});
💻 Pipes EventBridge javascript
Создавать event-driven конвейеры с EventBridge Pipes для точечной интеграции с фильтрацией, обогащением и трансформацией
// AWS EventBridge Pipes Example
// Install: npm install @aws-sdk/client-eventbridge-pipes @aws-sdk/client-sqs @aws-sdk/client-lambda
const { EventBridgePipesClient, CreatePipeCommand, UpdatePipeCommand, DeletePipeCommand, ListPipesCommand, DescribePipeCommand } = require('@aws-sdk/client-eventbridge-pipes');
const { SQSClient, CreateQueueCommand, GetQueueAttributesCommand } = require('@aws-sdk/client-sqs');
const { LambdaClient, CreateFunctionCommand } = require('@aws-sdk/client-lambda');
// Module-level SDK v3 clients for the Pipes example, all created for us-east-1.
const pipesClient = new EventBridgePipesClient({ region: 'us-east-1' });
const sqs = new SQSClient({ region: 'us-east-1' });
const lambda = new LambdaClient({ region: 'us-east-1' });
// Example 1: Order Processing Pipeline
// SQS -> EventBridge Pipe -> Lambda -> EventBridge
// Create source SQS queue for order events
/**
 * Creates the source queue `order-events-queue`.
 *
 * @returns {Promise<string>} The queue URL returned by SQS, or a hard-coded
 *   placeholder URL when SQS reports that the name is already taken.
 * @throws Rethrows every error other than the "queue already exists" errors.
 */
async function createOrderQueue() {
  const params = {
    QueueName: 'order-events-queue',
    Attributes: {
      VisibilityTimeout: '300', // 5 minutes
      MessageRetentionPeriod: '1209600', // 14 days
      DelaySeconds: '0',
      ReceiveMessageWaitTimeSeconds: '20', // Long polling
    },
  };
  // The SDK v3 exception for a clashing queue name is QueueNameExists;
  // QueueAlreadyExists is kept because the original code checked for it.
  const queueExistsErrors = ['QueueNameExists', 'QueueAlreadyExists'];

  try {
    const result = await sqs.send(new CreateQueueCommand(params));
    console.log('✅ Order queue created:', result.QueueUrl);
    return result.QueueUrl;
  } catch (error) {
    if (queueExistsErrors.includes(error.name)) {
      console.log('ℹ️ Order queue already exists');
      return 'https://sqs.us-east-1.amazonaws.com/123456789012/order-events-queue';
    }
    throw error;
  }
}
// Lambda function for order processing
const orderProcessorLambda = {
FunctionName: 'order-processor',
Runtime: 'nodejs18.x',
Role: 'arn:aws:iam::123456789012:role/lambda-execution-role',
Handler: 'index.handler',
Code: {
ZipFile: Buffer.from(`
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
const eventbridge = new EventBridgeClient();
exports.handler = async (event) => {
console.log('Processing order events from SQS:', event);
for (const record of event.Records) {
const orderEvent = JSON.parse(record.body);
console.log('Processing order:', orderEvent);
// Transform and enrich order data
const enrichedEvent = {
orderId: orderEvent.orderId,
userId: orderEvent.userId,
total: orderEvent.total,
items: orderEvent.items,
processedAt: new Date().toISOString(),
processorId: 'order-service-v1',
region: process.env.AWS_REGION,
// Add enrichment data
customerTier: await getCustomerTier(orderEvent.userId),
inventoryStatus: await checkInventory(orderEvent.items),
};
// Send to EventBridge for downstream processing
await eventbridge.send(new PutEventsCommand({
Entries: [{
Source: 'order.processing',
DetailType: 'OrderProcessed',
Detail: JSON.stringify(enrichedEvent),
EventBusName: 'default'
}]
}));
}
return {
statusCode: 200,
body: JSON.stringify({ processed: event.Records.length });
};
};
async function getCustomerTier(userId) {
// Mock customer tier lookup
const tiers = ['bronze', 'silver', 'gold', 'platinum'];
return tiers[Math.floor(Math.random() * tiers.length)];
}
async function checkInventory(items) {
// Mock inventory check
return 'available';
}
`).toString('base64'),
},
Environment: {
Variables: {
LOG_LEVEL: 'INFO',
EVENT_BUS: 'default',
},
},
};
// Create EventBridge Pipe for order processing
/**
 * Derives the SQS queue ARN from a queue URL of the form
 * `https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>`.
 * A value that already is an ARN is returned unchanged.
 * NOTE(review): other URL layouts (legacy or custom endpoints) and non-`aws`
 * partitions are not handled — confirm if those are needed.
 */
function sqsQueueArnFromUrl(queueUrlOrArn) {
  if (queueUrlOrArn.startsWith('arn:')) {
    return queueUrlOrArn;
  }
  const { hostname, pathname } = new URL(queueUrlOrArn);
  const region = hostname.split('.')[1];
  const [accountId, queueName] = pathname.split('/').filter(Boolean);
  return `arn:aws:sqs:${region}:${accountId}:${queueName}`;
}

/**
 * Creates the `order-processing-pipe` (SQS source → Lambda enrichment → Lambda target).
 *
 * @param {string} sourceQueueUrl - URL (or ARN) of the source queue. A pipe's
 *   Source must be an ARN, so a URL is converted first.
 * @param {string} targetLambdaArn - ARN of the target Lambda function.
 * @returns {Promise<object>} The CreatePipe response, or an object holding a
 *   placeholder `Arn` when a pipe with this name already exists.
 * @throws Rethrows every error other than `ConflictException`.
 */
async function createOrderProcessingPipe(sourceQueueUrl, targetLambdaArn) {
  const pipeName = 'order-processing-pipe';
  const params = {
    Name: pipeName,
    Description: 'Process order events from SQS and enrich with customer data',
    RoleArn: 'arn:aws:iam::123456789012:role/EventBridgePipesExecutionRole',
    Source: sqsQueueArnFromUrl(sourceQueueUrl),
    Target: targetLambdaArn,
    SourceParameters: {
      FilterCriteria: {
        Filters: [
          {
            // The SQS message payload is exposed under `body`, so the filter
            // has to be nested there to match fields of the message itself.
            Pattern: JSON.stringify({
              body: {
                eventType: ['order.created', 'order.updated'],
              },
            }),
          },
        ],
      },
      SqsQueueParameters: {
        BatchSize: 10,
        MaximumBatchingWindowInSeconds: 30,
      },
    },
    TargetParameters: {
      // Pipes input template: JSON paths are written as <$.path>, and
      // <aws.pipes.event.ingestion-time> is a reserved variable.
      // NOTE(review): with an enrichment step configured the template is
      // applied to the enrichment output — confirm `$.body` is still present.
      InputTemplate: '{ "order": <$.body>, "source": "sqs", "timestamp": <aws.pipes.event.ingestion-time> }',
      // Parameters of a Lambda target (event-bus parameters do not apply here).
      LambdaFunctionParameters: {
        InvocationType: 'REQUEST_RESPONSE',
      },
    },
    LogConfiguration: {
      CloudwatchLogsLogDestination: {
        LogGroupArn: 'arn:aws:logs:us-east-1:123456789012:log-group:/aws/eventbridge/pipes/order-processing-pipe',
      },
      Level: 'INFO',
      IncludeExecutionData: ['ALL'],
    },
    Enrichment: 'arn:aws:lambda:us-east-1:123456789012:function:customer-enrichment',
  };

  try {
    const result = await pipesClient.send(new CreatePipeCommand(params));
    console.log(`✅ Pipe '${pipeName}' created successfully`);
    console.log(` ARN: ${result.Arn}`);
    console.log(` Created: ${result.CreationTime}`);
    return result;
  } catch (error) {
    if (error.name === 'ConflictException') {
      console.log(`ℹ️ Pipe '${pipeName}' already exists`);
      return { Arn: `arn:aws:pipes:us-east-1:123456789012:pipe/${pipeName}` };
    }
    throw error;
  }
}
// Example 2: Real-time Data Pipeline
// Kinesis -> EventBridge Pipe -> Transformation -> EventBridge -> Multiple Targets
// Create EventBridge Pipe for real-time analytics
/**
 * Creates the `real-time-analytics-pipe` (Kinesis stream source → event bus target).
 *
 * @returns {Promise<object|null>} The CreatePipe response, or `null` when the
 *   request fails (the error is logged, not rethrown).
 */
async function createAnalyticsPipe() {
  const pipeName = 'real-time-analytics-pipe';
  const params = {
    Name: pipeName,
    Description: 'Process real-time analytics events from Kinesis',
    RoleArn: 'arn:aws:iam::123456789012:role/EventBridgePipesExecutionRole',
    Source: 'arn:aws:kinesis:us-east-1:123456789012:stream/analytics-events',
    Target: 'arn:aws:events:us-east-1:123456789012:event-bus/analytics-bus',
    SourceParameters: {
      FilterCriteria: {
        Filters: [
          {
            // The Kinesis record payload is exposed under `data`, so the filter
            // has to be nested there to match fields of the record itself.
            Pattern: JSON.stringify({
              data: {
                eventType: ['user.action', 'page.view', 'click.event'],
              },
            }),
          },
        ],
      },
      KinesisStreamParameters: {
        BatchSize: 100,
        MaximumBatchingWindowInSeconds: 5,
        StartingPosition: 'LATEST',
        DeadLetterConfig: {
          // SQS ARNs have the form arn:aws:sqs:<region>:<account>:<queue-name>.
          Arn: 'arn:aws:sqs:us-east-1:123456789012:analytics-dlq',
        },
      },
    },
    TargetParameters: {
      // Pipes take the template directly on TargetParameters; JSON paths are
      // written as <$.path> and <aws.pipes.event.ingestion-time> is a reserved variable.
      InputTemplate: `{
  "analyticEvent": <$.data>,
  "processedAt": <aws.pipes.event.ingestion-time>,
  "sourceRegion": <$.awsRegion>,
  "pipelineVersion": "v1.0"
}`,
      // Parameters of an event-bus target.
      EventBridgeEventBusParameters: {
        DetailType: 'AnalyticsEventProcessed',
        Source: 'analytics.pipeline',
      },
    },
    LogConfiguration: {
      CloudwatchLogsLogDestination: {
        LogGroupArn: 'arn:aws:logs:us-east-1:123456789012:log-group:/aws/eventbridge/pipes/analytics-pipe',
      },
      // Pipes log levels are OFF | ERROR | INFO | TRACE; TRACE is the most verbose.
      Level: 'TRACE',
    },
  };

  try {
    const result = await pipesClient.send(new CreatePipeCommand(params));
    console.log(`✅ Analytics pipe '${pipeName}' created successfully`);
    return result;
  } catch (error) {
    console.error(`Error creating analytics pipe: ${error}`);
    return null;
  }
}
// List all pipes
/**
 * Lists up to 50 pipes and prints a short summary of each.
 *
 * ListPipes summaries expose `Source`, `Target`, `CurrentState`,
 * `CreationTime` and `LastModifiedTime` (no `Description` or `CreateTime`),
 * so those are the fields reported.
 *
 * @returns {Promise<object[]>} The pipe summaries; an empty array when there
 *   are none or when the request fails (the error is logged).
 */
async function listPipes() {
  const params = {
    Limit: 50,
  };

  try {
    const result = await pipesClient.send(new ListPipesCommand(params));
    const pipes = result.Pipes || [];
    console.log('\n📋 Available EventBridge Pipes:');
    pipes.forEach((pipe) => {
      console.log(` - ${pipe.Name}: ${pipe.Source} -> ${pipe.Target}`);
      console.log(` State: ${pipe.CurrentState}`);
      console.log(` Created: ${pipe.CreationTime}`);
      if (pipe.LastModifiedTime) {
        console.log(` Modified: ${pipe.LastModifiedTime}`);
      }
      console.log('');
    });
    return pipes;
  } catch (error) {
    console.error('Error listing pipes:', error);
    return [];
  }
}
// Get detailed pipe information
/**
 * Fetches one pipe and prints its main attributes. The desired state is only
 * printed when it is set and differs from the current state.
 *
 * @param {string} pipeName - Name of the pipe to describe.
 * @returns {Promise<object|null>} The DescribePipe response, or `null` when
 *   the request fails (the error is logged).
 */
async function describePipe(pipeName) {
  try {
    const details = await pipesClient.send(new DescribePipeCommand({ Name: pipeName }));

    console.log(`\n📖 Pipe Details: ${pipeName}`);
    const attributes = [
      ['ARN', details.Arn],
      ['Description', details.Description],
      ['State', details.CurrentState],
      ['Source', details.Source],
      ['Target', details.Target],
      ['Role', details.RoleArn],
    ];
    for (const [label, value] of attributes) {
      console.log(` ${label}: ${value}`);
    }

    const stateIsPending = Boolean(details.DesiredState) && details.DesiredState !== details.CurrentState;
    if (stateIsPending) {
      console.log(` Desired State: ${details.DesiredState}`);
    }
    return details;
  } catch (error) {
    console.error(`Error describing pipe '${pipeName}':`, error);
    return null;
  }
}
// Update pipe configuration
/**
 * Sends an UpdatePipe request for the named pipe.
 *
 * @param {string} pipeName - Name of the pipe to update.
 * @param {object} updates - Extra request fields; they are spread after `Name`,
 *   so a `Name` key inside `updates` overrides `pipeName`.
 * @returns {Promise<object|null>} The UpdatePipe response, or `null` when the
 *   request fails (the error is logged).
 */
async function updatePipe(pipeName, updates) {
  const request = { Name: pipeName, ...updates };

  let response;
  try {
    response = await pipesClient.send(new UpdatePipeCommand(request));
    console.log(`✅ Pipe '${pipeName}' updated successfully`);
    console.log(` New desired state: ${response.DesiredState}`);
  } catch (error) {
    console.error(`Error updating pipe '${pipeName}':`, error);
    return null;
  }
  return response;
}
// Monitor pipe performance
function createPipeMonitoring() {
const monitoring = `// EventBridge Pipe Monitoring with CloudWatch
const { CloudWatchClient, PutMetricDataCommand } = require('@aws-sdk/client-cloudwatch');
const cloudwatch = new CloudWatchClient();
// Custom metrics for pipe monitoring
async function publishPipeMetrics(pipeName, metrics) {
const params = {
Namespace: 'EventBridge/Pipes',
MetricData: [
{
MetricName: 'EventsProcessed',
Dimensions: [
{ Name: 'PipeName', Value: pipeName },
],
Value: metrics.eventsProcessed,
Unit: 'Count',
},
{
MetricName: 'ProcessingLatency',
Dimensions: [
{ Name: 'PipeName', Value: pipeName },
],
Value: metrics.processingLatency,
Unit: 'Milliseconds',
},
{
MetricName: 'Errors',
Dimensions: [
{ Name: 'PipeName', Value: pipeName },
],
Value: metrics.errors,
Unit: 'Count',
},
],
};
await cloudwatch.send(new PutMetricDataCommand(params));
console.log(`Published metrics for pipe: ${pipeName}`);
}
// Lambda function to monitor pipe health
exports.pipeMonitorHandler = async (event) => {
// Analyze pipe performance and alert on issues
for (const record of event.Records) {
const { pipeName, metrics } = JSON.parse(record.body);
if (metrics.errors > 10) {
// Send alert for high error rate
await sendAlert(pipeName, 'High error rate detected');
}
if (metrics.processingLatency > 5000) {
// Send alert for high latency
await sendAlert(pipeName, 'High processing latency detected');
}
}
};
async function sendAlert(pipeName, message) {
console.log(`ALERT [${pipeName}]: ${message}`);
// Integrate with SNS, PagerDuty, Slack, etc.
}`;
console.log('\n📊 Pipe Monitoring Code:');
console.log(monitoring);
return monitoring;
}
// CloudFormation template for EventBridge Pipes
const pipesCloudFormation = `Resources:
# EventBridge Pipe Execution Role
PipesExecutionRole:
Type: AWS:: IAM:: Role
Properties:
RoleName: EventBridgePipesExecutionRole
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: pipes.amazonaws.com
Action: sts: AssumeRole
Policies:
- PolicyName: PipesExecutionPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- sqs: ReceiveMessage
- sqs: DeleteMessage
- sqs: GetQueueAttributes
Resource: '*'
- Effect: Allow
Action:
- lambda: InvokeFunction
Resource: '*'
- Effect: Allow
Action:
- events: PutEvents
Resource: '*'
# Order Processing Pipe
OrderProcessingPipe:
Type: AWS:: Events:: Pipe
Properties:
Name: order - processing - pipe
Description: Process order events with enrichment
RoleArn: !GetAtt PipesExecutionRole.Arn
Source: !GetAtt OrderQueue.Arn
Target: !GetAtt OrderProcessorFunction.Arn
SourceParameters:
FilterCriteria:
Filters:
- Pattern: '{ "eventType": ["order.created"] }'
SqsQueueParameters:
BatchSize: 10
MaximumBatchingWindowInSeconds: 30
TargetParameters:
InputTemplate: '{ "order": <aws.events.json.jsonParse($.body)>, "processedAt": <aws.events.schedulerScheduledTime> }'
EventBridgeEventParameters:
DetailType: 'OrderProcessed'
Source: 'order.service'
LogConfiguration:
LogLevel: INFO
CloudwatchLogsLogDestination:
LogGroupArn: !GetAtt PipesLogGroup.Arn
# Source SQS Queue
OrderQueue:
Type: AWS:: SQS:: Queue
Properties:
QueueName: order - events - queue
VisibilityTimeout: 300
MessageRetentionPeriod: 1209600
# Target Lambda Function
OrderProcessorFunction:
Type: AWS:: Lambda:: Function
Properties:
FunctionName: order - processor
Runtime: nodejs18.x
Handler: index.handler
Role: !GetAtt LambdaExecutionRole.Arn
Code:
ZipFile: |
exports.handler = async (event) => {
console.log('Processing orders:', event);
// Process orders here
return { statusCode: 200 };
};
# Log Group for Pipes
PipesLogGroup:
Type: AWS:: Logs:: LogGroup
Properties:
LogGroupName: /aws/eventbridge / pipes / order - processing - pipe
RetentionInDays: 14`;
console.log('\n🏗️ CloudFormation Template for EventBridge Pipes:');
console.log(pipesCloudFormation);
return pipesCloudFormation;
}
// Entry point for the EventBridge Pipes walkthrough.
/**
 * Runs the Pipes demo steps in order and prints a short feature summary.
 * Any error raised by a step is logged here rather than propagated.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== AWS EventBridge Pipes Example ===');

  const keyFeatures = [
    '- Point-to-point integration without custom code',
    '- Built-in filtering and transformation',
    '- Enrichment with Lambda functions',
    '- Integrated monitoring and logging',
    '- Support for various sources and targets',
  ];

  try {
    // Step 1: create the source queue
    console.log('\n1. Creating source SQS queue...');
    const sourceQueueUrl = await createOrderQueue();

    // Step 2: the Lambda code is only printed; deployment happens separately
    console.log('\n2. Lambda function code for order processor:');
    console.log(orderProcessorLambda);

    // Step 3: pipe creation is announced but intentionally left disabled:
    //   createOrderProcessingPipe(sourceQueueUrl, 'arn:aws:lambda:us-east-1:123456789012:function:order-processor')
    console.log('\n3. Creating EventBridge Pipe...');

    // Step 4: analytics pipe
    console.log('\n4. Creating analytics pipe...');
    await createAnalyticsPipe();

    // Step 5: enumerate existing pipes
    console.log('\n5. Listing all pipes...');
    await listPipes();

    // Step 6: monitoring setup
    console.log('\n6. Setting up monitoring...');
    createPipeMonitoring();

    // Step 7: announced as the Infrastructure-as-Code template.
    // NOTE(review): this repeats createPipeMonitoring() from step 6; the
    // CloudFormation template helper was presumably intended, but its name is
    // not visible from this part of the file, so the call is left as-is.
    console.log('\n7. Infrastructure as Code template...');
    createPipeMonitoring();

    console.log('\n✅ EventBridge Pipes example completed!');
    console.log('\nKey Features of EventBridge Pipes:');
    for (const feature of keyFeatures) {
      console.log(feature);
    }
  } catch (failure) {
    console.error('Error:', failure);
  }
}

main().catch((failure) => {
  console.error('Error:', failure);
});
💻 Архивация и Воспроизведение EventBridge javascript
Архивировать события для соответствия требованиям и отладки, затем воспроизводить события для сценариев тестирования и восстановления
// AWS EventBridge Archive and Replay Example
// Install: npm install @aws-sdk/client-eventbridge @aws-sdk/client-lambda
const { EventBridgeClient, CreateArchiveCommand, DescribeArchiveCommand, UpdateArchiveCommand, ListArchivesCommand, StartReplayCommand, DescribeReplayCommand, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
const eventbridge = new EventBridgeClient({ region: 'us-east-1' });
// Sample business events to archive: the lifecycle of a single order
// (created -> updated -> paid -> shipped -> delivered).
// Each tuple is [Source, DetailType, detail]; the detail object is serialized
// to a JSON string because PutEvents entries carry Detail as text.
const businessEvents = [
  [
    'com.myapp.orders',
    'OrderCreated',
    {
      orderId: 'ORD-001',
      customerId: 'CUST-001',
      total: 299.99,
      currency: 'USD',
      items: [{ productId: 'PROD-001', quantity: 2, price: 149.99 }],
      createdAt: '2025-12-15T10:00:00Z',
    },
  ],
  [
    'com.myapp.orders',
    'OrderUpdated',
    {
      orderId: 'ORD-001',
      customerId: 'CUST-001',
      status: 'confirmed',
      updatedAt: '2025-12-15T10:05:00Z',
    },
  ],
  [
    'com.myapp.payments',
    'PaymentProcessed',
    {
      paymentId: 'PAY-001',
      orderId: 'ORD-001',
      amount: 299.99,
      status: 'completed',
      processedAt: '2025-12-15T10:10:00Z',
    },
  ],
  [
    'com.myapp.shipping',
    'OrderShipped',
    {
      orderId: 'ORD-001',
      trackingNumber: 'TRK-123456789',
      carrier: 'FedEx',
      shippedAt: '2025-12-15T14:00:00Z',
    },
  ],
  [
    'com.myapp.orders',
    'OrderDelivered',
    {
      orderId: 'ORD-001',
      customerId: 'CUST-001',
      deliveredAt: '2025-12-16T16:30:00Z',
    },
  ],
].map(([Source, DetailType, detail]) => ({
  Source,
  DetailType,
  Detail: JSON.stringify(detail),
}));
// Send business events to EventBridge
/**
 * Publishes every entry of `businessEvents` one at a time via PutEvents and
 * logs the outcome of each.
 *
 * PutEvents reports per-entry results in `Entries` (each carrying either an
 * `EventId` or an `ErrorCode`/`ErrorMessage`) together with
 * `FailedEntryCount`; a rejected entry does not make the call throw, so the
 * response has to be inspected explicitly.
 *
 * Failures (thrown or reported in the response) are logged and the loop moves
 * on to the next event.
 *
 * @returns {Promise<void>}
 */
async function sendBusinessEvents() {
  console.log('📤 Sending business events to EventBridge...');
  for (let i = 0; i < businessEvents.length; i++) {
    const event = businessEvents[i];
    const params = {
      Entries: [event],
    };
    try {
      const command = new PutEventsCommand(params);
      const result = await eventbridge.send(command);
      // Exactly one entry was sent, so exactly one result entry is expected.
      const entry = (result.Entries || [])[0];
      if (result.FailedEntryCount > 0 || !entry || entry.ErrorCode) {
        const reason = entry
          ? `${entry.ErrorCode}: ${entry.ErrorMessage}`
          : 'no result entry returned';
        console.error(`❌ Failed to send event ${i + 1}:`, reason);
        continue;
      }
      console.log(`✅ Event ${i + 1}/${businessEvents.length} sent: ${event.DetailType}`);
      console.log(` Event ID: ${entry.EventId}`);
      // Add small delay between events
      await new Promise(resolve => setTimeout(resolve, 100));
    } catch (error) {
      console.error(`❌ Failed to send event ${i + 1}:`, error);
    }
  }
}
// Create an event archive
/**
 * Creates an EventBridge archive on the default event bus.
 *
 * @param {string} archiveName - Name of the archive to create.
 * @param {string} description - Human-readable description.
 * @param {object} [eventPattern] - Optional event pattern; when omitted no
 *   pattern is sent with the request.
 * @param {number} [retentionDays=30] - Number of days events are retained.
 * @returns {Promise<object>} The CreateArchive response, or an object with a
 *   constructed `ArchiveArn` when the archive already exists.
 * @throws Any error other than `ResourceAlreadyExistsException` is rethrown.
 */
async function createEventArchive(archiveName, description, eventPattern, retentionDays = 30) {
  const params = {
    ArchiveName: archiveName,
    Description: description,
    EventSourceArn: 'arn:aws:events:us-east-1:123456789012:event-bus/default',
    EventPattern: eventPattern ? JSON.stringify(eventPattern) : undefined,
    RetentionDays: retentionDays,
  };
  try {
    const command = new CreateArchiveCommand(params);
    const result = await eventbridge.send(command);
    console.log(`✅ Archive '${archiveName}' created successfully`);
    console.log(` ARN: ${result.ArchiveArn}`);
    console.log(` State: ${result.State}`);
    console.log(` Creation Time: ${result.CreationTime}`);
    // The CreateArchive response does not echo the retention period, so the
    // requested value is reported instead of a field that is never present.
    console.log(` Retention Days: ${params.RetentionDays}`);
    return result;
  } catch (error) {
    if (error.name === 'ResourceAlreadyExistsException') {
      console.log(`ℹ️ Archive '${archiveName}' already exists`);
      return { ArchiveArn: `arn:aws:events:us-east-1:123456789012:archive/${archiveName}` };
    }
    throw error;
  }
}
// Example: Create different types of archives
/**
 * Creates the four demo archives one after another by delegating to
 * createEventArchive. Each spec is the exact argument list for one call; the
 * last spec has no event pattern.
 *
 * @returns {Promise<void>}
 */
async function setupArchives() {
  console.log('\n📦 Setting up event archives...');

  const archiveSpecs = [
    // All order events, for compliance
    [
      'order-events-archive',
      'Archive all order events for compliance and audit',
      { source: ['com.myapp.orders'] },
    ],
    // High-value transactions, for fraud analysis
    [
      'high-value-transactions-archive',
      'Archive high-value transactions for fraud analysis',
      {
        source: ['com.myapp.orders', 'com.myapp.payments'],
        detail: {
          total: [{ numeric: ['>=', 1000] }],
        },
      },
    ],
    // Payment events, for reconciliation
    [
      'payment-events-archive',
      'Archive payment events for financial reconciliation',
      { source: ['com.myapp.payments'] },
    ],
    // Everything, for disaster recovery (no pattern argument)
    [
      'disaster-recovery-archive',
      'Archive all events for disaster recovery purposes',
    ],
  ];

  // Sequential on purpose: archives are created in the listed order.
  for (const spec of archiveSpecs) {
    await createEventArchive(...spec);
  }
}
// List all archives
/**
 * Lists up to 50 archives and prints a summary of each.
 *
 * Only the first page of results is read; `NextToken` is not followed.
 *
 * @returns {Promise<object[]>} The archives from the response, or an empty
 *   array when the response has none or the request fails (the error is
 *   logged, not rethrown).
 */
async function listArchives() {
  // No NamePrefix: the API requires a prefix of at least one character, so an
  // empty string is rejected. Omitting the field lists every archive.
  const params = {
    Limit: 50,
  };
  try {
    const command = new ListArchivesCommand(params);
    const result = await eventbridge.send(command);
    const archives = result.Archives || [];
    console.log('\n📋 Available Event Archives:');
    archives.forEach(archive => {
      console.log(` - ${archive.ArchiveName}`);
      console.log(` State: ${archive.State}`);
      console.log(` Event Source: ${archive.EventSourceArn}`);
      console.log(` Creation Time: ${archive.CreationTime}`);
      console.log(` Retention Days: ${archive.RetentionDays}`);
      // Type checks rather than truthiness so that an empty archive
      // (0 bytes / 0 events) is still reported.
      if (typeof archive.SizeBytes === 'number') {
        console.log(` Size: ${(archive.SizeBytes / 1024 / 1024).toFixed(2)} MB`);
      }
      if (typeof archive.EventCount === 'number') {
        console.log(` Event Count: ${archive.EventCount}`);
      }
      console.log('');
    });
    return archives;
  } catch (error) {
    console.error('Error listing archives:', error);
    return [];
  }
}
// Get detailed archive information
/**
 * Fetches one archive via DescribeArchive and prints its details.
 *
 * @param {string} archiveName - Name of the archive to describe.
 * @returns {Promise<object|null>} The DescribeArchive response, or `null` when
 *   the request fails (the error is logged, not rethrown).
 */
async function describeArchive(archiveName) {
  const params = {
    ArchiveName: archiveName,
  };
  try {
    const command = new DescribeArchiveCommand(params);
    const result = await eventbridge.send(command);
    // Type checks rather than truthiness: 0 bytes / 0 events are valid values
    // for an empty archive and should not be shown as "Unknown".
    const sizeText = typeof result.SizeBytes === 'number'
      ? (result.SizeBytes / 1024 / 1024).toFixed(2) + ' MB'
      : 'Unknown';
    const countText = typeof result.EventCount === 'number' ? result.EventCount : 'Unknown';
    console.log(`\n📖 Archive Details: ${archiveName}`);
    console.log(` ARN: ${result.ArchiveArn}`);
    console.log(` Description: ${result.Description}`);
    console.log(` State: ${result.State}`);
    console.log(` Event Source: ${result.EventSourceArn}`);
    console.log(` Creation Time: ${result.CreationTime}`);
    console.log(` Retention Days: ${result.RetentionDays}`);
    console.log(` Size: ${sizeText}`);
    console.log(` Event Count: ${countText}`);
    if (result.EventPattern) {
      // The service returns the pattern as a JSON string already; stringifying
      // it again would wrap it in quotes and escape every inner quote.
      const patternText = typeof result.EventPattern === 'string'
        ? result.EventPattern
        : JSON.stringify(result.EventPattern);
      console.log(` Event Pattern: ${patternText}`);
    }
    return result;
  } catch (error) {
    console.error(`Error describing archive '${archiveName}':`, error);
    return null;
  }
}
// Start replaying archived events
/**
 * Starts a replay of the events stored in an archive.
 *
 * The StartReplay request identifies the archive by ARN in `EventSourceArn`
 * and the time window by `EventStartTime` / `EventEndTime`; it has no
 * `ArchiveName`, `StartTime` or `EndTime` fields.
 *
 * @param {string} archiveName - Name of the archive to replay from; converted
 *   to an archive ARN in the demo account/region used throughout this example.
 * @param {string} replayName - Name for the new replay.
 * @param {Date} startTime - Start of the event time window to replay.
 * @param {Date} endTime - End of the event time window to replay.
 * @param {string} destination - ARN of the event bus to replay the events to.
 * @returns {Promise<object>} The StartReplay response, or an object with a
 *   constructed `ReplayArn` when a replay with this name already exists.
 * @throws Any error other than `ResourceAlreadyExistsException` is rethrown.
 */
async function startReplay(archiveName, replayName, startTime, endTime, destination) {
  const params = {
    ReplayName: replayName,
    Description: `Replay events from ${archiveName} for testing`,
    // The replay source is the archive itself, not the event bus it captured.
    EventSourceArn: `arn:aws:events:us-east-1:123456789012:archive/${archiveName}`,
    EventStartTime: startTime,
    EventEndTime: endTime,
    Destination: {
      Arn: destination,
    },
  };
  try {
    const command = new StartReplayCommand(params);
    const result = await eventbridge.send(command);
    console.log(`✅ Replay '${replayName}' started successfully`);
    console.log(` Replay ARN: ${result.ReplayArn}`);
    console.log(` State: ${result.State}`);
    console.log(` Start Time: ${result.ReplayStartTime}`);
    return result;
  } catch (error) {
    if (error.name === 'ResourceAlreadyExistsException') {
      console.log(`ℹ️ Replay '${replayName}' already exists`);
      return { ReplayArn: `arn:aws:events:us-east-1:123456789012:replay/${replayName}` };
    }
    throw error;
  }
}
// Example replay scenarios
/**
 * Starts three example replays (testing, debugging, warehouse rebuild), one
 * after another, by delegating to startReplay.
 *
 * A replay can only be delivered to the event bus its archive was created
 * from, so every scenario targets the default bus used when the archives were
 * created. Delivering replayed events to a particular Lambda function or
 * downstream system is done with a rule on that bus, not with the replay
 * destination.
 *
 * @returns {Promise<void>}
 * @throws Propagates whatever startReplay throws.
 */
async function demonstrateReplayScenarios() {
  // Event bus the demo archives capture from; the only valid replay target.
  const sourceEventBusArn = 'arn:aws:events:us-east-1:123456789012:event-bus/default';
  console.log('\n🔄 Demonstrating replay scenarios...');
  // Scenario 1: Testing new Lambda function with historical data
  console.log('\n1. Testing new function with historical order events...');
  const orderReplayStart = new Date(Date.now() - 24 * 60 * 60 * 1000); // 24 hours ago
  const orderReplayEnd = new Date();
  await startReplay(
    'order-events-archive',
    'test-new-processor',
    orderReplayStart,
    orderReplayEnd,
    sourceEventBusArn
  );
  // Scenario 2: Debugging production issues
  console.log('\n2. Debugging payment processing issues...');
  const paymentReplayStart = new Date(Date.now() - 2 * 60 * 60 * 1000); // 2 hours ago
  const paymentReplayEnd = new Date();
  await startReplay(
    'payment-events-archive',
    'debug-payment-issues',
    paymentReplayStart,
    paymentReplayEnd,
    sourceEventBusArn
  );
  // Scenario 3: Rebuilding data warehouse
  console.log('\n3. Rebuilding data warehouse with all events...');
  const warehouseReplayStart = new Date('2025-12-01T00:00:00Z'); // Start of month
  const warehouseReplayEnd = new Date();
  await startReplay(
    'disaster-recovery-archive',
    'rebuild-data-warehouse',
    warehouseReplayStart,
    warehouseReplayEnd,
    sourceEventBusArn
  );
}
// Get replay status
/**
 * Fetches one replay via DescribeReplay and prints its status.
 *
 * @param {string} replayName - Name of the replay to describe.
 * @returns {Promise<object|null>} The DescribeReplay response, or `null` when
 *   the request fails (the error is logged, not rethrown).
 */
async function describeReplay(replayName) {
  const params = {
    ReplayName: replayName,
  };
  try {
    const command = new DescribeReplayCommand(params);
    const result = await eventbridge.send(command);
    console.log(`\n📊 Replay Status: ${replayName}`);
    console.log(` State: ${result.State}`);
    console.log(` Start Time: ${result.ReplayStartTime}`);
    console.log(` End Time: ${result.ReplayEndTime || 'In progress'}`);
    // EventSourceArn is the ARN of the source archive, not an event count.
    console.log(` Event Source (archive): ${result.EventSourceArn}`);
    if (result.ReplayStartTime && result.ReplayEndTime) {
      // Wall-clock duration of the replay run itself, in seconds.
      const duration = (new Date(result.ReplayEndTime) - new Date(result.ReplayStartTime)) / 1000;
      console.log(` Duration: ${duration} seconds`);
    }
    return result;
  } catch (error) {
    console.error(`Error describing replay '${replayName}':`, error);
    return null;
  }
}
// Archive compliance and retention management
/**
 * Builds, prints and returns a JavaScript snippet (as text) that illustrates
 * archive retention and compliance management.
 *
 * The snippet is only displayed, never executed here. Because it lives inside
 * a template literal, every backtick and `${` that belongs to the snippet is
 * escaped; unescaped, the first inner backtick would end the outer string and
 * the rest would no longer parse.
 *
 * NOTE(review): the snippet calls describeArchive(), which it does not define
 * or import itself — presumably it is meant to be pasted next to the helper of
 * that name in this example.
 *
 * @returns {string} The snippet source text.
 */
function createArchiveManagement() {
  const management = `// Event Archive Management for Compliance
const { EventBridgeClient, UpdateArchiveCommand, DeleteArchiveCommand } = require('@aws-sdk/client-eventbridge');
const { IAMClient, GetRolePolicyCommand } = require('@aws-sdk/client-iam');
const eventbridge = new EventBridgeClient();
// Update archive retention based on compliance requirements
async function updateArchiveRetention(archiveName, retentionDays, reason) {
const params = {
ArchiveName: archiveName,
RetentionDays: retentionDays,
Description: \`Updated retention for \${reason}\`,
};
try {
const command = new UpdateArchiveCommand(params);
const result = await eventbridge.send(command);
console.log(\`✅ Archive '\${archiveName}' retention updated to \${retentionDays} days\`);
console.log(\` Reason: \${reason}\`);
return result;
} catch (error) {
console.error(\`Error updating archive retention: \${error}\`);
return null;
}
}
// Check archive compliance
async function checkArchiveCompliance(archiveName) {
const requiredRetention = 2555; // 7 years for financial data
// Get archive details
const archive = await describeArchive(archiveName);
if (archive && archive.RetentionDays < requiredRetention) {
console.log(\`⚠️ COMPLIANCE WARNING: Archive '\${archiveName}' retention (\${archive.RetentionDays} days) is below required (\${requiredRetention} days)\`);
// Auto-update for compliance
await updateArchiveRetention(archiveName, requiredRetention, 'Automated compliance update');
} else {
console.log(\`✅ Archive '\${archiveName}' is compliant\`);
}
}
// Archive lifecycle management
const ARCHIVE_POLICIES = {
'order-events-archive': {
retentionDays: 2555, // 7 years for financial records
autoExtend: true,
complianceLevel: 'critical',
},
'payment-events-archive': {
retentionDays: 2555, // 7 years for payment records
autoExtend: true,
complianceLevel: 'critical',
},
'high-value-transactions-archive': {
retentionDays: 1825, // 5 years for fraud analysis
autoExtend: true,
complianceLevel: 'high',
},
'debug-events-archive': {
retentionDays: 90, // 90 days for debugging
autoExtend: false,
complianceLevel: 'low',
},
};
// Implement archive lifecycle
async function manageArchiveLifecycle() {
for (const [archiveName, policy] of Object.entries(ARCHIVE_POLICIES)) {
console.log(\`\\n🔄 Managing lifecycle for \${archiveName}\`);
if (policy.complianceLevel === 'critical') {
await checkArchiveCompliance(archiveName);
}
// Check if archive needs to be extended
if (policy.autoExtend) {
console.log(\` Auto-extend policy: Active\`);
}
}
}
// CloudWatch Events for archive monitoring
const archiveMonitoring = {
// Event pattern for archive state changes
ArchiveStateChange: {
source: ['aws.events'],
'detail-type': ['EventBridge Archive State Change'],
detail: {
state: ['ENABLED', 'DISABLED', 'CREATING', 'UPDATING', 'DELETING'],
},
},
// Event pattern for replay state changes
ReplayStateChange: {
source: ['aws.events'],
'detail-type': ['EventBridge Archive Replay State Change'],
detail: {
state: ['STARTING', 'RUNNING', 'COMPLETED', 'FAILED'],
},
},
};
console.log('Archive Monitoring Event Patterns:', JSON.stringify(archiveMonitoring, null, 2));`;
  console.log('\n🔒 Archive Management Code:');
  console.log(management);
  return management;
}
// Lambda function for archive automation
const archiveAutomationLambda = {
FunctionName: 'eventbridge-archive-manager',
Runtime: 'nodejs18.x',
Handler: 'index.handler',
Code: {
ZipFile: Buffer.from(`
const { EventBridgeClient, CreateArchiveCommand, ListArchivesCommand } = require('@aws-sdk/client-eventbridge');
const eventbridge = new EventBridgeClient();
exports.handler = async (event) => {
console.log('Archive manager triggered:', event);
// Daily archive creation for different event types
const today = new Date().toISOString().split('T')[0];
// Create daily archive for order events
await createDailyArchive(`order-events-${today}`, {
source: ['com.myapp.orders'],
});
// Create daily archive for payment events
await createDailyArchive(`payment-events-${today}`, {
source: ['com.myapp.payments'],
});
// Clean up old archives (keep last 30 days)
await cleanupOldArchives();
return {
statusCode: 200,
body: JSON.stringify({ message: 'Archive management completed' });
};
};
async function createDailyArchive(archiveName, eventPattern) {
try {
const command = new CreateArchiveCommand({
ArchiveName: archiveName,
Description: `Daily archive for ${archiveName}`,
EventSourceArn: 'arn:aws:events:us-east-1:123456789012:event-bus/default',
EventPattern: JSON.stringify(eventPattern),
RetentionDays: 30,
});
await eventbridge.send(command);
console.log(`Created daily archive: ${archiveName}`);
} catch (error) {
if (error.name !== 'ResourceAlreadyExistsException') {
console.error(`Error creating archive ${archiveName}:`, error);
}
}
}
async function cleanupOldArchives() {
// Implementation for cleaning up old archives
console.log('Cleaning up old archives...');
}
`).toString('base64'),
},
};
console.log('\n🤖 Archive Automation Lambda:');
console.log(archiveAutomationLambda);
// Entry point for the Archive and Replay walkthrough.
/**
 * Runs the archive/replay demo steps in order and prints a short summary of
 * benefits. Any error raised by a step is logged here rather than propagated.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== AWS EventBridge Archive and Replay Example ===');

  const keyBenefits = [
    '- Event archival for compliance and audit',
    '- Historical data replay for testing',
    '- Disaster recovery capabilities',
    '- Debugging with real production events',
    '- Data warehouse rebuilding',
  ];

  try {
    // Step 1: publish the sample events
    console.log('\n1. Sending business events...');
    await sendBusinessEvents();

    // Step 2: fixed five-second pause before continuing
    console.log('\n2. Waiting for events to be processed...');
    await new Promise((done) => setTimeout(done, 5000));

    // Step 3: create the archives
    console.log('\n3. Setting up event archives...');
    await setupArchives();

    // Step 4: list what exists
    console.log('\n4. Listing archives...');
    await listArchives();

    // Step 5: inspect one archive in detail
    console.log('\n5. Getting archive details...');
    await describeArchive('order-events-archive');

    // Step 6: start the example replays
    console.log('\n6. Setting up replay scenarios...');
    await demonstrateReplayScenarios();

    // Step 7: print the archive management snippet
    console.log('\n7. Setting up archive management...');
    createArchiveManagement();

    // Step 8: print the automation Lambda configuration
    console.log('\n8. Archive automation Lambda configuration...');
    console.log(archiveAutomationLambda);

    console.log('\n✅ EventBridge Archive and Replay example completed!');
    console.log('\nKey Benefits:');
    for (const benefit of keyBenefits) {
      console.log(benefit);
    }
  } catch (failure) {
    console.error('Error:', failure);
  }
}

main().catch((failure) => {
  console.error('Error:', failure);
});