Event Sourcing Samples

Comprehensive event sourcing patterns covering event storage, CQRS architecture, state reconstruction, and event versioning strategies

Key Facts

Category
Architecture
Items
4
Format Families
sample

Sample Overview

Comprehensive event sourcing patterns covering event storage, CQRS architecture, state reconstruction, and event versioning strategies This sample set belongs to Architecture and can be used to test related workflows inside Elysia Tools.

💻 Event Storage Pattern

🟡 intermediate ⭐⭐⭐

Basic event storage implementation with event store and aggregate root

⏱️ 30 min 🏷️ event-storage, aggregate, domain-events
Prerequisites: Understanding of Domain-Driven Design, Basic knowledge of event patterns

// Event Store Implementation
interface Event {
  id: string
  aggregateId: string
  type: string
  data: any
  timestamp: Date
  version: number
}

class EventStore {
  private events: Map<string, Event[]> = new Map()

  async saveEvents(aggregateId: string, events: Event[]): Promise<void> {
    const existingEvents = this.events.get(aggregateId) || []
    const newEvents = events.map((event, index) => ({
      ...event,
      id: crypto.randomUUID(),
      timestamp: new Date(),
      version: existingEvents.length + index + 1
    }))

    this.events.set(aggregateId, [...existingEvents, ...newEvents])
  }

  async getEvents(aggregateId: string): Promise<Event[]> {
    return this.events.get(aggregateId) || []
  }

  async getEventsFromVersion(aggregateId: string, fromVersion: number): Promise<Event[]> {
    const events = this.events.get(aggregateId) || []
    return events.filter(event => event.version >= fromVersion)
  }
}

// Aggregate Root
abstract class AggregateRoot {
  private _id: string
  private _version: number = 0
  private _uncommittedEvents: Event[] = []

  constructor(id: string) {
    this._id = id
  }

  get id(): string {
    return this._id
  }

  get version(): number {
    return this._version
  }

  protected apply(event: Event): void {
    this.applyChange(event, true)
    this._uncommittedEvents.push(event)
  }

  private applyChange(event: Event, isNew: boolean): void {
    const handler = this.getEventHandler(event.type)
    if (handler) {
      handler.call(this, event.data)
    }

    if (isNew) {
      this._version = event.version
    }
  }

  protected abstract getEventHandler(eventType: string): Function | null

  async markChangesAsCommitted(): Promise<void> {
    this._uncommittedEvents = []
  }

  getUncommittedEvents(): Event[] {
    return [...this._uncommittedEvents]
  }

  async loadFromHistory(events: Event[]): Promise<void> {
    for (const event of events) {
      this.applyChange(event, false)
      this._version = event.version
    }
  }
}

// Shopping Cart Example
interface CartItem {
  productId: string
  productName: string
  quantity: number
  unitPrice: number
}

interface CartEventData {
  productId?: string
  productName?: string
  quantity?: number
  unitPrice?: number
  items?: CartItem[]
}

class ShoppingCart extends AggregateRoot {
  private items: CartItem[] = []
  private isActive: boolean = true

  constructor(id: string) {
    super(id)
  }

  addItem(productId: string, productName: string, quantity: number, unitPrice: number): void {
    if (!this.isActive) {
      throw new Error('Cannot add items to inactive cart')
    }

    const event: Event = {
      id: crypto.randomUUID(),
      aggregateId: this.id,
      type: 'ITEM_ADDED',
      data: { productId, productName, quantity, unitPrice },
      timestamp: new Date(),
      version: this.version + 1
    }

    this.apply(event)
  }

  removeItem(productId: string): void {
    const item = this.items.find(item => item.productId === productId)
    if (!item) {
      throw new Error('Item not found in cart')
    }

    const event: Event = {
      id: crypto.randomUUID(),
      aggregateId: this.id,
      type: 'ITEM_REMOVED',
      data: { productId },
      timestamp: new Date(),
      version: this.version + 1
    }

    this.apply(event)
  }

  checkout(): void {
    if (this.items.length === 0) {
      throw new Error('Cannot checkout empty cart')
    }

    const event: Event = {
      id: crypto.randomUUID(),
      aggregateId: this.id,
      type: 'CART_CHECKED_OUT',
      data: { items: [...this.items] },
      timestamp: new Date(),
      version: this.version + 1
    }

    this.apply(event)
  }

  protected getEventHandler(eventType: string): Function | null {
    const handlers: Record<string, Function> = {
      'ITEM_ADDED': this.handleItemAdded.bind(this),
      'ITEM_REMOVED': this.handleItemRemoved.bind(this),
      'CART_CHECKED_OUT': this.handleCartCheckedOut.bind(this)
    }
    return handlers[eventType] || null
  }

  private handleItemAdded(data: CartEventData): void {
    const existingItem = this.items.find(item => item.productId === data.productId)

    if (existingItem) {
      existingItem.quantity += data.quantity!
    } else {
      this.items.push({
        productId: data.productId!,
        productName: data.productName!,
        quantity: data.quantity!,
        unitPrice: data.unitPrice!
      })
    }
  }

  private handleItemRemoved(data: CartEventData): void {
    this.items = this.items.filter(item => item.productId !== data.productId)
  }

  private handleCartCheckedOut(): void {
    this.isActive = false
  }

  getItems(): CartItem[] {
    return [...this.items]
  }

  getTotalPrice(): number {
    return this.items.reduce((total, item) => total + (item.quantity * item.unitPrice), 0)
  }

  isCartActive(): boolean {
    return this.isActive
  }
}

// Usage Example
async function example() {
  const eventStore = new EventStore()
  const cart = new ShoppingCart('cart-123')

  // Add items to cart
  cart.addItem('prod-1', 'Laptop', 1, 999.99)
  cart.addItem('prod-2', 'Mouse', 2, 29.99)

  // Save uncommitted events
  await eventStore.saveEvents(cart.id, cart.getUncommittedEvents())
  await cart.markChangesAsCommitted()

  console.log('Cart items:', cart.getItems())
  console.log('Total price:', cart.getTotalPrice())

  // Checkout
  cart.checkout()
  await eventStore.saveEvents(cart.id, cart.getUncommittedEvents())
  await cart.markChangesAsCommitted()

  console.log('Cart is active:', cart.isCartActive())
}

export { Event, EventStore, AggregateRoot, ShoppingCart, example }
        

💻 CQRS Architecture

🔴 complex ⭐⭐⭐⭐

