Ejemplos de AWS EventBridge

Ejemplos de AWS EventBridge incluyendo buses de eventos, reglas, objetivos, registro de esquemas, eventos personalizados y enrutamiento de eventos entre cuentas para arquitectura serverless event-driven

💻 EventBridge Hello World - Bus de Eventos Básico javascript

🟢 simple ⭐⭐

Ejemplo simple de AWS EventBridge con bus de eventos personalizado, reglas de eventos y objetivos Lambda

⏱️ 15 min 🏷️ eventbridge, aws, events, serverless
Prerequisites: AWS account, AWS CLI configured, Node.js
// AWS EventBridge Hello World - Basic Event Bus Example
// Install: npm install @aws-sdk/client-eventbridge @aws-sdk/client-lambda

const {
  CreateEventBusCommand,
  DescribeEventBusCommand,
  EventBridgeClient,
  PutEventsCommand,
} = require('@aws-sdk/client-eventbridge');
const { AddPermissionCommand, LambdaClient } = require('@aws-sdk/client-lambda');

// AWS configuration: one shared SDK client per service, both created for the
// us-east-1 region. No credentials are passed here — presumably they are
// resolved by the SDK's default provider chain (TODO confirm for your setup).
const eventbridge = new EventBridgeClient({ region: 'us-east-1' });
const lambda = new LambdaClient({ region: 'us-east-1' });

/**
 * Creates the custom event bus `my-app-events` and returns its ARN.
 *
 * When the bus already exists, the ARN of the existing bus is looked up with
 * DescribeEventBus rather than guessed, so the returned value always belongs
 * to the account and region the client is actually talking to.
 *
 * @returns {Promise<string>} ARN of the newly created or already existing bus.
 * @throws Rethrows every SDK error except ResourceAlreadyExistsException.
 */
async function createCustomEventBus() {
  const params = {
    Name: 'my-app-events',
    Description: 'Event bus for my application events',
  };

  try {
    const result = await eventbridge.send(new CreateEventBusCommand(params));
    console.log('✅ Custom event bus created:', result.EventBusArn);
    return result.EventBusArn;
  } catch (error) {
    if (error.name !== 'ResourceAlreadyExistsException') {
      throw error;
    }

    console.log('ℹ️  Event bus already exists');
    // Fetch the real ARN instead of fabricating one with a placeholder account.
    const existing = await eventbridge.send(
      new DescribeEventBusCommand({ Name: params.Name }),
    );
    return existing.Arn;
  }
}

// Lambda function handler (deployment separate from this code)
const lambdaHandler = `
exports.handler = async (event) => {
  console.log('Event received:', JSON.stringify(event, null, 2));

  for (const record of event.Records) {
    console.log('Event detail:', record.detail);
    console.log('Event source:', record.source);
    console.log('Event type:', record['detail-type']);

    // Process event based on type
    switch (record['detail-type']) {
      case 'UserCreated':
        console.log(`Processing new user: ${record.detail.userId}`);
        // Add user to database, send welcome email, etc.
        break;
      case 'OrderPlaced':
        console.log(`Processing order: ${record.detail.orderId}`);
        // Process order, update inventory, etc.
        break;
    }
  }

  return {
    statusCode: 200,
    body: JSON.stringify({ message: 'Events processed successfully' }),
  };
};
`;

/**
 * Grants EventBridge permission to invoke a Lambda function by adding a
 * statement to the function's resource-based policy.
 *
 * @param {string} lambdaArn - Name or ARN of the function to open up.
 * @param {string} [sourceArn] - ARN of the EventBridge rule that is allowed to
 *   invoke the function. When omitted the statement is not tied to a specific
 *   rule (the previous behaviour); pass it whenever the rule ARN is known so
 *   that other rules cannot invoke the function.
 * @returns {Promise<void>}
 * @throws Rethrows every SDK error except ResourceConflictException, which
 *   is treated as "the statement is already there".
 */
async function grantEventBridgePermission(lambdaArn, sourceArn) {
  const params = {
    FunctionName: lambdaArn,
    StatementId: 'EventBridgeInvokePermission',
    Action: 'lambda:InvokeFunction',
    Principal: 'events.amazonaws.com',
    // Only restrict by source when the caller supplied a rule ARN.
    ...(sourceArn ? { SourceArn: sourceArn } : {}),
  };

  try {
    await lambda.send(new AddPermissionCommand(params));
    console.log('✅ EventBridge permission granted to Lambda');
  } catch (error) {
    if (error.name !== 'ResourceConflictException') {
      throw error;
    }
    console.log('ℹ️  Permission already exists');
  }
}

/**
 * Publishes three sample events (UserCreated, OrderPlaced, PaymentProcessed)
 * to the `my-app-events` bus with a single PutEvents call and logs the outcome
 * of every entry.
 *
 * PutEvents reports results positionally: `Entries[i]` in the response belongs
 * to `Entries[i]` in the request and carries either an `EventId` (accepted) or
 * an `ErrorCode`/`ErrorMessage` (rejected).
 *
 * Request-level failures are logged and not rethrown, so the surrounding demo
 * keeps running.
 *
 * @returns {Promise<void>}
 */
async function sendCustomEvents() {
  const events = [
    {
      Source: 'my.app.users',
      DetailType: 'UserCreated',
      Detail: JSON.stringify({
        userId: 'user123',
        email: 'john.doe@example.com',
        name: 'John Doe',
        timestamp: new Date().toISOString(),
      }),
      EventBusName: 'my-app-events',
    },
    {
      Source: 'my.app.orders',
      DetailType: 'OrderPlaced',
      Detail: JSON.stringify({
        orderId: 'order456',
        userId: 'user123',
        total: 99.99,
        items: [
          { productId: 'prod001', quantity: 2, price: 49.99 },
        ],
        timestamp: new Date().toISOString(),
      }),
      EventBusName: 'my-app-events',
    },
    {
      Source: 'my.app.payments',
      DetailType: 'PaymentProcessed',
      Detail: JSON.stringify({
        paymentId: 'pay789',
        orderId: 'order456',
        amount: 99.99,
        status: 'completed',
        method: 'credit_card',
        timestamp: new Date().toISOString(),
      }),
      EventBusName: 'my-app-events',
    },
  ];

  console.log('📤 Sending events to EventBridge...');

  try {
    // All sample events fit into one request (PutEvents accepts up to 10 entries).
    const result = await eventbridge.send(new PutEventsCommand({ Entries: events }));
    const resultEntries = result.Entries || [];

    events.forEach((event, index) => {
      const entry = resultEntries[index] || {};
      if (entry.EventId) {
        console.log(`✅ Event sent: ${event.DetailType}`);
        console.log(`   Event ID: ${entry.EventId}`);
      } else {
        console.error('❌ Failed to send event:', { DetailType: event.DetailType, ...entry });
      }
    });
  } catch (error) {
    console.error('Error sending event:', error);
  }
}

                // EventBridge rule configuration as a CloudFormation (YAML) template.
                // The text is only printed by main(); it is meant to be deployed with
                // infrastructure-as-code tooling. YAML is indentation-sensitive, so the
                // nesting below must be kept exactly as written.
                const eventRuleTemplate = `
# CloudFormation Template for EventBridge Rule
Resources:
  UserEventsRule:
    Type: AWS::Events::Rule
    Properties:
      Name: user-events-rule
      Description: 'Process user-related events'
      EventBusName: my-app-events
      EventPattern:
        source:
          - 'my.app.users'
        detail-type:
          - 'UserCreated'
          - 'UserUpdated'
          - 'UserDeleted'
      State: ENABLED
      Targets:
        - Arn: !GetAtt UserEventProcessorFunction.Arn
          Id: 'UserEventProcessorTarget'

  OrderEventsRule:
    Type: AWS::Events::Rule
    Properties:
      Name: order-events-rule
      Description: 'Process order-related events'
      EventBusName: my-app-events
      EventPattern:
        source:
          - 'my.app.orders'
        detail-type:
          - 'OrderPlaced'
          - 'OrderShipped'
          - 'OrderDelivered'
      State: ENABLED
      Targets:
        - Arn: !GetAtt OrderEventProcessorFunction.Arn
          Id: 'OrderEventProcessorTarget'
`;

        /**
         * Walks through the Hello World demo: creates the custom event bus,
         * prints the Lambda handler source and the CloudFormation rule
         * template, publishes the sample events and finally prints follow-up
         * hints. Any error raised along the way is logged, not rethrown.
         *
         * @returns {Promise<void>}
         */
        async function main() {
          const followUps = [
            '- Deploy Lambda functions with the provided handler',
            '- Deploy EventBridge rules using the CloudFormation template',
            '- Set up monitoring and logging for event processing',
          ];

          console.log('=== AWS EventBridge Hello World ===');

          try {
            // 1. The bus has to exist before anything can be published to it.
            console.log('\n1. Creating custom event bus...');
            await createCustomEventBus();

            // 2./3. Lambda functions and rules are deployed via infrastructure
            // as code, so their definitions are only shown here.
            console.log('\n2. Lambda handler code:');
            console.log(lambdaHandler);
            console.log('\n3. CloudFormation template for rules:');
            console.log(eventRuleTemplate);

            // 4. Publish the sample events.
            console.log('\n4. Sending custom events...');
            await sendCustomEvents();

            console.log('\n✅ EventBridge Hello World completed!');
            console.log('\nNext steps:');
            followUps.forEach((hint) => {
              console.log(hint);
            });
          } catch (failure) {
            console.error('Error:', failure);
          }
        }

                // Catalogue of sample EventBridge event patterns, keyed by a
                // descriptive name. Each value is a plain pattern object that
                // is serialised with JSON.stringify when printed below.
                const eventPatterns = {
                  // Every lifecycle event emitted by the users service
                  userEvents: {
                    source: ['my.app.users'],
                    'detail-type': ['UserCreated', 'UserUpdated', 'UserDeleted'],
                  },

                  // Orders whose total is at least 1000
                  highValueOrders: {
                    source: ['my.app.orders'],
                    'detail-type': ['OrderPlaced'],
                    detail: { total: [{ numeric: ['>=', 1000] }] },
                  },

                  // Payments that ended in the "failed" status
                  failedPayments: {
                    source: ['my.app.payments'],
                    'detail-type': ['PaymentProcessed'],
                    detail: { status: ['failed'] },
                  },

                  // Events whose time field starts with "2025-12"
                  recentEvents: {
                    time: [{ prefix: '2025-12' }],
                  },

                  // Combined conditions: userId prefixed "premium_" and total >= 500
                  premiumUserOrders: {
                    source: ['my.app.orders'],
                    'detail-type': ['OrderPlaced'],
                    detail: {
                      userId: [{ prefix: 'premium_' }],
                      total: [{ numeric: ['>=', 500] }],
                    },
                  },
                };

                // Print every pattern as pretty-printed JSON for reference.
                console.log('\n📋 Event Pattern Examples:');
                for (const [patternName, patternBody] of Object.entries(eventPatterns)) {
                  console.log(`\n${patternName}:`);
                  console.log(JSON.stringify(patternBody, null, 2));
                }