Command Query Responsibility Segregation implementation with separate read and write models

⏱️ 45 min 🏷️ cqrs, command-query-separation, read-write-separation
Prerequisites: Understanding of event sourcing, Domain-Driven Design knowledge

// CQRS Base Classes
interface Command {
  id: string
  aggregateId: string
  timestamp: Date
}

interface Query {
  id: string
  parameters: Record<string, any>
}

interface CommandResult {
  success: boolean
  message?: string
  data?: any
}

interface QueryResult<T = any> {
  success: boolean
  data?: T
  error?: string
}

// Command Handler Interface
interface ICommandHandler<T extends Command> {
  handle(command: T): Promise<CommandResult>
}

// Query Handler Interface
interface IQueryHandler<T extends Query, R> {
  handle(query: T): Promise<QueryResult<R>>
}

// Command Bus
class CommandBus {
  private handlers: Map<string, ICommandHandler<any>> = new Map()

  register<T extends Command>(commandType: string, handler: ICommandHandler<T>): void {
    this.handlers.set(commandType, handler)
  }

  async dispatch<T extends Command>(command: T): Promise<CommandResult> {
    const handler = this.handlers.get(command.constructor.name)
    if (!handler) {
      return {
        success: false,
        message: `No handler found for command: ${command.constructor.name}`
      }
    }

    try {
      return await handler.handle(command)
    } catch (error) {
      return {
        success: false,
        message: error instanceof Error ? error.message : 'Unknown error'
      }
    }
  }
}

// Query Bus
class QueryBus {
  private handlers: Map<string, IQueryHandler<any, any>> = new Map()

  register<T extends Query, R>(queryType: string, handler: IQueryHandler<T, R>): void {
    this.handlers.set(queryType, handler)
  }

  async dispatch<T extends Query, R>(query: T): Promise<QueryResult<R>> {
    const handler = this.handlers.get(query.constructor.name)
    if (!handler) {
      return {
        success: false,
        error: `No handler found for query: ${query.constructor.name}`
      }
    }

    try {
      return await handler.handle(query)
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Unknown error'
      }
    }
  }
}

// Domain Events
interface DomainEvent {
  id: string
  aggregateId: string
  type: string
  data: any
  timestamp: Date
  version: number
}

// Event Publisher
interface EventPublisher {
  publish(events: DomainEvent[]): Promise<void>
}

// Console Event Publisher (for demo)
class ConsoleEventPublisher implements EventPublisher {
  async publish(events: DomainEvent[]): Promise<void> {
    console.log('Publishing events:', events.length)
    for (const event of events) {
      console.log(`Event: ${event.type}, Aggregate: ${event.aggregateId}`)
    }
  }
}

// Order Management Commands
class CreateOrderCommand implements Command {
  id: string
  aggregateId: string
  timestamp: Date

  constructor(
    public customerId: string,
    public items: OrderItem[]
  ) {
    this.id = crypto.randomUUID()
    this.aggregateId = crypto.randomUUID()
    this.timestamp = new Date()
  }
}

class UpdateOrderCommand implements Command {
  id: string
  aggregateId: string
  timestamp: Date

  constructor(
    public orderId: string,
    public items: OrderItem[]
  ) {
    this.id = crypto.randomUUID()
    this.aggregateId = orderId
    this.timestamp = new Date()
  }
}

class CancelOrderCommand implements Command {
  id: string
  aggregateId: string
  timestamp: Date

  constructor(
    public orderId: string,
    public reason: string
  ) {
    this.id = crypto.randomUUID()
    this.aggregateId = orderId
    this.timestamp = new Date()
  }
}

// Order Management Queries
class GetOrderByIdQuery implements Query {
  id: string
  parameters: Record<string, any>

  constructor(public orderId: string) {
    this.id = crypto.randomUUID()
    this.parameters = { orderId }
  }
}

class GetOrdersByCustomerQuery implements Query {
  id: string
  parameters: Record<string, any>

  constructor(public customerId: string) {
    this.id = crypto.randomUUID()
    this.parameters = { customerId }
  }
}

interface OrderItem {
  productId: string
  productName: string
  quantity: number
  unitPrice: number
}

// Write Model - Order Aggregate
class Order {
  private events: DomainEvent[] = []

  constructor(
    public readonly id: string,
    public readonly customerId: string,
    public items: OrderItem[],
    public status: 'PENDING' | 'CONFIRMED' | 'CANCELLED' | 'COMPLETED' = 'PENDING',
    public createdAt: Date = new Date(),
    public updatedAt: Date = new Date()
  ) {}

  updateItems(newItems: OrderItem[]): void {
    if (this.status !== 'PENDING') {
      throw new Error('Cannot update order in current status')
    }

    const event: DomainEvent = {
      id: crypto.randomUUID(),
      aggregateId: this.id,
      type: 'ORDER_ITEMS_UPDATED',
      data: { oldItems: this.items, newItems },
      timestamp: new Date(),
      version: 1
    }

    this.items = newItems
    this.updatedAt = new Date()
    this.events.push(event)
  }

  cancel(reason: string): void {
    if (this.status === 'COMPLETED' || this.status === 'CANCELLED') {
      throw new Error('Cannot cancel order in current status')
    }

    const event: DomainEvent = {
      id: crypto.randomUUID(),
      aggregateId: this.id,
      type: 'ORDER_CANCELLED',
      data: { reason },
      timestamp: new Date(),
      version: 1
    }

    this.status = 'CANCELLED'
    this.updatedAt = new Date()
    this.events.push(event)
  }

  confirm(): void {
    if (this.status !== 'PENDING') {
      throw new Error('Order must be in PENDING status to be confirmed')
    }

    const event: DomainEvent = {
      id: crypto.randomUUID(),
      aggregateId: this.id,
      type: 'ORDER_CONFIRMED',
      data: { confirmedAt: new Date() },
      timestamp: new Date(),
      version: 1
    }

    this.status = 'CONFIRMED'
    this.updatedAt = new Date()
    this.events.push(event)
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.events]
  }

  markEventsAsCommitted(): void {
    this.events = []
  }
}

// Read Model - Order Summary
interface OrderSummary {
  id: string
  customerId: string
  itemCount: number
  totalAmount: number
  status: string
  createdAt: Date
  updatedAt: Date
}

// Read Model Repository
class OrderReadRepository {
  private orders: Map<string, OrderSummary> = new Map()

  async save(orderSummary: OrderSummary): Promise<void> {
    this.orders.set(orderSummary.id, orderSummary)
  }