// Script entry point: start the demo and report any rejection that main()
// lets through.
main().catch((failure) => console.error('Error:', failure));

💻 Registro de Esquemas EventBridge javascript

🟡 intermediate ⭐⭐⭐⭐

Usar el Registro de Esquemas de EventBridge para descubrir, crear y gestionar esquemas de eventos para procesamiento de eventos con seguridad de tipos

⏱️ 25 min 🏷️ eventbridge, schema, registry, types
Prerequisites: AWS account, EventBridge enabled, Node.js, Understanding of JSON Schema
// AWS EventBridge Schema Registry Example
// Install: npm install @aws-sdk/client-eventbridge @aws-sdk/client-schemas @aws-sdk/client-lambda

const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
const { SchemasClient, CreateSchemaCommand, ListSchemasCommand, DescribeSchemaCommand, SearchSchemasCommand } = require('@aws-sdk/client-schemas');

// Shared SDK clients for this example, both created for the us-east-1 region:
// `eventbridge` for the event bus API and `schemas` for the Schema Registry API.
const eventbridge = new EventBridgeClient({ region: 'us-east-1' });
const schemas = new SchemasClient({ region: 'us-east-1' });

// Schema definitions for different event types.
//
// Keys are the schema names; createSchema() sends each entry's `Type` and
// `Schema` to the registry as the schema type and content. Every `Schema` value
// is an OpenAPI 3.0.0 document stored as a JSON string, so its text is passed
// on exactly as written here.
const eventSchemas = {
  // UserCreated: requires userId, email and createdAt; name, role and
  // metadata are optional.
  UserCreated: {
    Type: 'OpenApi3',
    Schema: `{
      "openapi": "3.0.0",
      "info": {
        "title": "UserCreated Event",
        "version": "1.0.0",
        "description": "Event emitted when a new user is created"
      },
      "components": {
        "schemas": {
          "UserCreated": {
            "type": "object",
            "required": ["userId", "email", "createdAt"],
            "properties": {
              "userId": {
                "type": "string",
                "description": "Unique identifier for the user"
              },
              "email": {
                "type": "string",
                "format": "email",
                "description": "User's email address"
              },
              "name": {
                "type": "string",
                "description": "User's full name"
              },
              "role": {
                "type": "string",
                "enum": ["user", "admin", "moderator"],
                "default": "user"
              },
              "createdAt": {
                "type": "string",
                "format": "date-time",
                "description": "Timestamp when user was created"
              },
              "metadata": {
                "type": "object",
                "properties": {
                  "source": { "type": "string" },
                  "ipAddress": { "type": "string" },
                  "userAgent": { "type": "string" }
                }
              }
            }
          }
        }
      }
    }`,
  },

  // OrderPlaced: requires orderId, userId, total, items (at least one) and
  // createdAt; currency and shippingAddress are optional.
  OrderPlaced: {
    Type: 'OpenApi3',
    Schema: `{
      "openapi": "3.0.0",
      "info": {
        "title": "OrderPlaced Event",
        "version": "1.0.0",
        "description": "Event emitted when a customer places an order"
      },
      "components": {
        "schemas": {
          "OrderPlaced": {
            "type": "object",
            "required": ["orderId", "userId", "total", "items", "createdAt"],
            "properties": {
              "orderId": {
                "type": "string",
                "description": "Unique identifier for the order"
              },
              "userId": {
                "type": "string",
                "description": "Customer's user ID"
              },
              "total": {
                "type": "number",
                "format": "float",
                "minimum": 0,
                "description": "Total order amount"
              },
              "currency": {
                "type": "string",
                "pattern": "^[A-Z]{3}$",
                "default": "USD"
              },
              "items": {
                "type": "array",
                "minItems": 1,
                "items": {
                  "type": "object",
                  "required": ["productId", "quantity", "price"],
                  "properties": {
                    "productId": { "type": "string" },
                    "quantity": { "type": "integer", "minimum": 1 },
                    "price": { "type": "number", "minimum": 0 }
                  }
                }
              },
              "shippingAddress": {
                "type": "object",
                "properties": {
                  "street": { "type": "string" },
                  "city": { "type": "string" },
                  "state": { "type": "string" },
                  "zipCode": { "type": "string" },
                  "country": { "type": "string" }
                }
              },
              "createdAt": {
                "type": "string",
                "format": "date-time"
              }
            }
          }
        }
      }
    }`,
  },

  // PaymentProcessed: requires paymentId, orderId, amount, status and
  // processedAt; currency, method, transactionId and failureReason are optional.
  PaymentProcessed: {
    Type: 'OpenApi3',
    Schema: `{
      "openapi": "3.0.0",
      "info": {
        "title": "PaymentProcessed Event",
        "version": "1.0.0",
        "description": "Event emitted when a payment is processed"
      },
      "components": {
        "schemas": {
          "PaymentProcessed": {
            "type": "object",
            "required": ["paymentId", "orderId", "amount", "status", "processedAt"],
            "properties": {
              "paymentId": {
                "type": "string",
                "description": "Unique identifier for the payment"
              },
              "orderId": {
                "type": "string",
                "description": "Associated order ID"
              },
              "amount": {
                "type": "number",
                "format": "float",
                "minimum": 0
              },
              "currency": {
                "type": "string",
                "pattern": "^[A-Z]{3}$"
              },
              "status": {
                "type": "string",
                "enum": ["completed", "failed", "pending", "refunded"]
              },
              "method": {
                "type": "string",
                "enum": ["credit_card", "debit_card", "paypal", "bank_transfer"]
              },
              "transactionId": {
                "type": "string",
                "description": "External transaction ID from payment processor"
              },
              "failureReason": {
                "type": "string",
                "description": "Reason for payment failure (if status is failed)"
              },
              "processedAt": {
                "type": "string",
                "format": "date-time"
              }
            }
          }
        }
      }
    }`,
  },
};

/**
 * Creates a schema in the `default` EventBridge Schema Registry.
 *
 * If a schema with that name already exists, the existing schema is fetched
 * with DescribeSchema and returned, so callers always receive the real
 * `SchemaArn` rather than a fabricated one.
 *
 * @param {string} name - Schema name inside the registry.
 * @param {{Type: string, Schema: string}} schemaDefinition - Schema type and
 *   its content as a string.
 * @returns {Promise<object>} The CreateSchema response, or the DescribeSchema
 *   response when the schema already existed; both carry `SchemaArn`.
 * @throws Rethrows every SDK error except ConflictException.
 */
async function createSchema(name, schemaDefinition) {
  const registryName = 'default'; // Use default registry
  const params = {
    RegistryName: registryName,
    SchemaName: name,
    Type: schemaDefinition.Type,
    Content: schemaDefinition.Schema,
    Description: `${name} event schema for my application`,
  };

  try {
    const result = await schemas.send(new CreateSchemaCommand(params));
    console.log(`✅ Schema '${name}' created successfully`);
    console.log(`   Schema ARN: ${result.SchemaArn}`);
    console.log(`   Schema Version: ${result.SchemaVersion}`);
    return result;
  } catch (error) {
    if (error.name !== 'ConflictException') {
      throw error;
    }

    console.log(`ℹ️  Schema '${name}' already exists`);
    // Look up the existing schema instead of guessing its ARN.
    return schemas.send(
      new DescribeSchemaCommand({ RegistryName: registryName, SchemaName: name }),
    );
  }
}

/**
 * Lists up to 50 schemas of the `default` registry and prints one line per
 * schema.
 *
 * ListSchemas returns schema summaries (name, ARN, version count, last
 * modified) — a summary has no description or single version number, so the
 * version count is printed instead.
 *
 * @returns {Promise<object[]>} The schema summaries; an empty array when the
 *   registry is empty or the request fails (the error is logged).
 */
async function listSchemas() {
  const params = {
    RegistryName: 'default',
    Limit: 50,
  };

  try {
    const result = await schemas.send(new ListSchemasCommand(params));
    // Guard against a response without a Schemas member.
    const summaries = result.Schemas || [];

    console.log('\n📋 Available Schemas:');
    summaries.forEach((schema) => {
      console.log(`   - ${schema.SchemaName} (${schema.VersionCount} version(s))`);
    });

    return summaries;
  } catch (error) {
    console.error('Error listing schemas:', error);
    return [];
  }
}

/**
 * Fetches one schema from the `default` registry and prints its key facts.
 *
 * The creation timestamp is read from `VersionCreatedDate`, the field the
 * DescribeSchema response uses for it.
 *
 * @param {string} schemaName - Name of the schema to look up.
 * @returns {Promise<object|null>} The DescribeSchema response, or null when
 *   the request fails (the error is logged).
 */
async function describeSchema(schemaName) {
  const params = {
    RegistryName: 'default',
    SchemaName: schemaName,
  };

  try {
    const result = await schemas.send(new DescribeSchemaCommand(params));
    // A missing Content must not turn a successful lookup into a failure.
    const contentLength = (result.Content || '').length;

    console.log(`\n📖 Schema Details: ${schemaName}`);
    console.log(`   Type: ${result.Type}`);
    console.log(`   Version: ${result.SchemaVersion}`);
    console.log(`   Created: ${result.VersionCreatedDate}`);
    console.log(`   Modified: ${result.LastModified}`);
    console.log(`   Content Length: ${contentLength} characters`);

    return result;
  } catch (error) {
    console.error(`Error describing schema '${schemaName}':`, error);
    return null;
  }
}

/**
 * Searches the `default` registry for schemas matching a keyword and prints
 * one line per hit.
 *
 * Search hits carry the schema name, ARN and a list of matching versions but
 * no description, so the number of matching versions is printed.
 *
 * @param {string} keyword - Keyword(s) to search for.
 * @returns {Promise<object[]>} The search hits; an empty array when nothing
 *   matches or the request fails (the error is logged).
 */
async function searchSchemas(keyword) {
  const params = {
    RegistryName: 'default',
    Keywords: keyword,
    Limit: 10,
  };

  try {
    const result = await schemas.send(new SearchSchemasCommand(params));
    // Guard against a response without a Schemas member.
    const hits = result.Schemas || [];

    console.log(`\n🔍 Search Results for '${keyword}':`);
    hits.forEach((schema) => {
      const versionCount = (schema.SchemaVersions || []).length;
      console.log(`   - ${schema.SchemaName} (${versionCount} matching version(s))`);
    });

    return hits;
  } catch (error) {
    console.error('Error searching schemas:', error);
    return [];
  }
}

/**
 * Performs a simplified, hand-written validation of a serialized event.
 *
 * Only the `UserCreated` schema has checks: `userId`, `email` and `createdAt`
 * must be present (truthy) and `email` must look like an address. Events for
 * any other schema name pass as long as they are parseable JSON. A real
 * application would use a JSON schema validator instead.
 *
 * @param {string} eventData - The event payload as a JSON string.
 * @param {string} schemaName - Name of the schema to validate against.
 * @returns {boolean} true when the event passes, false otherwise (including
 *   when parsing or checking throws).
 */
function validateEvent(eventData, schemaName) {
  console.log(`\n🔍 Validating event against '${schemaName}' schema`);

  try {
    const payload = JSON.parse(eventData);

    if (schemaName === 'UserCreated') {
      // Collect every required field that is absent or falsy.
      const absent = [];
      for (const key of ['userId', 'email', 'createdAt']) {
        if (!payload[key]) {
          absent.push(key);
        }
      }

      if (absent.length !== 0) {
        console.log(`❌ Validation failed: Missing required fields: ${absent.join(', ')}`);
        return false;
      }

      // Loose shape check: something@something.something without whitespace.
      const looksLikeEmail = /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(payload.email);
      if (!looksLikeEmail) {
        console.log('❌ Validation failed: Invalid email format');
        return false;
      }
    }

    console.log('✅ Event validation passed');
    return true;
  } catch (problem) {
    console.log(`❌ Validation failed: ${problem.message}`);
    return false;
  }
}

/**
 * Prints and returns TypeScript interfaces that mirror the three event
 * schemas (UserCreated, OrderPlaced, PaymentProcessed).
 *
 * A property is optional (`?`) exactly when the schema does not list it under
 * `required`; `PaymentProcessed.currency` is therefore optional.
 *
 * @returns {string} The TypeScript source text.
 */
function generateTypeScriptTypes() {
  const types = `// Auto-generated TypeScript types for EventBridge events

export interface UserCreated {
  userId: string;
  email: string;
  name?: string;
  role?: 'user' | 'admin' | 'moderator';
  createdAt: string;
  metadata?: {
    source?: string;
    ipAddress?: string;
    userAgent?: string;
  };
}

export interface OrderItem {
  productId: string;
  quantity: number;
  price: number;
}

export interface OrderPlaced {
  orderId: string;
  userId: string;
  total: number;
  currency?: string;
  items: OrderItem[];
  shippingAddress?: {
    street?: string;
    city?: string;
    state?: string;
    zipCode?: string;
    country?: string;
  };
  createdAt: string;
}

export interface PaymentProcessed {
  paymentId: string;
  orderId: string;
  amount: number;
  currency?: string;
  status: 'completed' | 'failed' | 'pending' | 'refunded';
  method?: 'credit_card' | 'debit_card' | 'paypal' | 'bank_transfer';
  transactionId?: string;
  failureReason?: string;
  processedAt: string;
}

// Union type for all events
export type AppEvent = UserCreated | OrderPlaced | PaymentProcessed;

// Event type discriminator
export type EventType = 'UserCreated' | 'OrderPlaced' | 'PaymentProcessed';
`;

  console.log('\n📝 Generated TypeScript Types:');
  console.log(types);

  return types;
}

/**
 * Prints and returns the TypeScript source of a typed Lambda handler that
 * dispatches EventBridge events by their `detail-type`.
 *
 * Backticks and `${` inside the generated code are escaped so they remain
 * part of the returned text. The generated handler reads `detail-type` and
 * `detail` straight from the incoming event, because EventBridge passes the
 * event envelope itself to a Lambda target.
 *
 * @returns {string} The TypeScript source text of the handler module.
 */
function generateTypedLambdaHandler() {
  const handler = `// Type-safe Lambda handler with schema validation
import { AppEvent, UserCreated, OrderPlaced, PaymentProcessed } from './event-types';

export const handler = async (event: { 'detail-type': string; detail: AppEvent }) => {
  console.log('Processing EventBridge event...');

  // EventBridge delivers one event envelope per invocation; there is no
  // Records array to iterate over.
  const eventType = event['detail-type'];
  const eventData = event.detail;

  try {
    switch (eventType) {
      case 'UserCreated':
        await handleUserCreated(eventData as UserCreated);
        break;
      case 'OrderPlaced':
        await handleOrderPlaced(eventData as OrderPlaced);
        break;
      case 'PaymentProcessed':
        await handlePaymentProcessed(eventData as PaymentProcessed);
        break;
      default:
        console.warn(\`Unknown event type: \${eventType}\`);
    }
  } catch (error) {
    console.error(\`Error processing event \${eventType}:\`, error);
    // Consider implementing dead-letter queue for failed events
  }
};

async function handleUserCreated(user: UserCreated) {
  console.log(\`Processing new user: \${user.userId}\`);
  // Type-safe access to user properties
  console.log(\`Email: \${user.email}\`);
  if (user.metadata?.source) {
    console.log(\`Source: \${user.metadata.source}\`);
  }
  // Add user to database, send welcome email, etc.
}

async function handleOrderPlaced(order: OrderPlaced) {
  console.log(\`Processing order: \${order.orderId}\`);
  console.log(\`Total: \${order.total} \${order.currency || 'USD'}\`);
  // Process order, update inventory, calculate shipping, etc.
}

async function handlePaymentProcessed(payment: PaymentProcessed) {
  console.log(\`Processing payment: \${payment.paymentId}\`);
  console.log(\`Status: \${payment.status}\`);
  // Update order status, send receipt, handle failures, etc.
}
`;

  console.log('\n🔧 Generated Type-Safe Lambda Handler:');
  console.log(handler);

  return handler;
}