  async findById(orderId: string): Promise<OrderSummary | null> {
    return this.orders.get(orderId) || null
  }

  async findByCustomerId(customerId: string): Promise<OrderSummary[]> {
    return Array.from(this.orders.values()).filter(order => order.customerId === customerId)
  }
}

// Write Model Repository
class OrderWriteRepository {
  private orders: Map<string, Order> = new Map()

  async save(order: Order): Promise<void> {
    this.orders.set(order.id, order)
  }

  async findById(orderId: string): Promise<Order | null> {
    return this.orders.get(orderId) || null
  }
}

// Command Handlers
class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private orderRepository: OrderWriteRepository,
    private readRepository: OrderReadRepository,
    private eventPublisher: EventPublisher
  ) {}

  async handle(command: CreateOrderCommand): Promise<CommandResult> {
    const order = new Order(command.aggregateId, command.customerId, command.items)

    await this.orderRepository.save(order)

    // Create read model
    const orderSummary: OrderSummary = {
      id: order.id,
      customerId: order.customerId,
      itemCount: order.items.length,
      totalAmount: order.items.reduce((sum, item) => sum + (item.quantity * item.unitPrice), 0),
      status: order.status,
      createdAt: order.createdAt,
      updatedAt: order.updatedAt
    }

    await this.readRepository.save(orderSummary)

    // Publish events
    await this.eventPublisher.publish(order.getUncommittedEvents())
    order.markEventsAsCommitted()

    return {
      success: true,
      message: 'Order created successfully',
      data: { orderId: order.id }
    }
  }
}

class CancelOrderHandler implements ICommandHandler<CancelOrderCommand> {
  constructor(
    private orderRepository: OrderWriteRepository,
    private readRepository: OrderReadRepository,
    private eventPublisher: EventPublisher
  ) {}

  async handle(command: CancelOrderCommand): Promise<CommandResult> {
    const order = await this.orderRepository.findById(command.orderId)
    if (!order) {
      return {
        success: false,
        message: 'Order not found'
      }
    }

    try {
      order.cancel(command.reason)
      await this.orderRepository.save(order)

      // Update read model
      const orderSummary = await this.readRepository.findById(command.orderId)
      if (orderSummary) {
        orderSummary.status = order.status
        orderSummary.updatedAt = order.updatedAt
        await this.readRepository.save(orderSummary)
      }

      // Publish events
      await this.eventPublisher.publish(order.getUncommittedEvents())
      order.markEventsAsCommitted()

      return {
        success: true,
        message: 'Order cancelled successfully'
      }
    } catch (error) {
      return {
        success: false,
        message: error instanceof Error ? error.message : 'Failed to cancel order'
      }
    }
  }
}

// Query Handlers
class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery, OrderSummary | null> {
  constructor(private readRepository: OrderReadRepository) {}

  async handle(query: GetOrderByIdQuery): Promise<QueryResult<OrderSummary | null>> {
    try {
      const order = await this.readRepository.findById(query.orderId)
      return {
        success: true,
        data: order
      }
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Failed to get order'
      }
    }
  }
}

class GetOrdersByCustomerHandler implements IQueryHandler<GetOrdersByCustomerQuery, OrderSummary[]> {
  constructor(private readRepository: OrderReadRepository) {}

  async handle(query: GetOrdersByCustomerQuery): Promise<QueryResult<OrderSummary[]>> {
    try {
      const orders = await this.readRepository.findByCustomerId(query.customerId)
      return {
        success: true,
        data: orders
      }
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Failed to get orders'
      }
    }
  }
}

// Application Setup and Usage
async function setupApplication() {
  const commandBus = new CommandBus()
  const queryBus = new QueryBus()

  const orderWriteRepo = new OrderWriteRepository()
  const orderReadRepo = new OrderReadRepository()
  const eventPublisher = new ConsoleEventPublisher()

  // Register command handlers
  commandBus.register('CreateOrderCommand', new CreateOrderHandler(orderWriteRepo, orderReadRepo, eventPublisher))
  commandBus.register('CancelOrderCommand', new CancelOrderHandler(orderWriteRepo, orderReadRepo, eventPublisher))

  // Register query handlers
  queryBus.register('GetOrderByIdQuery', new GetOrderByIdHandler(orderReadRepo))
  queryBus.register('GetOrdersByCustomerQuery', new GetOrdersByCustomerHandler(orderReadRepo))

  return { commandBus, queryBus }
}

// Usage Example
async function cqrsExample() {
  const { commandBus, queryBus } = await setupApplication()

  // Create order
  const createOrderCommand = new CreateOrderCommand('customer-123', [
    { productId: 'prod-1', productName: 'Laptop', quantity: 1, unitPrice: 999.99 },
    { productId: 'prod-2', productName: 'Mouse', quantity: 2, unitPrice: 29.99 }
  ])

  const createResult = await commandBus.dispatch(createOrderCommand)
  console.log('Create order result:', createResult)

  if (createResult.success) {
    const orderId = createResult.data.orderId

    // Query order by ID
    const getOrderQuery = new GetOrderByIdQuery(orderId)
    const orderResult = await queryBus.dispatch(getOrderQuery)
    console.log('Order details:', orderResult.data)

    // Query orders by customer
    const getCustomerOrdersQuery = new GetOrdersByCustomerQuery('customer-123')
    const customerOrdersResult = await queryBus.dispatch(getCustomerOrdersQuery)
    console.log('Customer orders:', customerOrdersResult.data)

    // Cancel order
    const cancelOrderCommand = new CancelOrderCommand(orderId, 'Customer requested cancellation')
    const cancelResult = await commandBus.dispatch(cancelOrderCommand)
    console.log('Cancel order result:', cancelResult)
  }
}

export {
  Command,
  Query,
  CommandBus,
  QueryBus,
  CommandResult,
  QueryResult,
  CreateOrderCommand,
  CancelOrderCommand,
  GetOrderByIdQuery,
  GetOrdersByCustomerQuery,
  cqrsExample
}
        

💻 State Reconstruction

🔴 complex ⭐⭐⭐⭐

Implementing state reconstruction from event streams with snapshot optimization

⏱️ 60 min 🏷️ state-reconstruction, snapshot, performance-optimization
Prerequisites: Understanding of event sourcing, Knowledge of stream processing

// State Reconstruction Engine
interface Event {
  id: string
  aggregateId: string
  type: string
  data: any
  timestamp: Date
  version: number
}