/**
 * Runs the Schema Registry walkthrough end to end: creates every schema in
 * `eventSchemas`, lists, describes and searches schemas, validates one valid
 * and one deliberately invalid sample event, and prints the generated
 * TypeScript types and typed handler. Errors are logged, not rethrown.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== AWS EventBridge Schema Registry Example ===');

  try {
    // Step 1: Create schemas
    console.log('\n1. Creating event schemas...');
    for (const [schemaName, schemaDef] of Object.entries(eventSchemas)) {
      await createSchema(schemaName, schemaDef);
    }

    // Step 2: List all schemas
    console.log('\n2. Listing available schemas...');
    await listSchemas();

    // Step 3: Describe specific schema
    console.log('\n3. Getting schema details...');
    await describeSchema('UserCreated');

    // Step 4: Search schemas
    console.log('\n4. Searching schemas...');
    await searchSchemas('User');
    await searchSchemas('Payment');

    // Step 5: Validate events
    console.log('\n5. Validating events...');
    // The valid sample needs a well-formed address, otherwise it would be
    // rejected by the e-mail check and the demo would show two failures.
    const validUserEvent = JSON.stringify({
      userId: 'user123',
      email: 'john.doe@example.com',
      name: 'John Doe',
      createdAt: new Date().toISOString(),
    });

    const invalidUserEvent = JSON.stringify({
      userId: 'user123',
      // Missing required email field
      createdAt: new Date().toISOString(),
    });

    validateEvent(validUserEvent, 'UserCreated');
    validateEvent(invalidUserEvent, 'UserCreated');

    // Step 6: Generate TypeScript types
    console.log('\n6. Generating TypeScript types...');
    generateTypeScriptTypes();

    // Step 7: Generate typed Lambda handler
    console.log('\n7. Generating type-safe Lambda handler...');
    generateTypedLambdaHandler();

    console.log('\n✅ Schema Registry example completed!');
    console.log('\nBenefits of Schema Registry:');
    console.log('- Type safety for event producers and consumers');
    console.log('- Automatic code generation');
    console.log('- Schema discovery and documentation');
    console.log('- Validation and enforcement of event contracts');
  } catch (error) {
    console.error('Error:', error);
  }
}

// Script entry point: run the schema registry demo and report any rejection
// that main() lets through.
main().catch((failure) => console.error('Error:', failure));

💻 Pipes EventBridge javascript

🟡 intermediate ⭐⭐⭐⭐

Construir pipelines event-driven con EventBridge Pipes para integración punto a punto con filtrado, enriquecimiento y transformación

⏱️ 30 min 🏷️ eventbridge, pipes, integration, pipeline
Prerequisites: AWS account, EventBridge enabled, SQS, Lambda, Understanding of event-driven architecture
// AWS EventBridge Pipes Example
// Install: npm install @aws-sdk/client-eventbridge-pipes @aws-sdk/client-sqs @aws-sdk/client-lambda

const { EventBridgePipesClient, CreatePipeCommand, UpdatePipeCommand, DeletePipeCommand, ListPipesCommand, DescribePipeCommand } = require('@aws-sdk/client-eventbridge-pipes');
const { SQSClient, CreateQueueCommand, GetQueueAttributesCommand } = require('@aws-sdk/client-sqs');
const { LambdaClient, CreateFunctionCommand } = require('@aws-sdk/client-lambda');

// Shared SDK clients for this example, all created for the us-east-1 region:
// EventBridge Pipes, SQS and Lambda.
const pipesClient = new EventBridgePipesClient({ region: 'us-east-1' });
const sqs = new SQSClient({ region: 'us-east-1' });
const lambda = new LambdaClient({ region: 'us-east-1' });

// Example 1: Order Processing Pipeline
// SQS -> EventBridge Pipe -> Lambda -> EventBridge

/**
 * Creates the SQS queue `order-events-queue` that feeds the order pipeline.
 *
 * @returns {Promise<string>} URL of the created queue. When the SDK reports
 *   `QueueAlreadyExists`, a hard-coded URL for account 123456789012 is
 *   returned instead.
 *   NOTE(review): that fallback URL is a placeholder — replace it with a real
 *   lookup before using the value outside this demo.
 * @throws Rethrows every other SDK error.
 */
async function createOrderQueue() {
  const queueAttributes = {
    VisibilityTimeout: '300', // 5 minutes
    MessageRetentionPeriod: '1209600', // 14 days
    DelaySeconds: '0',
    ReceiveMessageWaitTimeSeconds: '20', // Long polling
  };

  try {
    const createRequest = new CreateQueueCommand({
      QueueName: 'order-events-queue',
      Attributes: queueAttributes,
    });
    const response = await sqs.send(createRequest);
    const queueUrl = response.QueueUrl;
    console.log('✅ Order queue created:', queueUrl);
    return queueUrl;
  } catch (failure) {
    if (failure.name !== 'QueueAlreadyExists') {
      throw failure;
    }
    console.log('ℹ️  Order queue already exists');
    return 'https://sqs.us-east-1.amazonaws.com/123456789012/order-events-queue';
  }
}

// Lambda function for order processing
const orderProcessorLambda = {
  FunctionName: 'order-processor',
  Runtime: 'nodejs18.x',
  Role: 'arn:aws:iam::123456789012:role/lambda-execution-role',
  Handler: 'index.handler',
  Code: {
    ZipFile: Buffer.from(`
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');

const eventbridge = new EventBridgeClient();

exports.handler = async (event) => {
  console.log('Processing order events from SQS:', event);

  for (const record of event.Records) {
    const orderEvent = JSON.parse(record.body);
    console.log('Processing order:', orderEvent);

    // Transform and enrich order data
    const enrichedEvent = {
      orderId: orderEvent.orderId,
      userId: orderEvent.userId,
      total: orderEvent.total,
      items: orderEvent.items,
      processedAt: new Date().toISOString(),
      processorId: 'order-service-v1',
      region: process.env.AWS_REGION,
      // Add enrichment data
      customerTier: await getCustomerTier(orderEvent.userId),
      inventoryStatus: await checkInventory(orderEvent.items),
    };

    // Send to EventBridge for downstream processing
    await eventbridge.send(new PutEventsCommand({
      Entries: [{
        Source: 'order.processing',
        DetailType: 'OrderProcessed',
        Detail: JSON.stringify(enrichedEvent),
        EventBusName: 'default'
      }]
    }));
  }

  return {
    statusCode: 200,
    body: JSON.stringify({ processed: event.Records.length });
  };
};

async function getCustomerTier(userId) {
  // Mock customer tier lookup
  const tiers = ['bronze', 'silver', 'gold', 'platinum'];
  return tiers[Math.floor(Math.random() * tiers.length)];
}

async function checkInventory(items) {
  // Mock inventory check
  return 'available';
}
`).toString('base64'),
  },
  Environment: {
    Variables: {
      LOG_LEVEL: 'INFO',
      EVENT_BUS: 'default',
    },
  },
};

/**
 * Creates the `order-processing-pipe`: SQS source -> customer-enrichment
 * Lambda -> target Lambda.
 *
 * Notes on the request that is built:
 * - A pipe source must be an ARN, so a standard SQS queue URL is converted to
 *   the matching queue ARN (an ARN is passed through unchanged).
 * - The filter matches on the message `body`, which is where the payload of
 *   an SQS record lives.
 * - The target input is shaped with a Pipes input template (`<$.path>`
 *   placeholders plus the `aws.pipes.*` reserved variables).
 *   NOTE(review): with an enrichment configured the target template is
 *   presumably applied to the enrichment output — confirm `$.body` is still
 *   present there.
 * - If the pipe already exists, it is looked up with DescribePipe so the real
 *   ARN is returned.
 *
 * @param {string} sourceQueueUrl - URL (or ARN) of the source SQS queue.
 * @param {string} targetLambdaArn - ARN of the Lambda function to invoke.
 * @returns {Promise<object>} The CreatePipe response, or the DescribePipe
 *   response when the pipe already existed; both carry `Arn`.
 * @throws Rethrows every SDK error except ConflictException.
 */
async function createOrderProcessingPipe(sourceQueueUrl, targetLambdaArn) {
  const pipeName = 'order-processing-pipe';

  // https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>
  //   -> arn:aws:sqs:<region>:<account-id>:<queue-name>
  // Anything that does not look like such a URL is passed through untouched
  // and left for the service to validate.
  const toSqsQueueArn = (queueUrlOrArn) => {
    const match = /^https?:\/\/sqs\.([a-z0-9-]+)\.amazonaws\.com\/(\d{12})\/([^/]+)$/.exec(
      String(queueUrlOrArn),
    );
    return match ? `arn:aws:sqs:${match[1]}:${match[2]}:${match[3]}` : queueUrlOrArn;
  };

  const params = {
    Name: pipeName,
    Description: 'Process order events from SQS and enrich with customer data',
    RoleArn: 'arn:aws:iam::123456789012:role/EventBridgePipesExecutionRole',
    Source: toSqsQueueArn(sourceQueueUrl),
    Target: targetLambdaArn,
    SourceParameters: {
      FilterCriteria: {
        Filters: [
          {
            Pattern: JSON.stringify({
              body: {
                eventType: ['order.created', 'order.updated'],
              },
            }),
          },
        ],
      },
      SqsQueueParameters: {
        BatchSize: 10,
        MaximumBatchingWindowInSeconds: 30,
      },
    },
    TargetParameters: {
      InputTemplate:
        '{ "order": <$.body>, "source": "sqs", "timestamp": <aws.pipes.event.ingestion-time> }',
    },
    LogConfiguration: {
      CloudwatchLogsLogDestination: {
        LogGroupArn: 'arn:aws:logs:us-east-1:123456789012:log-group:/aws/eventbridge/pipes/order-processing-pipe',
      },
      LogLevel: 'INFO',
      IncludeExecutionData: ['ALL'],
    },
    Enrichment: 'arn:aws:lambda:us-east-1:123456789012:function:customer-enrichment',
  };

  try {
    const result = await pipesClient.send(new CreatePipeCommand(params));
    console.log(`✅ Pipe '${pipeName}' created successfully`);
    console.log(`   ARN: ${result.Arn}`);
    console.log(`   Created: ${result.CreationTime}`);
    return result;
  } catch (error) {
    if (error.name !== 'ConflictException') {
      throw error;
    }

    console.log(`ℹ️  Pipe '${pipeName}' already exists`);
    // Return the existing pipe rather than a fabricated ARN.
    return pipesClient.send(new DescribePipeCommand({ Name: pipeName }));
  }
}