interface Snapshot {
  aggregateId: string
  data: any
  version: number
  timestamp: Date
}

interface StateReconstructor<T> {
  reconstruct(events: Event[]): T
  createSnapshot(state: T, version: number): Snapshot
  applySnapshot(snapshot: Snapshot, events: Event[]): T
}

// Snapshot Strategy
interface SnapshotStrategy {
  shouldTakeSnapshot(aggregateId: string, version: number): boolean
  getSnapshotInterval(): number
}

class IntervalSnapshotStrategy implements SnapshotStrategy {
  constructor(private interval: number = 100) {}

  shouldTakeSnapshot(aggregateId: string, version: number): boolean {
    return version % this.interval === 0
  }

  getSnapshotInterval(): number {
    return this.interval
  }
}

class EventCountSnapshotStrategy implements SnapshotStrategy {
  constructor(private eventCountThreshold: number = 50) {}

  shouldTakeSnapshot(aggregateId: string, version: number): boolean {
    // This would need to track event count since last snapshot
    return version % this.eventCountThreshold === 0
  }

  getSnapshotInterval(): number {
    return this.eventCountThreshold
  }
}

// Snapshot Store
class SnapshotStore {
  private snapshots: Map<string, Snapshot[]> = new Map()

  async saveSnapshot(snapshot: Snapshot): Promise<void> {
    const aggregateSnapshots = this.snapshots.get(snapshot.aggregateId) || []

    // Remove older snapshots to save space (keep only the latest)
    const filteredSnapshots = aggregateSnapshots.filter(s => s.version < snapshot.version)
    filteredSnapshots.push(snapshot)

    this.snapshots.set(snapshot.aggregateId, filteredSnapshots)
  }

  async getLatestSnapshot(aggregateId: string): Promise<Snapshot | null> {
    const aggregateSnapshots = this.snapshots.get(aggregateId) || []
    if (aggregateSnapshots.length === 0) {
      return null
    }

    return aggregateSnapshots.reduce((latest, current) =>
      current.version > latest.version ? current : latest
    )
  }

  async getSnapshotBeforeVersion(aggregateId: string, version: number): Promise<Snapshot | null> {
    const aggregateSnapshots = this.snapshots.get(aggregateId) || []
    const matchingSnapshots = aggregateSnapshots.filter(s => s.version <= version)

    if (matchingSnapshots.length === 0) {
      return null
    }

    return matchingSnapshots.reduce((latest, current) =>
      current.version > latest.version ? current : latest
    )
  }

  async deleteSnapshotsBeforeVersion(aggregateId: string, version: number): Promise<void> {
    const aggregateSnapshots = this.snapshots.get(aggregateId) || []
    const filteredSnapshots = aggregateSnapshots.filter(s => s.version >= version)
    this.snapshots.set(aggregateId, filteredSnapshots)
  }
}

// Event Store with Snapshot Integration
class EnhancedEventStore {
  constructor(
    private events: Map<string, Event[]> = new Map(),
    private snapshotStore: SnapshotStore = new SnapshotStore(),
    private snapshotStrategy: SnapshotStrategy = new IntervalSnapshotStrategy(100)
  ) {}

  async saveEvents(aggregateId: string, events: Event[]): Promise<void> {
    const existingEvents = this.events.get(aggregateId) || []
    const lastVersion = existingEvents.length

    const newEvents = events.map((event, index) => ({
      ...event,
      id: crypto.randomUUID(),
      timestamp: new Date(),
      version: lastVersion + index + 1
    }))

    this.events.set(aggregateId, [...existingEvents, ...newEvents])

    // Check if we should create a snapshot
    const finalVersion = lastVersion + newEvents.length
    if (this.snapshotStrategy.shouldTakeSnapshot(aggregateId, finalVersion)) {
      // This would typically reconstruct the state and create a snapshot
      console.log(`Snapshot opportunity for aggregate ${aggregateId} at version ${finalVersion}`)
    }
  }

  async getEvents(aggregateId: string): Promise<Event[]> {
    return this.events.get(aggregateId) || []
  }

  async getEventsFromVersion(aggregateId: string, fromVersion: number): Promise<Event[]> {
    const events = this.events.get(aggregateId) || []
    return events.filter(event => event.version >= fromVersion)
  }

  async getEventsWithSnapshot(aggregateId: string): Promise<{ events: Event[], snapshot: Snapshot | null }> {
    const snapshot = await this.snapshotStore.getLatestSnapshot(aggregateId)
    let events: Event[]

    if (snapshot) {
      events = await this.getEventsFromVersion(aggregateId, snapshot.version + 1)
    } else {
      events = await this.getEvents(aggregateId)
    }

    return { events, snapshot }
  }
}

// Shopping Cart State
interface CartState {
  id: string
  items: CartItem[]
  isActive: boolean
  totalAmount: number
  version: number
  lastUpdated: Date
}

interface CartItem {
  productId: string
  productName: string
  quantity: number
  unitPrice: number
}

// Shopping Cart State Reconstructor
class ShoppingCartReconstructor implements StateReconstructor<CartState> {
  reconstruct(events: Event[]): CartState {
    let state: CartState = {
      id: '',
      items: [],
      isActive: true,
      totalAmount: 0,
      version: 0,
      lastUpdated: new Date()
    }

    for (const event of events) {
      state = this.applyEvent(state, event)
    }

    return state
  }

  createSnapshot(state: CartState, version: number): Snapshot {
    return {
      aggregateId: state.id,
      data: {
        id: state.id,
        items: state.items,
        isActive: state.isActive,
        totalAmount: state.totalAmount,
        lastUpdated: state.lastUpdated
      },
      version,
      timestamp: new Date()
    }
  }

  applySnapshot(snapshot: Snapshot, events: Event[]): CartState {
    let state: CartState = {
      ...snapshot.data,
      version: snapshot.version
    }

    for (const event of events) {
      state = this.applyEvent(state, event)
    }

    return state
  }