// Example 2: Real-time Data Pipeline
// Kinesis -> EventBridge Pipe -> Transformation -> EventBridge -> Multiple Targets

/**
 * Creates the `real-time-analytics-pipe`: Kinesis stream -> input template ->
 * `analytics-bus` event bus.
 *
 * Notes on the request that is built:
 * - The filter matches on the record `data`, which is where the payload of a
 *   Kinesis record lives.
 * - The target input is shaped with `TargetParameters.InputTemplate`
 *   (`<$.path>` placeholders plus `aws.pipes.*` reserved variables), and the
 *   event-bus settings go under `EventBridgeEventBusParameters`.
 * - The dead-letter queue is referenced by a regular SQS queue ARN.
 * - `TRACE` is the most verbose Pipes log level.
 *
 * @returns {Promise<object|null>} The CreatePipe response, or null when the
 *   request fails (the error is logged).
 */
async function createAnalyticsPipe() {
  const pipeName = 'real-time-analytics-pipe';

  const params = {
    Name: pipeName,
    Description: 'Process real-time analytics events from Kinesis',
    RoleArn: 'arn:aws:iam::123456789012:role/EventBridgePipesExecutionRole',
    Source: 'arn:aws:kinesis:us-east-1:123456789012:stream/analytics-events',
    Target: 'arn:aws:events:us-east-1:123456789012:event-bus/analytics-bus',
    SourceParameters: {
      FilterCriteria: {
        Filters: [
          {
            Pattern: JSON.stringify({
              data: {
                eventType: ['user.action', 'page.view', 'click.event'],
              },
            }),
          },
        ],
      },
      KinesisStreamParameters: {
        BatchSize: 100,
        MaximumBatchingWindowInSeconds: 5,
        StartingPosition: 'LATEST',
        DeadLetterConfig: {
          Arn: 'arn:aws:sqs:us-east-1:123456789012:analytics-dlq',
        },
      },
    },
    TargetParameters: {
      InputTemplate: `{
  "analyticEvent": <$.data>,
  "processedAt": <aws.pipes.event.ingestion-time>,
  "sourceRegion": <$.awsRegion>,
  "pipelineVersion": "v1.0"
}`,
      EventBridgeEventBusParameters: {
        DetailType: 'AnalyticsEventProcessed',
        Source: 'analytics.pipeline',
      },
    },
    LogConfiguration: {
      CloudwatchLogsLogDestination: {
        LogGroupArn: 'arn:aws:logs:us-east-1:123456789012:log-group:/aws/eventbridge/pipes/analytics-pipe',
      },
      LogLevel: 'TRACE',
    },
  };

  try {
    const result = await pipesClient.send(new CreatePipeCommand(params));
    console.log(`✅ Analytics pipe '${pipeName}' created successfully`);
    return result;
  } catch (error) {
    console.error(`Error creating analytics pipe: ${error}`);
    return null;
  }
}

/**
 * Lists EventBridge Pipes (first page only, up to 50) and prints a short summary of each.
 *
 * @returns {Promise<object[]>} The pipe summaries from the response, or an empty array
 *   when the response has none or the request fails (the error is logged).
 */
async function listPipes() {
  const params = {
    Limit: 50,
  };

  try {
    const command = new ListPipesCommand(params);
    const result = await pipesClient.send(command);
    // Guard against a response without a Pipes member so the loop never throws.
    const pipes = result.Pipes || [];

    console.log('\n📋 Available EventBridge Pipes:');
    for (const pipe of pipes) {
      // NOTE(review): list summaries may not carry a Description; the fallback covers that.
      console.log(`   - ${pipe.Name}: ${pipe.Description || 'No description'}`);
      console.log(`     State: ${pipe.CurrentState}`);
      // The summary field is CreationTime (CreateTime does not exist and printed "undefined").
      console.log(`     Created: ${pipe.CreationTime}`);
      if (pipe.LastModifiedTime) {
        console.log(`     Modified: ${pipe.LastModifiedTime}`);
      }
      console.log('');
    }

    return pipes;
  } catch (error) {
    console.error('Error listing pipes:', error);
    return [];
  }
}

/**
 * Fetches one pipe by name and prints its key attributes.
 *
 * @param {string} pipeName - Name of the pipe to describe.
 * @returns {Promise<object|null>} The DescribePipe response, or `null` when the
 *   request fails (the error is logged).
 */
async function describePipe(pipeName) {
  try {
    const pipe = await pipesClient.send(new DescribePipeCommand({ Name: pipeName }));

    console.log(`\n📖 Pipe Details: ${pipeName}`);

    const rows = [
      ['ARN', pipe.Arn],
      ['Description', pipe.Description],
      ['State', pipe.CurrentState],
      ['Source', pipe.Source],
      ['Target', pipe.Target],
      ['Role', pipe.RoleArn],
    ];
    for (const [label, value] of rows) {
      console.log(`   ${label}: ${value}`);
    }

    // Only mention the desired state while the pipe is still converging towards it.
    const { DesiredState: desired, CurrentState: current } = pipe;
    if (desired && desired !== current) {
      console.log(`   Desired State: ${desired}`);
    }

    return pipe;
  } catch (error) {
    console.error(`Error describing pipe '${pipeName}':`, error);
    return null;
  }
}

/**
 * Sends an UpdatePipe request for the named pipe.
 *
 * @param {string} pipeName - Name of the pipe to update.
 * @param {object} updates - Extra request fields; they are spread after `Name`,
 *   so a `Name` key in `updates` takes precedence over `pipeName`.
 * @returns {Promise<object|null>} The UpdatePipe response, or `null` when the
 *   request fails (the error is logged).
 */
async function updatePipe(pipeName, updates) {
  const request = { Name: pipeName, ...updates };

  let response;
  try {
    response = await pipesClient.send(new UpdatePipeCommand(request));
    console.log(`✅ Pipe '${pipeName}' updated successfully`);
    console.log(`   New desired state: ${response.DesiredState}`);
  } catch (error) {
    console.error(`Error updating pipe '${pipeName}':`, error);
    return null;
  }
  return response;
}

// Monitor pipe performance
function createPipeMonitoring() {
  const monitoring = `// EventBridge Pipe Monitoring with CloudWatch
const { CloudWatchClient, PutMetricDataCommand } = require('@aws-sdk/client-cloudwatch');

  const cloudwatch = new CloudWatchClient();

  // Custom metrics for pipe monitoring
  async function publishPipeMetrics(pipeName, metrics) {
    const params = {
      Namespace: 'EventBridge/Pipes',
      MetricData: [
        {
          MetricName: 'EventsProcessed',
          Dimensions: [
            { Name: 'PipeName', Value: pipeName },
          ],
          Value: metrics.eventsProcessed,
          Unit: 'Count',
        },
        {
          MetricName: 'ProcessingLatency',
          Dimensions: [
            { Name: 'PipeName', Value: pipeName },
          ],
          Value: metrics.processingLatency,
          Unit: 'Milliseconds',
        },
        {
          MetricName: 'Errors',
          Dimensions: [
            { Name: 'PipeName', Value: pipeName },
          ],
          Value: metrics.errors,
          Unit: 'Count',
        },
      ],
    };

    await cloudwatch.send(new PutMetricDataCommand(params));
    console.log(`Published metrics for pipe: ${pipeName}`);
}

// Lambda function to monitor pipe health
exports.pipeMonitorHandler = async (event) => {
  // Analyze pipe performance and alert on issues
  for (const record of event.Records) {
    const { pipeName, metrics } = JSON.parse(record.body);

    if (metrics.errors > 10) {
      // Send alert for high error rate
      await sendAlert(pipeName, 'High error rate detected');
    }

    if (metrics.processingLatency > 5000) {
      // Send alert for high latency
      await sendAlert(pipeName, 'High processing latency detected');
    }
  }
};

async function sendAlert(pipeName, message) {
  console.log(`ALERT [${pipeName}]: ${message}`);
  // Integrate with SNS, PagerDuty, Slack, etc.
}`;

  console.log('\n📊 Pipe Monitoring Code:');
  console.log(monitoring);

  return monitoring;
}

  // CloudFormation template for EventBridge Pipes
/**
 * Builds, prints and returns a sample CloudFormation template (YAML) that wires an
 * SQS queue to a Lambda function through an EventBridge Pipe.
 *
 * YAML is indentation- and token-sensitive, so the template text is kept exactly
 * formatted: `::` in resource types, `service:Action` IAM actions and hyphenated names
 * must not contain spaces.
 *
 * NOTE(review): the template is illustrative. `LambdaExecutionRole` is referenced but not
 * declared here, and `EventBridgeEventParameters` / the `<aws.events...>` placeholders
 * under TargetParameters should be confirmed against the AWS::Pipes::Pipe reference
 * before deploying.
 *
 * @returns {string} The CloudFormation template.
 */
function createPipesCloudFormation() {
  const pipesCloudFormation = `Resources:
  # EventBridge Pipe Execution Role
  PipesExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: EventBridgePipesExecutionRole
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: pipes.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: PipesExecutionPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                Resource: '*'
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                Resource: '*'
              - Effect: Allow
                Action:
                  - events:PutEvents
                Resource: '*'

  # Order Processing Pipe
  OrderProcessingPipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: order-processing-pipe
      Description: Process order events with enrichment
      RoleArn: !GetAtt PipesExecutionRole.Arn
      Source: !GetAtt OrderQueue.Arn
      Target: !GetAtt OrderProcessorFunction.Arn
      SourceParameters:
        FilterCriteria:
          Filters:
            - Pattern: '{ "eventType": ["order.created"] }'
        SqsQueueParameters:
          BatchSize: 10
          MaximumBatchingWindowInSeconds: 30
      TargetParameters:
        InputTemplate: '{ "order": <aws.events.json.jsonParse($.body)>, "processedAt": <aws.events.schedulerScheduledTime> }'
        EventBridgeEventParameters:
          DetailType: 'OrderProcessed'
          Source: 'order.service'
      LogConfiguration:
        Level: INFO
        CloudwatchLogsLogDestination:
          LogGroupArn: !GetAtt PipesLogGroup.Arn

  # Source SQS Queue
  OrderQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: order-events-queue
      VisibilityTimeout: 300
      MessageRetentionPeriod: 1209600

  # Target Lambda Function
  OrderProcessorFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: order-processor
      Runtime: nodejs18.x
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          exports.handler = async (event) => {
            console.log('Processing orders:', event);
            // Process orders here
            return { statusCode: 200 };
          };

  # Log Group for Pipes
  PipesLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/eventbridge/pipes/order-processing-pipe
      RetentionInDays: 14`;

  console.log('\n🏗️  CloudFormation Template for EventBridge Pipes:');
  console.log(pipesCloudFormation);

  return pipesCloudFormation;
}

/**
 * Runs the EventBridge Pipes walkthrough end to end: source queue, Lambda sample,
 * analytics pipe, pipe listing, monitoring sample and CloudFormation template.
 *
 * Any failure is caught and logged, so the returned promise does not reject for
 * errors raised by the individual steps.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== AWS EventBridge Pipes Example ===');

  try {
    // Step 1: Create source queue
    console.log('\n1. Creating source SQS queue...');
    // queueUrl is only consumed by the (intentionally disabled) pipe creation in step 3.
    const queueUrl = await createOrderQueue();

    // Step 2: Note: Lambda functions would be deployed separately
    console.log('\n2. Lambda function code for order processor:');
    console.log(orderProcessorLambda);

    // Step 3: Create EventBridge Pipe
    console.log('\n3. Creating EventBridge Pipe...');
    // await createOrderProcessingPipe(queueUrl, 'arn:aws:lambda:us-east-1:123456789012:function:order-processor');

    // Step 4: Create analytics pipe
    console.log('\n4. Creating analytics pipe...');
    await createAnalyticsPipe();

    // Step 5: List pipes
    console.log('\n5. Listing all pipes...');
    await listPipes();

    // Step 6: Create monitoring setup
    console.log('\n6. Setting up monitoring...');
    createPipeMonitoring();

    // Step 7: CloudFormation template (previously this re-ran the monitoring step)
    console.log('\n7. Infrastructure as Code template...');
    createPipesCloudFormation();

    console.log('\n✅ EventBridge Pipes example completed!');
    console.log('\nKey Features of EventBridge Pipes:');
    console.log('- Point-to-point integration without custom code');
    console.log('- Built-in filtering and transformation');
    console.log('- Enrichment with Lambda functions');
    console.log('- Integrated monitoring and logging');
    console.log('- Support for various sources and targets');
  } catch (error) {
    console.error('Error:', error);
  }
}

// Entry point: run the walkthrough and log any rejection that escapes main().
main().catch(err => {
  console.error('Error:', err);
}); 

💻 Archivo y Reproducción EventBridge javascript

🟡 intermediate ⭐⭐⭐⭐

Archivar eventos para cumplimiento y depuración, luego reproducir eventos para escenarios de prueba y recuperación

⏱️ 30 min 🏷️ eventbridge, archive, replay, compliance
Prerequisites: AWS account, EventBridge enabled, Understanding of event archiving
// AWS EventBridge Archive and Replay Example
// Install: npm install @aws-sdk/client-eventbridge @aws-sdk/client-lambda

const { EventBridgeClient, CreateArchiveCommand, DescribeArchiveCommand, UpdateArchiveCommand, ListArchivesCommand, StartReplayCommand, DescribeReplayCommand, PutEventsCommand } = require('@aws-sdk/client-eventbridge');

// EventBridge client used by every function in this example; region is fixed to us-east-1.
const eventbridge = new EventBridgeClient({ region: 'us-east-1' });

// Sample order-lifecycle events (created → updated → paid → shipped → delivered) for
// order ORD-001. Each row is [source, detail type, detail payload]; the payload is
// serialized to the JSON string that a PutEvents entry carries in `Detail`.
const businessEvents = [
  [
    'com.myapp.orders',
    'OrderCreated',
    {
      orderId: 'ORD-001',
      customerId: 'CUST-001',
      total: 299.99,
      currency: 'USD',
      items: [{ productId: 'PROD-001', quantity: 2, price: 149.99 }],
      createdAt: '2025-12-15T10:00:00Z',
    },
  ],
  [
    'com.myapp.orders',
    'OrderUpdated',
    {
      orderId: 'ORD-001',
      customerId: 'CUST-001',
      status: 'confirmed',
      updatedAt: '2025-12-15T10:05:00Z',
    },
  ],
  [
    'com.myapp.payments',
    'PaymentProcessed',
    {
      paymentId: 'PAY-001',
      orderId: 'ORD-001',
      amount: 299.99,
      status: 'completed',
      processedAt: '2025-12-15T10:10:00Z',
    },
  ],
  [
    'com.myapp.shipping',
    'OrderShipped',
    {
      orderId: 'ORD-001',
      trackingNumber: 'TRK-123456789',
      carrier: 'FedEx',
      shippedAt: '2025-12-15T14:00:00Z',
    },
  ],
  [
    'com.myapp.orders',
    'OrderDelivered',
    {
      orderId: 'ORD-001',
      customerId: 'CUST-001',
      deliveredAt: '2025-12-16T16:30:00Z',
    },
  ],
].map(([Source, DetailType, payload]) => ({
  Source,
  DetailType,
  Detail: JSON.stringify(payload),
}));

/**
 * Publishes every entry of `businessEvents` to EventBridge, one PutEvents call per event,
 * pausing 100 ms after each accepted event.
 *
 * PutEvents can succeed at the HTTP level while rejecting individual entries, so the
 * per-entry result (`Entries[0]`) and `FailedEntryCount` are inspected instead of relying
 * on the call not throwing. Failures are logged and the loop moves on to the next event.
 *
 * @returns {Promise<void>}
 */
async function sendBusinessEvents() {
  console.log('📤 Sending business events to EventBridge...');

  const total = businessEvents.length;

  for (const [index, event] of businessEvents.entries()) {
    const position = index + 1;

    try {
      const command = new PutEventsCommand({ Entries: [event] });
      const result = await eventbridge.send(command);

      // Result entries line up with the request entries; exactly one was sent.
      const [entry] = result.Entries || [];

      if (result.FailedEntryCount > 0 || !entry || !entry.EventId) {
        const reason = entry
          ? `${entry.ErrorCode}: ${entry.ErrorMessage}`
          : 'no result entry returned';
        console.error(`❌ Failed to send event ${position}:`, reason);
        continue;
      }

      console.log(`✅ Event ${position}/${total} sent: ${event.DetailType}`);
      console.log(`   Event ID: ${entry.EventId}`);

      // Add small delay between events
      await new Promise(resolve => setTimeout(resolve, 100));
    } catch (error) {
      console.error(`❌ Failed to send event ${position}:`, error);
    }
  }
}

/**
 * Creates an archive on the default event bus (sample account 123456789012, us-east-1).
 *
 * @param {string} archiveName - Name of the archive to create.
 * @param {string} description - Human-readable description stored with the archive.
 * @param {object} [eventPattern] - Event pattern limiting what is archived; when omitted,
 *   no pattern is sent with the request.
 * @param {number} [retentionDays=30] - How many days archived events are retained.
 * @returns {Promise<object>} The CreateArchive response, or `{ ArchiveArn }` with a
 *   constructed ARN when the archive already exists.
 * @throws Rethrows any error other than `ResourceAlreadyExistsException`.
 */