  private applyEvent(state: CartState, event: Event): CartState {
    const newState = { ...state, version: event.version, lastUpdated: event.timestamp }

    switch (event.type) {
      case 'CART_CREATED':
        return {
          ...newState,
          id: event.aggregateId,
          items: [],
          isActive: true,
          totalAmount: 0
        }

      case 'ITEM_ADDED':
        const existingItem = newState.items.find(item => item.productId === event.data.productId)
        let updatedItems: CartItem[]

        if (existingItem) {
          updatedItems = newState.items.map(item =>
            item.productId === event.data.productId
              ? { ...item, quantity: item.quantity + event.data.quantity }
              : item
          )
        } else {
          updatedItems = [
            ...newState.items,
            {
              productId: event.data.productId,
              productName: event.data.productName,
              quantity: event.data.quantity,
              unitPrice: event.data.unitPrice
            }
          ]
        }

        return {
          ...newState,
          items: updatedItems,
          totalAmount: this.calculateTotalAmount(updatedItems)
        }

      case 'ITEM_REMOVED':
        const filteredItems = newState.items.filter(item => item.productId !== event.data.productId)
        return {
          ...newState,
          items: filteredItems,
          totalAmount: this.calculateTotalAmount(filteredItems)
        }

      case 'CART_CHECKED_OUT':
        return {
          ...newState,
          isActive: false
        }

      case 'CART_ABANDONED':
        return {
          ...newState,
          isActive: false
        }

      default:
        return newState
    }
  }

  private calculateTotalAmount(items: CartItem[]): number {
    return items.reduce((total, item) => total + (item.quantity * item.unitPrice), 0)
  }
}

// State Reconstruction Service
class StateReconstructionService {
  constructor(
    private eventStore: EnhancedEventStore,
    private snapshotStore: SnapshotStore,
    private snapshotStrategy: SnapshotStrategy
  ) {}

  async reconstructState<T>(
    aggregateId: string,
    reconstructor: StateReconstructor<T>
  ): Promise<T> {
    // Try to get the latest snapshot first
    const { events, snapshot } = await this.eventStore.getEventsWithSnapshot(aggregateId)

    if (snapshot) {
      console.log(`Found snapshot for ${aggregateId} at version ${snapshot.version}`)
      return reconstructor.applySnapshot(snapshot, events)
    } else {
      console.log(`No snapshot found for ${aggregateId}, reconstructing from all events`)
      return reconstructor.reconstruct(events)
    }
  }

  async reconstructStateAtVersion<T>(
    aggregateId: string,
    targetVersion: number,
    reconstructor: StateReconstructor<T>
  ): Promise<T> {
    // Try to find a snapshot before the target version
    const snapshot = await this.snapshotStore.getSnapshotBeforeVersion(aggregateId, targetVersion)

    if (snapshot) {
      const events = await this.eventStore.getEventsFromVersion(aggregateId, snapshot.version + 1)
      const relevantEvents = events.filter(event => event.version <= targetVersion)
      return reconstructor.applySnapshot(snapshot, relevantEvents)
    } else {
      const allEvents = await this.eventStore.getEvents(aggregateId)
      const relevantEvents = allEvents.filter(event => event.version <= targetVersion)
      return reconstructor.reconstruct(relevantEvents)
    }
  }

  async createSnapshot<T>(
    aggregateId: string,
    reconstructor: StateReconstructor<T>
  ): Promise<void> {
    const events = await this.eventStore.getEvents(aggregateId)

    if (events.length === 0) {
      throw new Error('No events found for aggregate')
    }

    const state = reconstructor.reconstruct(events)
    const latestEvent = events[events.length - 1]

    const snapshot = reconstructor.createSnapshot(state, latestEvent.version)
    await this.snapshotStore.saveSnapshot(snapshot)

    console.log(`Created snapshot for ${aggregateId} at version ${latestEvent.version}`)
  }

  async getReconstructionMetrics(aggregateId: string): Promise<{
    totalEvents: number
    hasSnapshot: boolean
    snapshotVersion?: number
    eventsSinceSnapshot: number
  }> {
    const { events, snapshot } = await this.eventStore.getEventsWithSnapshot(aggregateId)

    return {
      totalEvents: events.length + (snapshot ? 1 : 0),
      hasSnapshot: !!snapshot,
      snapshotVersion: snapshot?.version,
      eventsSinceSnapshot: events.length
    }
  }
}

// Performance Comparison Utility
class ReconstructionPerformanceTester {
  async compareReconstructionMethods(
    aggregateId: string,
    reconstructor: StateReconstructor<any>,
    eventStore: EnhancedEventStore,
    reconstructionService: StateReconstructionService
  ): Promise<{
    fullReconstructionTime: number
    snapshotReconstructionTime: number
    performanceGain: number
  }> {
    // Measure full reconstruction
    const fullStart = performance.now()
    const allEvents = await eventStore.getEvents(aggregateId)
    await reconstructor.reconstruct(allEvents)
    const fullEnd = performance.now()
    const fullReconstructionTime = fullEnd - fullStart

    // Measure snapshot-based reconstruction
    const snapshotStart = performance.now()
    await reconstructionService.reconstructState(aggregateId, reconstructor)
    const snapshotEnd = performance.now()
    const snapshotReconstructionTime = snapshotEnd - snapshotStart

    const performanceGain = fullReconstructionTime / snapshotReconstructionTime

    return {
      fullReconstructionTime,
      snapshotReconstructionTime,
      performanceGain
    }
  }
}

// Usage Example
async function stateReconstructionExample() {
  const snapshotStore = new SnapshotStore()
  const snapshotStrategy = new IntervalSnapshotStrategy(10) // Snapshot every 10 events
  const eventStore = new EnhancedEventStore(new Map(), snapshotStore, snapshotStrategy)
  const reconstructor = new ShoppingCartReconstructor()
  const reconstructionService = new StateReconstructionService(eventStore, snapshotStore, snapshotStrategy)

  const aggregateId = 'cart-123'

  // Simulate adding many events to the cart
  const events: Event[] = [
    {
      id: crypto.randomUUID(),
      aggregateId,
      type: 'CART_CREATED',
      data: {},
      timestamp: new Date(),
      version: 1
    }
  ]

  // Add 25 items to create many events
  for (let i = 1; i <= 25; i++) {
    events.push({
      id: crypto.randomUUID(),
      aggregateId,
      type: 'ITEM_ADDED',
      data: {
        productId: `prod-${i}`,
        productName: `Product ${i}`,
        quantity: 1,
        unitPrice: 10 * i
      },
      timestamp: new Date(),
      version: i + 1
    })
  }

  await eventStore.saveEvents(aggregateId, events)

  // Create a snapshot at current state
  await reconstructionService.createSnapshot(aggregateId, reconstructor)

  // Add more events after snapshot
  const moreEvents: Event[] = []
  for (let i = 26; i <= 35; i++) {
    moreEvents.push({
      id: crypto.randomUUID(),
      aggregateId,
      type: 'ITEM_ADDED',
      data: {
        productId: `prod-${i}`,
        productName: `Product ${i}`,
        quantity: 1,
        unitPrice: 10 * i
      },
      timestamp: new Date(),
      version: i + 1
    })
  }

  await eventStore.saveEvents(aggregateId, moreEvents)

  // Get reconstruction metrics
  const metrics = await reconstructionService.getReconstructionMetrics(aggregateId)
  console.log('Reconstruction metrics:', metrics)

  // Reconstruct state using snapshot optimization
  const cartState = await reconstructionService.reconstructState(aggregateId, reconstructor)
  console.log('Reconstructed cart state:', {
    id: cartState.id,
    itemCount: cartState.items.length,
    totalAmount: cartState.totalAmount,
    isActive: cartState.isActive,
    version: cartState.version
  })

  // Performance comparison
  const performanceTester = new ReconstructionPerformanceTester()
  const performance = await performanceTester.compareReconstructionMethods(
    aggregateId,
    reconstructor,
    eventStore,
    reconstructionService
  )

  console.log('Performance comparison:', {
    fullReconstruction: `${performance.fullReconstructionTime.toFixed(2)}ms`,
    snapshotReconstruction: `${performance.snapshotReconstructionTime.toFixed(2)}ms`,
    performanceGain: `${performance.performanceGain.toFixed(2)}x faster`
  })
}

export {
  Event,
  Snapshot,
  StateReconstructor,
  SnapshotStrategy,
  IntervalSnapshotStrategy,
  SnapshotStore,
  EnhancedEventStore,
  CartState,
  ShoppingCartReconstructor,
  StateReconstructionService,
  stateReconstructionExample
}
        

💻 Event Versioning

🔴 complex ⭐⭐⭐⭐⭐

Managing event schema evolution and versioning strategies for event sourcing systems

⏱️ 75 min 🏷️ event-versioning, schema-evolution, data-migration
Prerequisites: Understanding of event sourcing, Schema management knowledge

// Event Versioning System
interface EventSchema {
  type: string
  version: number
  schema: any
}

interface EventTransformer {
  fromVersion: number
  toVersion: number
  transform(event: any): any
}

interface VersionedEvent {
  id: string
  aggregateId: string
  type: string
  data: any
  timestamp: Date
  version: number
  eventVersion: number
}

// Schema Registry
class SchemaRegistry {
  private schemas: Map<string, EventSchema[]> = new Map()
  private transformers: Map<string, EventTransformer[]> = new Map()

  registerSchema(eventType: string, version: number, schema: any): void {
    const schemas = this.schemas.get(eventType) || []
    schemas.push({ type: eventType, version, schema })
    this.schemas.set(eventType, schemas)
  }

  registerTransformer(eventType: string, transformer: EventTransformer): void {
    const transformers = this.transformers.get(eventType) || []
    transformers.push(transformer)
    this.transformers.set(eventType, transformers)
  }

  getLatestVersion(eventType: string): number {
    const schemas = this.schemas.get(eventType) || []
    if (schemas.length === 0) {
      throw new Error(`No schema found for event type: ${eventType}`)
    }
    return Math.max(...schemas.map(s => s.version))
  }

  getSchema(eventType: string, version: number): EventSchema | null {
    const schemas = this.schemas.get(eventType) || []
    return schemas.find(s => s.version === version) || null
  }

  getTransformers(eventType: string): EventTransformer[] {
    return this.transformers.get(eventType) || []
  }

  getTransformationPath(eventType: string, fromVersion: number, toVersion: number): EventTransformer[] {
    const transformers = this.getTransformers(eventType)
    const path: EventTransformer[] = []

    let currentVersion = fromVersion
    while (currentVersion < toVersion) {
      const nextTransformer = transformers.find(t => t.fromVersion === currentVersion)
      if (!nextTransformer) {
        throw new Error(`No transformer found from version ${currentVersion} for event type ${eventType}`)
      }
      path.push(nextTransformer)
      currentVersion = nextTransformer.toVersion
    }

    return path
  }
}

// Event Upgrader
class EventUpgrader {
  constructor(private schemaRegistry: SchemaRegistry) {}

  async upgradeEvent(event: VersionedEvent, targetVersion?: number): Promise<VersionedEvent> {
    const latestVersion = targetVersion || this.schemaRegistry.getLatestVersion(event.type)

    if (event.eventVersion >= latestVersion) {
      return event // Event is already at or above target version
    }

    const transformationPath = this.schemaRegistry.getTransformationPath(
      event.type,
      event.eventVersion,
      latestVersion
    )

    let upgradedEvent = { ...event }

    for (const transformer of transformationPath) {
      upgradedEvent.data = transformer.transform(upgradedEvent.data)
      upgradedEvent.eventVersion = transformer.toVersion
    }

    return upgradedEvent
  }

  async upgradeEvents(events: VersionedEvent[]): Promise<VersionedEvent[]> {
    const upgradedEvents: VersionedEvent[] = []

    for (const event of events) {
      try {
        const upgraded = await this.upgradeEvent(event)
        upgradedEvents.push(upgraded)
      } catch (error) {
        console.error(`Failed to upgrade event ${event.id}:`, error)
        // Keep original event if upgrade fails
        upgradedEvents.push(event)
      }
    }

    return upgradedEvents
  }
}

// Event Validation
class EventValidator {
  constructor(private schemaRegistry: SchemaRegistry) {}

  validateEvent(event: VersionedEvent): { isValid: boolean; errors: string[] } {
    const schema = this.schemaRegistry.getSchema(event.type, event.eventVersion)
    if (!schema) {
      return {
        isValid: false,
        errors: [`No schema found for event type ${event.type} version ${event.eventVersion}`]
      }
    }

    return this.validateAgainstSchema(event.data, schema.schema)
  }

  private validateAgainstSchema(data: any, schema: any): { isValid: boolean; errors: string[] } {
    const errors: string[] = []

    // Simple validation logic (in production, use a proper JSON schema validator)
    if (schema.type === 'object' && schema.properties) {
      for (const [key, propSchema] of Object.entries(schema.properties as any)) {
        if (schema.required?.includes(key) && !(key in data)) {
          errors.push(`Missing required property: ${key}`)
        }

        if (key in data) {
          const propErrors = this.validateProperty(data[key], propSchema)
          errors.push(...propErrors.map(e => `${key}: ${e}`))
        }
      }
    }

    return {
      isValid: errors.length === 0,
      errors
    }
  }