async function createEventArchive(archiveName, description, eventPattern, retentionDays = 30) {
  const params = {
    ArchiveName: archiveName,
    Description: description,
    EventSourceArn: 'arn:aws:events:us-east-1:123456789012:event-bus/default',
    EventPattern: eventPattern ? JSON.stringify(eventPattern) : undefined,
    RetentionDays: retentionDays,
  };

  try {
    const command = new CreateArchiveCommand(params);
    const result = await eventbridge.send(command);

    console.log(`✅ Archive '${archiveName}' created successfully`);
    console.log(`   ARN: ${result.ArchiveArn}`);
    console.log(`   State: ${result.State}`);
    console.log(`   Creation Time: ${result.CreationTime}`);
    // The response does not echo the retention period, so report the requested value.
    console.log(`   Retention Days: ${params.RetentionDays}`);

    return result;
  } catch (error) {
    if (error.name === 'ResourceAlreadyExistsException') {
      console.log(`ℹ️  Archive '${archiveName}' already exists`);
      return { ArchiveArn: `arn:aws:events:us-east-1:123456789012:archive/${archiveName}` };
    }
    throw error;
  }
}

/**
 * Creates the four sample archives, one after another, in a fixed order.
 * An error from any creation propagates and stops the remaining ones.
 *
 * @returns {Promise<void>}
 */
async function setupArchives() {
  console.log('\n📦 Setting up event archives...');

  const archiveSpecs = [
    {
      // All order events for compliance
      name: 'order-events-archive',
      description: 'Archive all order events for compliance and audit',
      pattern: { source: ['com.myapp.orders'] },
    },
    {
      // High-value transactions for fraud analysis
      name: 'high-value-transactions-archive',
      description: 'Archive high-value transactions for fraud analysis',
      pattern: {
        source: ['com.myapp.orders', 'com.myapp.payments'],
        detail: {
          total: [{ numeric: ['>=', 1000] }],
        },
      },
    },
    {
      // Payment events for reconciliation
      name: 'payment-events-archive',
      description: 'Archive payment events for financial reconciliation',
      pattern: { source: ['com.myapp.payments'] },
    },
    {
      // Everything, for disaster recovery: no pattern is supplied
      name: 'disaster-recovery-archive',
      description: 'Archive all events for disaster recovery purposes',
    },
  ];

  for (const { name, description, pattern } of archiveSpecs) {
    await createEventArchive(name, description, pattern);
  }
}

/**
 * Lists event archives (first page only, up to 50) and prints a summary of each.
 *
 * No `NamePrefix` is sent: an empty string is not a valid prefix (the API documents a
 * minimum length of 1), and omitting the field already returns every archive.
 *
 * @returns {Promise<object[]>} The archive summaries, or an empty array when the
 *   response has none or the request fails (the error is logged).
 */
async function listArchives() {
  const params = {
    Limit: 50,
  };

  try {
    const command = new ListArchivesCommand(params);
    const result = await eventbridge.send(command);
    const archives = result.Archives || [];

    console.log('\n📋 Available Event Archives:');
    for (const archive of archives) {
      console.log(`   - ${archive.ArchiveName}`);
      console.log(`     State: ${archive.State}`);
      console.log(`     Event Source: ${archive.EventSourceArn}`);
      console.log(`     Creation Time: ${archive.CreationTime}`);
      console.log(`     Retention Days: ${archive.RetentionDays}`);
      // `!= null` (rather than truthiness) so an empty archive still reports 0.
      if (archive.SizeBytes != null) {
        console.log(`     Size: ${(archive.SizeBytes / 1024 / 1024).toFixed(2)} MB`);
      }
      if (archive.EventCount != null) {
        console.log(`     Event Count: ${archive.EventCount}`);
      }
      console.log('');
    }

    return archives;
  } catch (error) {
    console.error('Error listing archives:', error);
    return [];
  }
}

/**
 * Fetches one archive by name and prints its attributes.
 *
 * @param {string} archiveName - Name of the archive to describe.
 * @returns {Promise<object|null>} The DescribeArchive response, or `null` when the
 *   request fails (the error is logged).
 */
async function describeArchive(archiveName) {
  const params = {
    ArchiveName: archiveName,
  };

  try {
    const command = new DescribeArchiveCommand(params);
    const result = await eventbridge.send(command);

    // Distinguish "missing" from a legitimate 0 so an empty archive is not shown as Unknown.
    const size = result.SizeBytes != null
      ? `${(result.SizeBytes / 1024 / 1024).toFixed(2)} MB`
      : 'Unknown';
    const eventCount = result.EventCount != null ? result.EventCount : 'Unknown';

    console.log(`\n📖 Archive Details: ${archiveName}`);
    console.log(`   ARN: ${result.ArchiveArn}`);
    console.log(`   Description: ${result.Description}`);
    console.log(`   State: ${result.State}`);
    console.log(`   Event Source: ${result.EventSourceArn}`);
    console.log(`   Creation Time: ${result.CreationTime}`);
    console.log(`   Retention Days: ${result.RetentionDays}`);
    console.log(`   Size: ${size}`);
    console.log(`   Event Count: ${eventCount}`);

    if (result.EventPattern) {
      // The pattern is sent as a JSON string (see createEventArchive); print it as-is
      // rather than stringifying again, which would wrap it in escaped quotes.
      const pattern = typeof result.EventPattern === 'string'
        ? result.EventPattern
        : JSON.stringify(result.EventPattern);
      console.log(`   Event Pattern: ${pattern}`);
    }

    return result;
  } catch (error) {
    console.error(`Error describing archive '${archiveName}':`, error);
    return null;
  }
}

/**
 * Starts a replay of archived events for the given time window.
 *
 * The StartReplay request identifies the archive through `EventSourceArn` (the archive's
 * ARN) and the window through `EventStartTime` / `EventEndTime`. The archive ARN is built
 * from `archiveName` using the same sample account/region as the rest of this example.
 *
 * NOTE(review): the StartReplay reference describes `Destination.Arn` as the event bus the
 * archive was created from (with optional `FilterArns` for rules); confirm that callers
 * pass a suitable ARN.
 *
 * @param {string} archiveName - Name of the archive to replay from.
 * @param {string} replayName - Name for the new replay.
 * @param {Date} startTime - Start of the event window to replay.
 * @param {Date} endTime - End of the event window to replay.
 * @param {string} destination - ARN passed as the replay destination.
 * @returns {Promise<object>} The StartReplay response, or `{ ReplayArn }` with a
 *   constructed ARN when a replay with this name already exists.
 * @throws Rethrows any error other than `ResourceAlreadyExistsException`.
 */
async function startReplay(archiveName, replayName, startTime, endTime, destination) {
  const params = {
    ReplayName: replayName,
    Description: `Replay events from ${archiveName} for testing`,
    EventSourceArn: `arn:aws:events:us-east-1:123456789012:archive/${archiveName}`,
    EventStartTime: startTime,
    EventEndTime: endTime,
    Destination: {
      Arn: destination,
    },
  };

  try {
    const command = new StartReplayCommand(params);
    const result = await eventbridge.send(command);

    console.log(`✅ Replay '${replayName}' started successfully`);
    console.log(`   Replay ARN: ${result.ReplayArn}`);
    console.log(`   State: ${result.State}`);
    console.log(`   Start Time: ${result.ReplayStartTime}`);

    return result;
  } catch (error) {
    if (error.name === 'ResourceAlreadyExistsException') {
      console.log(`ℹ️  Replay '${replayName}' already exists`);
      return { ReplayArn: `arn:aws:events:us-east-1:123456789012:replay/${replayName}` };
    }
    throw error;
  }
}

/**
 * Starts three sample replays in sequence: the last 24 hours of order events, the last
 * 2 hours of payment events, and everything since 2025-12-01 from the disaster-recovery
 * archive. Each window is computed right before its replay is started, and an error from
 * any replay propagates and stops the remaining ones.
 *
 * NOTE(review): two scenarios pass Lambda function ARNs as the replay destination and the
 * third a different event bus; verify these against what StartReplay accepts.
 *
 * @returns {Promise<void>}
 */
async function demonstrateReplayScenarios() {
  console.log('\n🔄 Demonstrating replay scenarios...');

  const HOUR_MS = 60 * 60 * 1000;

  const scenarios = [
    {
      // Testing new Lambda function with historical data
      banner: '\n1. Testing new function with historical order events...',
      archive: 'order-events-archive',
      replay: 'test-new-processor',
      windowStart: () => new Date(Date.now() - 24 * HOUR_MS),
      destination: 'arn:aws:lambda:us-east-1:123456789012:function:test-order-processor',
    },
    {
      // Debugging production issues
      banner: '\n2. Debugging payment processing issues...',
      archive: 'payment-events-archive',
      replay: 'debug-payment-issues',
      windowStart: () => new Date(Date.now() - 2 * HOUR_MS),
      destination: 'arn:aws:lambda:us-east-1:123456789012:function:payment-debugger',
    },
    {
      // Rebuilding data warehouse from the start of the month
      banner: '\n3. Rebuilding data warehouse with all events...',
      archive: 'disaster-recovery-archive',
      replay: 'rebuild-data-warehouse',
      windowStart: () => new Date('2025-12-01T00:00:00Z'),
      destination: 'arn:aws:events:us-east-1:123456789012:event-bus/data-warehouse-ingest',
    },
  ];

  for (const scenario of scenarios) {
    console.log(scenario.banner);
    const from = scenario.windowStart();
    const until = new Date();
    await startReplay(scenario.archive, scenario.replay, from, until, scenario.destination);
  }
}