  private validateProperty(value: any, schema: any): string[] {
    const errors: string[] = []

    if (schema.type && typeof value !== schema.type) {
      errors.push(`Expected type ${schema.type}, got ${typeof value}`)
    }

    return errors
  }
}

// Example Event Schemas

// V1: ItemAdded event with simple structure
const ItemAddedV1Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' }
  },
  required: ['productId', 'quantity', 'unitPrice']
}

// V2: ItemAdded event with additional productName field
const ItemAddedV2Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    productName: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' },
    discount: { type: 'number', minimum: 0, maximum: 1 }
  },
  required: ['productId', 'productName', 'quantity', 'unitPrice']
}

// V3: ItemAdded event with category and metadata
const ItemAddedV3Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    productName: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' },
    discount: { type: 'number', minimum: 0, maximum: 1 },
    category: { type: 'string' },
    metadata: { type: 'object' }
  },
  required: ['productId', 'productName', 'quantity', 'unitPrice', 'category']
}

// Event Transformers

// V1 to V2: Add productName based on productId lookup
class ItemAddedV1ToV2Transformer implements EventTransformer {
  fromVersion = 1
  toVersion = 2

  // In a real implementation, this would query a product catalog
  private productNames: Record<string, string> = {
    'prod-1': 'Laptop',
    'prod-2': 'Mouse',
    'prod-3': 'Keyboard'
  }

  transform(event: any): any {
    return {
      ...event,
      productName: this.productNames[event.productId] || 'Unknown Product',
      discount: event.discount || 0
    }
  }
}

// V2 to V3: Add category and metadata
class ItemAddedV2ToV3Transformer implements EventTransformer {
  fromVersion = 2
  toVersion = 3

  // Simple categorization based on product ID patterns
  private getProductCategory(productId: string): string {
    if (productId.includes('laptop')) return 'Electronics'
    if (productId.includes('mouse') || productId.includes('keyboard')) return 'Accessories'
    return 'General'
  }

  transform(event: any): any {
    return {
      ...event,
      category: this.getProductCategory(event.productId),
      metadata: {
        source: 'legacy-system',
        migratedAt: new Date().toISOString()
      }
    }
  }
}

// V2 to V4: Remove discount, add pricing breakdown
class ItemAddedV3ToV4Transformer implements EventTransformer {
  fromVersion = 3
  toVersion = 4

  transform(event: any): any {
    const discountAmount = event.unitPrice * event.quantity * (event.discount || 0)
    const finalPrice = event.unitPrice - (event.unitPrice * (event.discount || 0))

    return {
      productId: event.productId,
      productName: event.productName,
      quantity: event.quantity,
      unitPrice: event.unitPrice,
      finalPrice,
      discountAmount,
      category: event.category,
      metadata: {
        ...event.metadata,
        pricing: {
          originalUnitPrice: event.unitPrice,
          discountPercentage: event.discount || 0,
          discountAmount,
          finalUnitPrice: finalPrice
        }
      }
    }
  }
}

const ItemAddedV4Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    productName: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' },
    finalPrice: { type: 'number' },
    discountAmount: { type: 'number' },
    category: { type: 'string' },
    metadata: {
      type: 'object',
      properties: {
        source: { type: 'string' },
        migratedAt: { type: 'string' },
        pricing: {
          type: 'object',
          properties: {
            originalUnitPrice: { type: 'number' },
            discountPercentage: { type: 'number' },
            discountAmount: { type: 'number' },
            finalUnitPrice: { type: 'number' }
          }
        }
      }
    }
  },
  required: ['productId', 'productName', 'quantity', 'unitPrice', 'finalPrice', 'category']
}

// Versioned Event Store
class VersionedEventStore {
  private events: Map<string, VersionedEvent[]> = new Map()

  constructor(
    private schemaRegistry: SchemaRegistry,
    private eventUpgrader: EventUpgrader,
    private eventValidator: EventValidator
  ) {}

  async saveEvent(event: VersionedEvent): Promise<void> {
    // Validate event against its schema
    const validation = this.eventValidator.validateEvent(event)
    if (!validation.isValid) {
      throw new Error(`Event validation failed: ${validation.errors.join(', ')}`)
    }

    const aggregateEvents = this.events.get(event.aggregateId) || []
    aggregateEvents.push(event)
    this.events.set(event.aggregateId, aggregateEvents)
  }

  async getEvents(aggregateId: string, targetVersion?: number): Promise<VersionedEvent[]> {
    const events = this.events.get(aggregateId) || []

    if (!targetVersion) {
      return events
    }

    // Upgrade events to target version
    const upgradedEvents: VersionedEvent[] = []
    for (const event of events) {
      const upgraded = await this.eventUpgrader.upgradeEvent(event, targetVersion)
      upgradedEvents.push(upgraded)
    }

    return upgradedEvents
  }

  async getLatestEvents(aggregateId: string): Promise<VersionedEvent[]> {
    const events = this.events.get(aggregateId) || []
    const latestVersion = Math.max(...events.map(e => this.schemaRegistry.getLatestVersion(e.type)))
    return this.getEvents(aggregateId, latestVersion)
  }

  async migrateEvents(aggregateId: string): Promise<{
    totalEvents: number
    upgradedEvents: number
    errors: string[]
  }> {
    const events = this.events.get(aggregateId) || []
    const errors: string[] = []
    let upgradedCount = 0

    const upgradedEvents = await this.eventUpgrader.upgradeEvents(events)

    // Validate upgraded events
    for (const event of upgradedEvents) {
      const validation = this.eventValidator.validateEvent(event)
      if (!validation.isValid) {
        errors.push(`Event ${event.id}: ${validation.errors.join(', ')}`)
      } else if (event.eventVersion !== this.schemaRegistry.getLatestVersion(event.type)) {
        upgradedCount++
      }
    }

    // Store upgraded events
    this.events.set(aggregateId, upgradedEvents)

    return {
      totalEvents: events.length,
      upgradedEvents: upgradedCount,
      errors
    }
  }
}

// Event Versioning Analytics
class EventVersioningAnalytics {
  constructor(
    private eventStore: VersionedEventStore,
    private schemaRegistry: SchemaRegistry
  ) {}