/**
 * Fetches one replay by name and prints its status, plus the elapsed duration once
 * both the start and end timestamps are available.
 *
 * @param {string} replayName - Name of the replay to describe.
 * @returns {Promise<object|null>} The DescribeReplay response, or `null` when the
 *   request fails (the error is logged).
 */
async function describeReplay(replayName) {
  const params = {
    ReplayName: replayName,
  };

  try {
    const command = new DescribeReplayCommand(params);
    const result = await eventbridge.send(command);

    console.log(`\n📊 Replay Status: ${replayName}`);
    console.log(`   State: ${result.State}`);
    console.log(`   Start Time: ${result.ReplayStartTime}`);
    console.log(`   End Time: ${result.ReplayEndTime || 'In progress'}`);
    // The value printed is the source ARN, so label it as such (it was "Events Replayable").
    console.log(`   Event Source: ${result.EventSourceArn}`);

    if (result.ReplayStartTime && result.ReplayEndTime) {
      const duration = (new Date(result.ReplayEndTime) - new Date(result.ReplayStartTime)) / 1000;
      console.log(`   Duration: ${duration} seconds`);
    }

    return result;
  } catch (error) {
    console.error(`Error describing replay '${replayName}':`, error);
    return null;
  }
}

/**
 * Builds, prints and returns sample source code for archive retention/compliance
 * management (retention updates, compliance checks, lifecycle policies and monitoring
 * event patterns).
 *
 * The sample is plain text: nothing in it is executed here. Backticks and `${` inside
 * the sample are escaped so the outer template literal emits them verbatim instead of
 * terminating early or interpolating names that do not exist in this scope.
 *
 * @returns {string} The archive management sample source code.
 */
function createArchiveManagement() {
  const management = `// Event Archive Management for Compliance

const { EventBridgeClient, UpdateArchiveCommand, DeleteArchiveCommand } = require('@aws-sdk/client-eventbridge');
const { IAMClient, GetRolePolicyCommand } = require('@aws-sdk/client-iam');

const eventbridge = new EventBridgeClient();

// Update archive retention based on compliance requirements
async function updateArchiveRetention(archiveName, retentionDays, reason) {
  const params = {
    ArchiveName: archiveName,
    RetentionDays: retentionDays,
    Description: \`Updated retention for \${reason}\`,
  };

  try {
    const command = new UpdateArchiveCommand(params);
    const result = await eventbridge.send(command);

    console.log(\`✅ Archive '\${archiveName}' retention updated to \${retentionDays} days\`);
    console.log(\`   Reason: \${reason}\`);

    return result;
  } catch (error) {
    console.error(\`Error updating archive retention: \${error}\`);
    return null;
  }
}

// Check archive compliance
async function checkArchiveCompliance(archiveName) {
  const requiredRetention = 2555; // 7 years for financial data

  // Get archive details
  const archive = await describeArchive(archiveName);

  if (archive && archive.RetentionDays < requiredRetention) {
    console.log(\`⚠️  COMPLIANCE WARNING: Archive '\${archiveName}' retention (\${archive.RetentionDays} days) is below required (\${requiredRetention} days)\`);

    // Auto-update for compliance
    await updateArchiveRetention(archiveName, requiredRetention, 'Automated compliance update');
  } else {
    console.log(\`✅ Archive '\${archiveName}' is compliant\`);
  }
}

// Archive lifecycle management
const ARCHIVE_POLICIES = {
  'order-events-archive': {
    retentionDays: 2555, // 7 years for financial records
    autoExtend: true,
    complianceLevel: 'critical',
  },
  'payment-events-archive': {
    retentionDays: 2555, // 7 years for payment records
    autoExtend: true,
    complianceLevel: 'critical',
  },
  'high-value-transactions-archive': {
    retentionDays: 1825, // 5 years for fraud analysis
    autoExtend: true,
    complianceLevel: 'high',
  },
  'debug-events-archive': {
    retentionDays: 90, // 90 days for debugging
    autoExtend: false,
    complianceLevel: 'low',
  },
};

// Implement archive lifecycle
async function manageArchiveLifecycle() {
  for (const [archiveName, policy] of Object.entries(ARCHIVE_POLICIES)) {
    console.log(\`\\n🔄 Managing lifecycle for \${archiveName}\`);

    if (policy.complianceLevel === 'critical') {
      await checkArchiveCompliance(archiveName);
    }

    // Check if archive needs to be extended
    if (policy.autoExtend) {
      console.log(\`   Auto-extend policy: Active\`);
    }
  }
}

// CloudWatch Events for archive monitoring
const archiveMonitoring = {
  // Event pattern for archive state changes
  ArchiveStateChange: {
    source: ['aws.events'],
    'detail-type': ['EventBridge Archive State Change'],
    detail: {
      state: ['ENABLED', 'DISABLED', 'CREATING', 'UPDATING', 'DELETING'],
    },
  },

  // Event pattern for replay state changes
  ReplayStateChange: {
    source: ['aws.events'],
    'detail-type': ['EventBridge Archive Replay State Change'],
    detail: {
      state: ['STARTING', 'RUNNING', 'COMPLETED', 'FAILED'],
    },
  },
};

console.log('Archive Monitoring Event Patterns:', JSON.stringify(archiveMonitoring, null, 2));`;

  console.log('\n🔒 Archive Management Code:');
  console.log(management);

  return management;
}

// Lambda function for archive automation
const archiveAutomationLambda = {
  FunctionName: 'eventbridge-archive-manager',
  Runtime: 'nodejs18.x',
  Handler: 'index.handler',
  Code: {
    ZipFile: Buffer.from(`
const { EventBridgeClient, CreateArchiveCommand, ListArchivesCommand } = require('@aws-sdk/client-eventbridge');

const eventbridge = new EventBridgeClient();

exports.handler = async (event) => {
  console.log('Archive manager triggered:', event);

  // Daily archive creation for different event types
  const today = new Date().toISOString().split('T')[0];

  // Create daily archive for order events
  await createDailyArchive(`order-events-${today}`, {
    source: ['com.myapp.orders'],
  });

  // Create daily archive for payment events
  await createDailyArchive(`payment-events-${today}`, {
    source: ['com.myapp.payments'],
  });

  // Clean up old archives (keep last 30 days)
  await cleanupOldArchives();

  return {
    statusCode: 200,
    body: JSON.stringify({ message: 'Archive management completed' });
  };
};

async function createDailyArchive(archiveName, eventPattern) {
  try {
    const command = new CreateArchiveCommand({
      ArchiveName: archiveName,
      Description: `Daily archive for ${archiveName}`,
      EventSourceArn: 'arn:aws:events:us-east-1:123456789012:event-bus/default',
      EventPattern: JSON.stringify(eventPattern),
      RetentionDays: 30,
    });

    await eventbridge.send(command);
    console.log(`Created daily archive: ${archiveName}`);
  } catch (error) {
    if (error.name !== 'ResourceAlreadyExistsException') {
      console.error(`Error creating archive ${archiveName}:`, error);
    }
  }
}

async function cleanupOldArchives() {
  // Implementation for cleaning up old archives
  console.log('Cleaning up old archives...');
}
`).toString('base64'),
  },
};

console.log('\n🤖 Archive Automation Lambda:');
console.log(archiveAutomationLambda);

/**
 * Runs the Archive and Replay walkthrough end to end.
 *
 * Archives are created before the sample events are published: an archive can only
 * contain events that reach the bus after it exists, so publishing first would leave the
 * archives (and therefore the replays) without the sample events.
 *
 * Any failure is caught and logged, so the returned promise does not reject for errors
 * raised by the individual steps.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('=== AWS EventBridge Archive and Replay Example ===');

  try {
    // Step 1: Set up archives first so the events sent next are captured
    console.log('\n1. Setting up event archives...');
    await setupArchives();

    // Step 2: Send sample events
    console.log('\n2. Sending business events...');
    await sendBusinessEvents();

    // Step 3: Wait for events to be processed
    console.log('\n3. Waiting for events to be processed...');
    await new Promise(resolve => setTimeout(resolve, 5000));

    // Step 4: List archives
    console.log('\n4. Listing archives...');
    await listArchives();

    // Step 5: Describe specific archive
    console.log('\n5. Getting archive details...');
    await describeArchive('order-events-archive');

    // Step 6: Demonstrate replay scenarios
    console.log('\n6. Setting up replay scenarios...');
    await demonstrateReplayScenarios();

    // Step 7: Archive management
    console.log('\n7. Setting up archive management...');
    createArchiveManagement();

    // Step 8: Lambda automation
    console.log('\n8. Archive automation Lambda configuration...');
    console.log(archiveAutomationLambda);

    console.log('\n✅ EventBridge Archive and Replay example completed!');
    console.log('\nKey Benefits:');
    console.log('- Event archival for compliance and audit');
    console.log('- Historical data replay for testing');
    console.log('- Disaster recovery capabilities');
    console.log('- Debugging with real production events');
    console.log('- Data warehouse rebuilding');
  } catch (error) {
    console.error('Error:', error);
  }
}

// Entry point: run the walkthrough and log any rejection that escapes main().
main().catch(err => {
  console.error('Error:', err);
});