  async getVersionDistribution(aggregateId?: string): Promise<Array<{
    eventType: string
    versionDistribution: Record<number, number>
    totalEvents: number
  }>> {
    // This would analyze all events in the store
    // For demo purposes, return sample data
    return [
      {
        eventType: 'ITEM_ADDED',
        versionDistribution: {
          1: 10,
          2: 25,
          3: 15
        },
        totalEvents: 50
      },
      {
        eventType: 'ITEM_REMOVED',
        versionDistribution: {
          1: 5,
          2: 8
        },
        totalEvents: 13
      }
    ]
  }

  async getMigrationReport(aggregateId: string): Promise<{
    eventsMigrated: number
    eventsWithErrors: number
    errorDetails: string[]
  }> {
    const migration = await this.eventStore.migrateEvents(aggregateId)

    return {
      eventsMigrated: migration.upgradedEvents,
      eventsWithErrors: migration.errors.length,
      errorDetails: migration.errors
    }
  }

  async getSchemaEvolutionTimeline(eventType: string): Promise<Array<{
    version: number
    introducedAt: Date
    changes: string[]
  }>> {
    // This would track when each schema version was introduced
    return [
      {
        version: 1,
        introducedAt: new Date('2023-01-01'),
        changes: ['Initial version', 'Basic item data']
      },
      {
        version: 2,
        introducedAt: new Date('2023-06-01'),
        changes: ['Added productName field', 'Added optional discount']
      },
      {
        version: 3,
        introducedAt: new Date('2024-01-01'),
        changes: ['Added category field', 'Added metadata object']
      }
    ]
  }
}

// Setup and Usage Example
async function setupEventVersioning() {
  const schemaRegistry = new SchemaRegistry()

  // Register schemas
  schemaRegistry.registerSchema('ITEM_ADDED', 1, ItemAddedV1Schema)
  schemaRegistry.registerSchema('ITEM_ADDED', 2, ItemAddedV2Schema)
  schemaRegistry.registerSchema('ITEM_ADDED', 3, ItemAddedV3Schema)
  schemaRegistry.registerSchema('ITEM_ADDED', 4, ItemAddedV4Schema)

  // Register transformers
  schemaRegistry.registerTransformer('ITEM_ADDED', new ItemAddedV1ToV2Transformer())
  schemaRegistry.registerTransformer('ITEM_ADDED', new ItemAddedV2ToV3Transformer())
  schemaRegistry.registerTransformer('ITEM_ADDED', new ItemAddedV3ToV4Transformer())

  const eventUpgrader = new EventUpgrader(schemaRegistry)
  const eventValidator = new EventValidator(schemaRegistry)
  const eventStore = new VersionedEventStore(schemaRegistry, eventUpgrader, eventValidator)
  const analytics = new EventVersioningAnalytics(eventStore, schemaRegistry)

  return { eventStore, analytics, schemaRegistry }
}

async function eventVersioningExample() {
  const { eventStore, analytics, schemaRegistry } = await setupEventVersioning()

  // Create sample events with different versions
  const aggregateId = 'cart-123'

  const events: VersionedEvent[] = [
    {
      id: crypto.randomUUID(),
      aggregateId,
      type: 'ITEM_ADDED',
      data: {
        productId: 'prod-1',
        quantity: 1,
        unitPrice: 999.99
      },
      timestamp: new Date('2023-05-01'),
      version: 1,
      eventVersion: 1 // V1 schema
    },
    {
      id: crypto.randomUUID(),
      aggregateId,
      type: 'ITEM_ADDED',
      data: {
        productId: 'prod-2',
        productName: 'Wireless Mouse',
        quantity: 2,
        unitPrice: 29.99,
        discount: 0.1
      },
      timestamp: new Date('2023-07-01'),
      version: 2,
      eventVersion: 2 // V2 schema
    },
    {
      id: crypto.randomUUID(),
      aggregateId,
      type: 'ITEM_ADDED',
      data: {
        productId: 'prod-3',
        productName: 'Mechanical Keyboard',
        quantity: 1,
        unitPrice: 149.99,
        discount: 0.05,
        category: 'Accessories',
        metadata: {
          source: 'web-app',
          addedBy: 'user-456'
        }
      },
      timestamp: new Date('2024-02-01'),
      version: 3,
      eventVersion: 3 // V3 schema
    }
  ]

  // Save events (validation will occur)
  for (const event of events) {
    try {
      await eventStore.saveEvent(event)
      console.log(`Saved event ${event.id} with version ${event.eventVersion}`)
    } catch (error) {
      console.error(`Failed to save event ${event.id}:`, error)
    }
  }

  // Get events in their original versions
  console.log('\n=== Original Events ===')
  const originalEvents = await eventStore.getEvents(aggregateId)
  originalEvents.forEach(event => {
    console.log(`Event ${event.id}: version ${event.eventVersion}, data:`, event.data)
  })

  // Get events upgraded to latest version
  console.log('\n=== Upgraded Events (Latest Version) ===')
  const latestEvents = await eventStore.getLatestEvents(aggregateId)
  latestEvents.forEach(event => {
    console.log(`Event ${event.id}: version ${event.eventVersion}, data:`, event.data)
  })

  // Get events upgraded to specific version
  console.log('\n=== Events Upgraded to V2 ===')
  const v2Events = await eventStore.getEvents(aggregateId, 2)
  v2Events.forEach(event => {
    console.log(`Event ${event.id}: version ${event.eventVersion}, data:`, event.data)
  })

  // Migration report
  console.log('\n=== Migration Report ===')
  const migrationReport = await analytics.getMigrationReport(aggregateId)
  console.log('Migration report:', migrationReport)

  // Version distribution
  console.log('\n=== Version Distribution ===')
  const versionDistribution = await analytics.getVersionDistribution()
  versionDistribution.forEach(dist => {
    console.log(`Event type ${dist.eventType}:`, dist.versionDistribution)
  })

  // Schema evolution timeline
  console.log('\n=== Schema Evolution Timeline ===')
  const timeline = await analytics.getSchemaEvolutionTimeline('ITEM_ADDED')
  timeline.forEach(entry => {
    console.log(`Version ${entry.version} (${entry.introducedAt.toISOString().split('T')[0]}):`, entry.changes)
  })
}

export {
  VersionedEvent,
  EventSchema,
  EventTransformer,
  SchemaRegistry,
  EventUpgrader,
  EventValidator,
  VersionedEventStore,
  EventVersioningAnalytics,
  ItemAddedV1ToV2Transformer,
  ItemAddedV2ToV3Transformer,
  ItemAddedV3ToV4Transformer,
  setupEventVersioning,
  eventVersioningExample
}