🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples Event Sourcing
Patterns complets d'event sourcing couvrant le stockage d'événements, l'architecture CQRS, la reconstruction d'état et les stratégies de versionnement d'événements
💻 Pattern de Stockage d'Événements
🟡 intermediate
⭐⭐⭐
Implémentation de base du stockage d'événements avec un event store et une racine d'agrégat
⏱️ 30 min
🏷️ event-storage, aggregate, domain-events
Prerequisites:
Understanding of Domain-Driven Design, Basic knowledge of event patterns
// Event Store Implementation
/**
 * A single domain event belonging to one aggregate's event stream.
 */
interface Event {
// Unique event identifier; EventStore.saveEvents replaces it with a fresh UUID on save.
id: string
// Identifier of the aggregate whose stream this event belongs to.
aggregateId: string
// Event name used to look up the handler (e.g. 'ITEM_ADDED').
type: string
// Event payload; untyped here, each handler decides which fields it reads.
data: any
// Creation time; EventStore.saveEvents replaces it with the time of the save.
timestamp: Date
// Position in the aggregate's stream; EventStore.saveEvents assigns it starting at 1.
version: number
}
/**
 * In-memory, append-only event store keyed by aggregate id.
 *
 * On save the store stamps every event with its own `id`, `timestamp` and a
 * sequential `version` (1-based position in the aggregate's stream), so the
 * values supplied by the caller for those three fields are discarded.
 */
class EventStore {
  private events: Map<string, Event[]> = new Map()

  /**
   * Appends `events` to the stream of `aggregateId`.
   *
   * @param aggregateId - Stream to append to (created on first use).
   * @param events - Events to append, in order.
   */
  async saveEvents(aggregateId: string, events: Event[]): Promise<void> {
    const existingEvents = this.events.get(aggregateId) ?? []
    const newEvents = events.map((event, index) => ({
      ...event,
      id: crypto.randomUUID(),
      timestamp: new Date(),
      version: existingEvents.length + index + 1
    }))
    this.events.set(aggregateId, [...existingEvents, ...newEvents])
  }

  /**
   * Returns every event stored for `aggregateId` (empty array if none).
   *
   * A copy is returned so callers cannot push to or reorder the stored stream.
   */
  async getEvents(aggregateId: string): Promise<Event[]> {
    return [...(this.events.get(aggregateId) ?? [])]
  }

  /**
   * Returns the events of `aggregateId` whose version is `>= fromVersion`.
   */
  async getEventsFromVersion(aggregateId: string, fromVersion: number): Promise<Event[]> {
    const events = this.events.get(aggregateId) ?? []
    // filter() already produces a fresh array, so no extra copy is needed.
    return events.filter(event => event.version >= fromVersion)
  }
}
// Aggregate Root
/**
 * Base class for event-sourced aggregates.
 *
 * Subclasses raise events through `apply`, which routes the payload to the
 * handler returned by `getEventHandler`, records the event as uncommitted and
 * moves the aggregate to the event's version.
 */
abstract class AggregateRoot {
  private _id: string
  private _version: number = 0
  private _uncommittedEvents: Event[] = []

  constructor(id: string) {
    this._id = id
  }

  /** Identifier passed to the constructor. */
  get id(): string {
    return this._id
  }

  /** Version of the last event applied or replayed (0 before any event). */
  get version(): number {
    return this._version
  }

  /** Applies a newly raised event and queues it for persistence. */
  protected apply(event: Event): void {
    this.applyChange(event, true)
    this._uncommittedEvents.push(event)
  }

  /**
   * Runs the handler registered for the event type, if there is one, with the
   * aggregate as `this` and the event payload as argument. Only new events
   * update the version here; replayed ones are handled by `loadFromHistory`.
   */
  private applyChange(event: Event, isNew: boolean): void {
    const eventHandler = this.getEventHandler(event.type)
    if (eventHandler) {
      eventHandler.call(this, event.data)
    }
    if (!isNew) {
      return
    }
    this._version = event.version
  }

  /** Returns the handler for `eventType`, or null when the type is not handled. */
  protected abstract getEventHandler(eventType: string): Function | null

  /** Clears the queue of uncommitted events. */
  async markChangesAsCommitted(): Promise<void> {
    this._uncommittedEvents = []
  }

  /** Returns a copy of the events raised since the last commit. */
  getUncommittedEvents(): Event[] {
    return this._uncommittedEvents.slice()
  }

  /** Replays stored events in order without queueing them as uncommitted. */
  async loadFromHistory(events: Event[]): Promise<void> {
    for (let position = 0; position < events.length; position++) {
      const past = events[position]
      this.applyChange(past, false)
      this._version = past.version
    }
  }
}
// Shopping Cart Example
/**
 * One product line held in a shopping cart.
 */
interface CartItem {
// Product identifier; cart lines are matched and removed by this value.
productId: string
// Display name of the product.
productName: string
// Number of units; adding the same product again increases this value.
quantity: number
// Price of a single unit; the cart total is the sum of quantity * unitPrice.
unitPrice: number
}
/**
 * Payload shape shared by all cart events.
 * Every field is optional because each event type fills a different subset:
 * 'ITEM_ADDED' sets the four product fields, 'ITEM_REMOVED' only productId,
 * and 'CART_CHECKED_OUT' only items.
 */
interface CartEventData {
productId?: string
productName?: string
quantity?: number
unitPrice?: number
// Copy of the cart lines at checkout time.
items?: CartItem[]
}
/**
 * Event-sourced shopping cart.
 *
 * Commands (`addItem`, `removeItem`, `checkout`) validate the request and raise
 * an event; the matching `handle*` method then mutates the state. Once the
 * cart is checked out it becomes inactive and rejects every further command.
 */
class ShoppingCart extends AggregateRoot {
  private items: CartItem[] = []
  private isActive: boolean = true

  constructor(id: string) {
    super(id)
  }

  /**
   * Adds `quantity` units of a product to the cart.
   * @throws Error when the cart has already been checked out.
   */
  addItem(productId: string, productName: string, quantity: number, unitPrice: number): void {
    this.assertActive('Cannot add items to inactive cart')
    this.raise('ITEM_ADDED', { productId, productName, quantity, unitPrice })
  }

  /**
   * Removes the whole line for `productId`.
   * @throws Error when the cart is inactive or the product is not in the cart.
   */
  removeItem(productId: string): void {
    // Same guard as addItem: a checked-out cart must not change any more.
    this.assertActive('Cannot remove items from inactive cart')
    if (!this.items.some(item => item.productId === productId)) {
      throw new Error('Item not found in cart')
    }
    this.raise('ITEM_REMOVED', { productId })
  }

  /**
   * Checks the cart out and deactivates it.
   * @throws Error when the cart is inactive (already checked out) or empty.
   */
  checkout(): void {
    // Without this guard a second checkout would raise a duplicate event.
    this.assertActive('Cannot checkout inactive cart')
    if (this.items.length === 0) {
      throw new Error('Cannot checkout empty cart')
    }
    // Copy each line so the event payload is decoupled from live state.
    this.raise('CART_CHECKED_OUT', { items: this.items.map(item => ({ ...item })) })
  }

  /** Maps an event type to the state-mutating handler, or null if unknown. */
  protected getEventHandler(eventType: string): Function | null {
    const handlers: Record<string, Function> = {
      'ITEM_ADDED': this.handleItemAdded.bind(this),
      'ITEM_REMOVED': this.handleItemRemoved.bind(this),
      'CART_CHECKED_OUT': this.handleCartCheckedOut.bind(this)
    }
    return handlers[eventType] || null
  }

  /** Throws `message` when the cart has been checked out. */
  private assertActive(message: string): void {
    if (!this.isActive) {
      throw new Error(message)
    }
  }

  /** Builds the next event for this cart and applies it. */
  private raise(type: string, data: CartEventData): void {
    const event: Event = {
      id: crypto.randomUUID(),
      aggregateId: this.id,
      type,
      data,
      timestamp: new Date(),
      version: this.version + 1
    }
    this.apply(event)
  }

  private handleItemAdded(data: CartEventData): void {
    const existingItem = this.items.find(item => item.productId === data.productId)
    if (existingItem) {
      // Same product again: merge into the existing line.
      existingItem.quantity += data.quantity!
    } else {
      this.items.push({
        productId: data.productId!,
        productName: data.productName!,
        quantity: data.quantity!,
        unitPrice: data.unitPrice!
      })
    }
  }

  private handleItemRemoved(data: CartEventData): void {
    this.items = this.items.filter(item => item.productId !== data.productId)
  }

  private handleCartCheckedOut(): void {
    this.isActive = false
  }

  /** Returns copies of the cart lines; mutating them does not affect the cart. */
  getItems(): CartItem[] {
    return this.items.map(item => ({ ...item }))
  }

  /** Sum of quantity * unitPrice over all lines. */
  getTotalPrice(): number {
    return this.items.reduce((total, item) => total + (item.quantity * item.unitPrice), 0)
  }

  /** False once the cart has been checked out. */
  isCartActive(): boolean {
    return this.isActive
  }
}
// Usage Example
/**
 * Walk-through of the event storage sample: fills a cart, persists the raised
 * events, then checks the cart out and persists again.
 */
async function example() {
  const store = new EventStore()
  const shoppingCart = new ShoppingCart('cart-123')

  // Persists whatever the cart has raised so far, then clears its queue.
  const commit = async (): Promise<void> => {
    await store.saveEvents(shoppingCart.id, shoppingCart.getUncommittedEvents())
    await shoppingCart.markChangesAsCommitted()
  }

  // Fill the cart
  shoppingCart.addItem('prod-1', 'Laptop', 1, 999.99)
  shoppingCart.addItem('prod-2', 'Mouse', 2, 29.99)
  await commit()

  console.log('Cart items:', shoppingCart.getItems())
  console.log('Total price:', shoppingCart.getTotalPrice())

  // Check out and persist the resulting event
  shoppingCart.checkout()
  await commit()

  console.log('Cart is active:', shoppingCart.isCartActive())
}
export { Event, EventStore, AggregateRoot, ShoppingCart, example }
💻 Architecture CQRS
🔴 complex
⭐⭐⭐⭐
Implémentation du pattern Command Query Responsibility Segregation (CQRS) avec des modèles de lecture et d'écriture séparés
⏱️ 45 min
🏷️ cqrs, command-query-separation, read-write-separation
Prerequisites:
Understanding of event sourcing, Domain-Driven Design knowledge
// CQRS Base Classes
/**
 * Base shape of every command sent through the CommandBus.
 */
interface Command {
// Identifier of this command instance.
id: string
// Identifier of the aggregate the command targets.
aggregateId: string
// Time the command was created.
timestamp: Date
}
/**
 * Base shape of every query sent through the QueryBus.
 */
interface Query {
// Identifier of this query instance.
id: string
// Named query arguments (e.g. { orderId } or { customerId }).
parameters: Record<string, any>
}
/**
 * Outcome returned by CommandBus.dispatch and by command handlers.
 */
interface CommandResult {
// True when the command was handled without error.
success: boolean
// Human-readable status or error text.
message?: string
// Optional handler-specific payload (e.g. { orderId } after creating an order).
data?: any
}
/**
 * Outcome returned by QueryBus.dispatch and by query handlers.
 * T is the type of the returned data.
 */
interface QueryResult<T = any> {
// True when the query was answered without error.
success: boolean
// Query result; present on success.
data?: T
// Error text; present on failure.
error?: string
}
// Command Handler Interface
/**
 * Contract for an object that executes one kind of command.
 */
interface ICommandHandler<T extends Command> {
// Executes the command and reports the outcome.
handle(command: T): Promise<CommandResult>
}
// Query Handler Interface
/**
 * Contract for an object that answers one kind of query.
 * R is the type of the data carried by the result.
 */
interface IQueryHandler<T extends Query, R> {
// Answers the query and reports the outcome.
handle(query: T): Promise<QueryResult<R>>
}
// Command Bus
/**
 * Routes commands to the handler registered under the command's class name.
 *
 * Dispatch never rejects: a missing handler or a handler failure is reported
 * as a `CommandResult` with `success: false`.
 */
class CommandBus {
  private handlers: Map<string, ICommandHandler<any>> = new Map()

  /**
   * Registers `handler` for commands whose constructor name is `commandType`.
   * Registering the same type again replaces the previous handler.
   */
  register<T extends Command>(commandType: string, handler: ICommandHandler<T>): void {
    this.handlers.set(commandType, handler)
  }

  /**
   * Looks up the handler by `command.constructor.name` and runs it.
   */
  async dispatch<T extends Command>(command: T): Promise<CommandResult> {
    const registered = this.handlers.get(command.constructor.name)
    if (registered) {
      try {
        return await registered.handle(command)
      } catch (failure) {
        // Non-Error throwables carry no usable message.
        const message = failure instanceof Error ? failure.message : 'Unknown error'
        return { success: false, message }
      }
    }
    return {
      success: false,
      message: `No handler found for command: ${command.constructor.name}`
    }
  }
}
// Query Bus
/**
 * Routes queries to the handler registered under the query's class name.
 *
 * Dispatch never rejects: a missing handler or a handler failure is reported
 * as a `QueryResult` with `success: false` and an `error` text.
 */
class QueryBus {
  private handlers: Map<string, IQueryHandler<any, any>> = new Map()

  /**
   * Registers `handler` for queries whose constructor name is `queryType`.
   * Registering the same type again replaces the previous handler.
   */
  register<T extends Query, R>(queryType: string, handler: IQueryHandler<T, R>): void {
    this.handlers.set(queryType, handler)
  }

  /**
   * Looks up the handler by `query.constructor.name` and runs it.
   */
  async dispatch<T extends Query, R>(query: T): Promise<QueryResult<R>> {
    const registered = this.handlers.get(query.constructor.name)
    if (registered) {
      try {
        return await registered.handle(query)
      } catch (failure) {
        // Non-Error throwables carry no usable message.
        const error = failure instanceof Error ? failure.message : 'Unknown error'
        return { success: false, error }
      }
    }
    return {
      success: false,
      error: `No handler found for query: ${query.constructor.name}`
    }
  }
}
// Domain Events
/**
 * Event raised by the write model (the Order aggregate) and handed to an
 * EventPublisher.
 */
interface DomainEvent {
// Unique event identifier.
id: string
// Identifier of the aggregate that raised the event.
aggregateId: string
// Event name (e.g. 'ORDER_CANCELLED').
type: string
// Event payload; its fields depend on the event type.
data: any
// Time the event was raised.
timestamp: Date
// Event version number — presumably the position in the aggregate's stream; confirm with the producer.
version: number
}
// Event Publisher
/**
 * Outbound port through which command handlers publish domain events.
 */
interface EventPublisher {
// Publishes the given events; the array may be empty.
publish(events: DomainEvent[]): Promise<void>
}
// Console Event Publisher (for demo)
/**
 * Demo publisher that writes the event count and one line per event to the
 * console instead of sending them anywhere.
 */
class ConsoleEventPublisher implements EventPublisher {
  async publish(events: DomainEvent[]): Promise<void> {
    console.log('Publishing events:', events.length)
    events.forEach(published => {
      console.log(`Event: ${published.type}, Aggregate: ${published.aggregateId}`)
    })
  }
}
// Order Management Commands
/**
 * Command asking for a new order to be created for a customer.
 */
class CreateOrderCommand implements Command {
// Random UUID identifying this command instance.
id: string
// Random UUID generated here; CreateOrderHandler uses it as the new order's id.
aggregateId: string
// Time the command object was constructed.
timestamp: Date
constructor(
public customerId: string,
public items: OrderItem[]
) {
this.id = crypto.randomUUID()
this.aggregateId = crypto.randomUUID()
this.timestamp = new Date()
}
}
/**
 * Command asking for the items of an existing order to be replaced.
 * NOTE(review): no handler for this command is registered in setupApplication.
 */
class UpdateOrderCommand implements Command {
// Random UUID identifying this command instance.
id: string
// Same value as orderId: the order being updated.
aggregateId: string
// Time the command object was constructed.
timestamp: Date
constructor(
public orderId: string,
public items: OrderItem[]
) {
this.id = crypto.randomUUID()
this.aggregateId = orderId
this.timestamp = new Date()
}
}
/**
 * Command asking for an existing order to be cancelled, with a reason.
 */
class CancelOrderCommand implements Command {
// Random UUID identifying this command instance.
id: string
// Same value as orderId: the order being cancelled.
aggregateId: string
// Time the command object was constructed.
timestamp: Date
constructor(
public orderId: string,
public reason: string
) {
this.id = crypto.randomUUID()
this.aggregateId = orderId
this.timestamp = new Date()
}
}
// Order Management Queries
/**
 * Query for the read-model summary of a single order.
 */
class GetOrderByIdQuery implements Query {
// Random UUID identifying this query instance.
id: string
// Mirrors the constructor argument as { orderId }.
parameters: Record<string, any>
constructor(public orderId: string) {
this.id = crypto.randomUUID()
this.parameters = { orderId }
}
}
/**
 * Query for the read-model summaries of all orders of one customer.
 */
class GetOrdersByCustomerQuery implements Query {
// Random UUID identifying this query instance.
id: string
// Mirrors the constructor argument as { customerId }.
parameters: Record<string, any>
constructor(public customerId: string) {
this.id = crypto.randomUUID()
this.parameters = { customerId }
}
}
/**
 * One product line of an order.
 */
interface OrderItem {
// Product identifier.
productId: string
// Display name of the product.
productName: string
// Number of units ordered.
quantity: number
// Price of a single unit; order totals are computed as quantity * unitPrice.
unitPrice: number
}
// Write Model - Order Aggregate
/**
 * Write-model aggregate for an order.
 *
 * Each state change records a DomainEvent that stays queued until
 * `markEventsAsCommitted` is called. Events are numbered sequentially per
 * order instance, starting at 1.
 */
class Order {
  private events: DomainEvent[] = []
  // Version of the last event recorded by this instance (0 = none yet).
  private version = 0

  constructor(
    public readonly id: string,
    public readonly customerId: string,
    public items: OrderItem[],
    public status: 'PENDING' | 'CONFIRMED' | 'CANCELLED' | 'COMPLETED' = 'PENDING',
    public createdAt: Date = new Date(),
    public updatedAt: Date = new Date()
  ) {}

  /**
   * Replaces the order's items.
   * @throws Error unless the order is still PENDING.
   */
  updateItems(newItems: OrderItem[]): void {
    if (this.status !== 'PENDING') {
      throw new Error('Cannot update order in current status')
    }
    this.record('ORDER_ITEMS_UPDATED', { oldItems: this.items, newItems })
    this.items = newItems
  }

  /**
   * Cancels the order.
   * @throws Error when the order is already COMPLETED.
   */
  cancel(reason: string): void {
    if (this.status === 'COMPLETED') {
      // The message previously read 'Cannot complete order', which described the wrong operation.
      throw new Error('Cannot cancel a completed order')
    }
    this.record('ORDER_CANCELLED', { reason })
    this.status = 'CANCELLED'
  }

  /**
   * Confirms the order.
   * @throws Error unless the order is PENDING.
   */
  confirm(): void {
    if (this.status !== 'PENDING') {
      throw new Error('Order must be in PENDING status to be confirmed')
    }
    this.record('ORDER_CONFIRMED', { confirmedAt: new Date() })
    this.status = 'CONFIRMED'
  }

  /** Returns a copy of the events recorded since the last commit. */
  getUncommittedEvents(): DomainEvent[] {
    return [...this.events]
  }

  /** Clears the queue of recorded events; the version counter keeps running. */
  markEventsAsCommitted(): void {
    this.events = []
  }

  /** Queues an event with the next version number and refreshes `updatedAt`. */
  private record(type: string, data: unknown): void {
    this.version += 1
    this.events.push({
      id: crypto.randomUUID(),
      aggregateId: this.id,
      type,
      data,
      timestamp: new Date(),
      version: this.version
    })
    this.updatedAt = new Date()
  }
}
// Read Model - Order Summary
/**
 * Read-model projection of an order, stored by OrderReadRepository.
 */
interface OrderSummary {
// Order identifier (same as the write-model Order.id).
id: string
// Customer who owns the order.
customerId: string
// Number of item lines (CreateOrderHandler sets it to items.length, not the sum of quantities).
itemCount: number
// Sum of quantity * unitPrice over the order's items.
totalAmount: number
// Order status copied from the write model.
status: string
createdAt: Date
updatedAt: Date
}
// Read Model Repository
/**
 * In-memory store for the read model, keyed by order id.
 */
class OrderReadRepository {
  private orders: Map<string, OrderSummary> = new Map()

  /** Inserts the summary, or overwrites the one with the same id. */
  async save(orderSummary: OrderSummary): Promise<void> {
    this.orders.set(orderSummary.id, orderSummary)
  }

  /** Returns the stored summary, or null when the id is unknown. */
  async findById(orderId: string): Promise<OrderSummary | null> {
    const found = this.orders.get(orderId)
    return found ? found : null
  }

  /** Returns every summary belonging to `customerId`, in insertion order. */
  async findByCustomerId(customerId: string): Promise<OrderSummary[]> {
    const matches: OrderSummary[] = []
    for (const summary of this.orders.values()) {
      if (summary.customerId === customerId) {
        matches.push(summary)
      }
    }
    return matches
  }
}
// Write Model Repository
/**
 * In-memory store for write-model Order aggregates, keyed by order id.
 */
class OrderWriteRepository {
  private orders: Map<string, Order> = new Map()

  /** Inserts the order, or overwrites the one with the same id. */
  async save(order: Order): Promise<void> {
    this.orders.set(order.id, order)
  }

  /** Returns the stored order, or null when the id is unknown. */
  async findById(orderId: string): Promise<Order | null> {
    const found = this.orders.get(orderId)
    return found ? found : null
  }
}
// Command Handlers
/**
 * Handles CreateOrderCommand: stores the new Order in the write repository,
 * projects it into an OrderSummary in the read repository, then publishes and
 * clears the order's uncommitted events.
 */
class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private orderRepository: OrderWriteRepository,
    private readRepository: OrderReadRepository,
    private eventPublisher: EventPublisher
  ) {}

  /**
   * @returns a successful result carrying `{ orderId }`.
   */
  async handle(command: CreateOrderCommand): Promise<CommandResult> {
    // Write side
    const created = new Order(command.aggregateId, command.customerId, command.items)
    await this.orderRepository.save(created)

    // Read side: project the aggregate into its summary
    let totalAmount = 0
    for (const line of created.items) {
      totalAmount = totalAmount + (line.quantity * line.unitPrice)
    }
    const projection: OrderSummary = {
      id: created.id,
      customerId: created.customerId,
      itemCount: created.items.length,
      totalAmount,
      status: created.status,
      createdAt: created.createdAt,
      updatedAt: created.updatedAt
    }
    await this.readRepository.save(projection)

    // Hand pending events to the publisher before clearing them
    const pending = created.getUncommittedEvents()
    await this.eventPublisher.publish(pending)
    created.markEventsAsCommitted()

    return {
      success: true,
      message: 'Order created successfully',
      data: { orderId: created.id }
    }
  }
}
/**
 * Handles CancelOrderCommand: cancels the stored Order, mirrors the new status
 * into the read model when a summary exists, then publishes and clears the
 * order's uncommitted events.
 */
class CancelOrderHandler implements ICommandHandler<CancelOrderCommand> {
  constructor(
    private orderRepository: OrderWriteRepository,
    private readRepository: OrderReadRepository,
    private eventPublisher: EventPublisher
  ) {}

  /**
   * @returns a failed result when the order is unknown or any step throws;
   * a successful result otherwise.
   */
  async handle(command: CancelOrderCommand): Promise<CommandResult> {
    const target = await this.orderRepository.findById(command.orderId)
    if (!target) {
      return { success: false, message: 'Order not found' }
    }

    try {
      // Write side
      target.cancel(command.reason)
      await this.orderRepository.save(target)

      // Read side: only touched when a projection already exists
      const projection = await this.readRepository.findById(command.orderId)
      if (projection) {
        projection.status = target.status
        projection.updatedAt = target.updatedAt
        await this.readRepository.save(projection)
      }

      // Hand pending events to the publisher before clearing them
      await this.eventPublisher.publish(target.getUncommittedEvents())
      target.markEventsAsCommitted()
    } catch (failure) {
      const message = failure instanceof Error ? failure.message : 'Failed to cancel order'
      return { success: false, message }
    }

    return { success: true, message: 'Order cancelled successfully' }
  }
}
// Query Handlers
/**
 * Answers GetOrderByIdQuery from the read repository.
 * An unknown id is a successful result whose data is null.
 */
class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery, OrderSummary | null> {
  constructor(private readRepository: OrderReadRepository) {}

  async handle(query: GetOrderByIdQuery): Promise<QueryResult<OrderSummary | null>> {
    let summary: OrderSummary | null
    try {
      summary = await this.readRepository.findById(query.orderId)
    } catch (failure) {
      const error = failure instanceof Error ? failure.message : 'Failed to get order'
      return { success: false, error }
    }
    return { success: true, data: summary }
  }
}
/**
 * Answers GetOrdersByCustomerQuery from the read repository.
 */
class GetOrdersByCustomerHandler implements IQueryHandler<GetOrdersByCustomerQuery, OrderSummary[]> {
  constructor(private readRepository: OrderReadRepository) {}

  async handle(query: GetOrdersByCustomerQuery): Promise<QueryResult<OrderSummary[]>> {
    let summaries: OrderSummary[]
    try {
      summaries = await this.readRepository.findByCustomerId(query.customerId)
    } catch (failure) {
      const error = failure instanceof Error ? failure.message : 'Failed to get orders'
      return { success: false, error }
    }
    return { success: true, data: summaries }
  }
}
// Application Setup and Usage
/**
 * Wires the CQRS sample together: one write repository, one read repository
 * and one console publisher shared by all handlers, plus a command bus and a
 * query bus with the handlers registered under their message class names.
 */
async function setupApplication() {
  // Shared infrastructure
  const writeRepo = new OrderWriteRepository()
  const readRepo = new OrderReadRepository()
  const publisher = new ConsoleEventPublisher()

  // Command side
  const commandBus = new CommandBus()
  commandBus.register('CreateOrderCommand', new CreateOrderHandler(writeRepo, readRepo, publisher))
  commandBus.register('CancelOrderCommand', new CancelOrderHandler(writeRepo, readRepo, publisher))

  // Query side
  const queryBus = new QueryBus()
  queryBus.register('GetOrderByIdQuery', new GetOrderByIdHandler(readRepo))
  queryBus.register('GetOrdersByCustomerQuery', new GetOrdersByCustomerHandler(readRepo))

  return { commandBus, queryBus }
}
// Usage Example
/**
 * Walk-through of the CQRS sample: creates an order, reads it back by id and
 * by customer, then cancels it. Nothing after creation runs when the create
 * command fails.
 */
async function cqrsExample() {
  const { commandBus, queryBus } = await setupApplication()

  // Create an order with two lines
  const laptop = { productId: 'prod-1', productName: 'Laptop', quantity: 1, unitPrice: 999.99 }
  const mouse = { productId: 'prod-2', productName: 'Mouse', quantity: 2, unitPrice: 29.99 }
  const createResult = await commandBus.dispatch(new CreateOrderCommand('customer-123', [laptop, mouse]))
  console.log('Create order result:', createResult)
  if (!createResult.success) {
    return
  }
  const orderId = createResult.data.orderId

  // Read it back by id
  const orderResult = await queryBus.dispatch(new GetOrderByIdQuery(orderId))
  console.log('Order details:', orderResult.data)

  // Read it back through the customer
  const customerOrdersResult = await queryBus.dispatch(new GetOrdersByCustomerQuery('customer-123'))
  console.log('Customer orders:', customerOrdersResult.data)

  // Cancel it
  const cancelResult = await commandBus.dispatch(
    new CancelOrderCommand(orderId, 'Customer requested cancellation')
  )
  console.log('Cancel order result:', cancelResult)
}
export {
Command,
Query,
CommandBus,
QueryBus,
CommandResult,
QueryResult,
CreateOrderCommand,
CancelOrderCommand,
GetOrderByIdQuery,
GetOrdersByCustomerQuery,
cqrsExample
}
💻 Reconstruction d'État
🔴 complex
⭐⭐⭐⭐
Reconstruction d'état depuis les flux d'événements avec optimisation de snapshots
⏱️ 60 min
🏷️ state-reconstruction, snapshot, performance-optimization
Prerequisites:
Understanding of event sourcing, Knowledge of stream processing
// State Reconstruction Engine
/**
 * A single stored event belonging to one aggregate's event stream.
 */
interface Event {
// Unique event identifier; EnhancedEventStore.saveEvents replaces it with a fresh UUID.
id: string
// Identifier of the aggregate whose stream this event belongs to.
aggregateId: string
// Event name the reconstructor switches on (e.g. 'ITEM_ADDED').
type: string
// Event payload; its fields depend on the event type.
data: any
// Creation time; EnhancedEventStore.saveEvents replaces it with the time of the save.
timestamp: Date
// Position in the aggregate's stream; EnhancedEventStore.saveEvents assigns it starting at 1.
version: number
}
/**
 * Saved state of an aggregate at a given version, used to avoid replaying the
 * whole event stream.
 */
interface Snapshot {
// Aggregate the snapshot belongs to.
aggregateId: string
// Serialized state; its shape is defined by the StateReconstructor that created it.
data: any
// Version of the last event included in the snapshot.
version: number
// Time the snapshot was created.
timestamp: Date
}
/**
 * Strategy that knows how to build a state of type T from events and how to
 * convert that state to and from a Snapshot.
 */
interface StateReconstructor<T> {
// Builds the state by replaying the given events from an initial state.
reconstruct(events: Event[]): T
// Wraps the state in a Snapshot tagged with the given version.
createSnapshot(state: T, version: number): Snapshot
// Restores the state from the snapshot, then replays the given events on top of it.
applySnapshot(snapshot: Snapshot, events: Event[]): T
}
// Snapshot Strategy
/**
 * Policy deciding when an aggregate should be snapshotted.
 */
interface SnapshotStrategy {
// True when a snapshot should be taken for the aggregate at this version.
shouldTakeSnapshot(aggregateId: string, version: number): boolean
// Number of versions between two snapshots.
getSnapshotInterval(): number
}
/**
 * Requests a snapshot whenever the version is an exact multiple of the
 * configured interval (100 by default). The aggregate id is not consulted.
 */
class IntervalSnapshotStrategy implements SnapshotStrategy {
  constructor(private interval: number = 100) {}

  shouldTakeSnapshot(aggregateId: string, version: number): boolean {
    const remainder = version % this.interval
    return remainder === 0
  }

  getSnapshotInterval(): number {
    return this.interval
  }
}
/**
 * Requests a snapshot whenever the version is an exact multiple of the event
 * count threshold (50 by default).
 *
 * Events since the last snapshot are not tracked, so for now the decision is
 * a plain modulo check on the version.
 */
class EventCountSnapshotStrategy implements SnapshotStrategy {
  constructor(private eventCountThreshold: number = 50) {}

  shouldTakeSnapshot(aggregateId: string, version: number): boolean {
    const remainder = version % this.eventCountThreshold
    return remainder === 0
  }

  getSnapshotInterval(): number {
    return this.eventCountThreshold
  }
}
// Snapshot Store
/**
 * In-memory snapshot store keeping, per aggregate, one snapshot per version.
 *
 * History is retained so `getSnapshotBeforeVersion` can serve point-in-time
 * reconstruction; use `deleteSnapshotsBeforeVersion` to prune old entries.
 */
class SnapshotStore {
  private snapshots: Map<string, Snapshot[]> = new Map()

  /**
   * Stores `snapshot`, replacing any existing snapshot of the same aggregate
   * at the same version.
   *
   * Snapshots at other versions are left untouched. The previous filter
   * (`s.version < snapshot.version`) silently discarded every newer snapshot
   * whenever an older one was saved.
   */
  async saveSnapshot(snapshot: Snapshot): Promise<void> {
    const aggregateSnapshots = this.snapshots.get(snapshot.aggregateId) ?? []
    const retained = aggregateSnapshots.filter(s => s.version !== snapshot.version)
    retained.push(snapshot)
    this.snapshots.set(snapshot.aggregateId, retained)
  }

  /** Returns the snapshot with the highest version, or null when there is none. */
  async getLatestSnapshot(aggregateId: string): Promise<Snapshot | null> {
    return this.highestVersion(this.snapshots.get(aggregateId) ?? [])
  }

  /**
   * Returns the most recent snapshot whose version is less than or equal to
   * `version` (the bound is inclusive), or null when there is none.
   */
  async getSnapshotBeforeVersion(aggregateId: string, version: number): Promise<Snapshot | null> {
    const aggregateSnapshots = this.snapshots.get(aggregateId) ?? []
    return this.highestVersion(aggregateSnapshots.filter(s => s.version <= version))
  }

  /** Drops every snapshot of the aggregate whose version is below `version`. */
  async deleteSnapshotsBeforeVersion(aggregateId: string, version: number): Promise<void> {
    const aggregateSnapshots = this.snapshots.get(aggregateId) ?? []
    const filteredSnapshots = aggregateSnapshots.filter(s => s.version >= version)
    this.snapshots.set(aggregateId, filteredSnapshots)
  }

  /** Picks the snapshot with the greatest version; null for an empty list. */
  private highestVersion(candidates: Snapshot[]): Snapshot | null {
    if (candidates.length === 0) {
      return null
    }
    return candidates.reduce((latest, current) =>
      current.version > latest.version ? current : latest
    )
  }
}
// Event Store with Snapshot Integration
/**
 * In-memory event store that consults a snapshot strategy on every save and
 * can return "latest snapshot + events after it" in one call.
 *
 * On save the store stamps every event with its own `id`, `timestamp` and a
 * sequential `version`; taking the snapshot itself is left to the caller (a
 * message is logged when the strategy asks for one).
 */
class EnhancedEventStore {
  constructor(
    private events: Map<string, Event[]> = new Map(),
    private snapshotStore: SnapshotStore = new SnapshotStore(),
    private snapshotStrategy: SnapshotStrategy = new IntervalSnapshotStrategy(100)
  ) {}

  /** Appends `events` to the aggregate's stream and checks the snapshot strategy. */
  async saveEvents(aggregateId: string, events: Event[]): Promise<void> {
    const stream = this.events.get(aggregateId) || []
    const baseVersion = stream.length
    const stamped = events.map((incoming, offset) => ({
      ...incoming,
      id: crypto.randomUUID(),
      timestamp: new Date(),
      version: baseVersion + offset + 1
    }))
    this.events.set(aggregateId, stream.concat(stamped))

    // Only signal the opportunity; state reconstruction happens elsewhere.
    const finalVersion = baseVersion + stamped.length
    const snapshotDue = this.snapshotStrategy.shouldTakeSnapshot(aggregateId, finalVersion)
    if (snapshotDue) {
      console.log(`Snapshot opportunity for aggregate ${aggregateId} at version ${finalVersion}`)
    }
  }

  /** Returns the stored stream for the aggregate (empty array if none). */
  async getEvents(aggregateId: string): Promise<Event[]> {
    return this.events.get(aggregateId) || []
  }

  /** Returns the events whose version is `>= fromVersion`. */
  async getEventsFromVersion(aggregateId: string, fromVersion: number): Promise<Event[]> {
    const stream = this.events.get(aggregateId) || []
    return stream.filter(stored => stored.version >= fromVersion)
  }

  /**
   * Returns the latest snapshot (or null) together with the events that come
   * after it; without a snapshot, all events are returned.
   */
  async getEventsWithSnapshot(aggregateId: string): Promise<{ events: Event[], snapshot: Snapshot | null }> {
    const snapshot = await this.snapshotStore.getLatestSnapshot(aggregateId)
    const events = snapshot
      ? await this.getEventsFromVersion(aggregateId, snapshot.version + 1)
      : await this.getEvents(aggregateId)
    return { events, snapshot }
  }
}
// Shopping Cart State
/**
 * Reconstructed state of a shopping cart.
 */
interface CartState {
// Cart identifier; set from the aggregateId of the 'CART_CREATED' event ('' before that).
id: string
// Current cart lines.
items: CartItem[]
// False once the cart has been checked out or abandoned.
isActive: boolean
// Sum of quantity * unitPrice over the items.
totalAmount: number
// Version of the last event applied to this state.
version: number
// Timestamp of the last event applied to this state.
lastUpdated: Date
}
/**
 * One product line held in a reconstructed cart state.
 */
interface CartItem {
// Product identifier; cart lines are matched and removed by this value.
productId: string
// Display name of the product.
productName: string
// Number of units; adding the same product again increases this value.
quantity: number
// Price of a single unit.
unitPrice: number
}
// Shopping Cart State Reconstructor
/**
 * Rebuilds a CartState by folding cart events over an initial state, either
 * from scratch or starting from a snapshot.
 *
 * Every event, whatever its type, moves `version` and `lastUpdated` to the
 * event's values; state objects are never mutated, each step returns a new one.
 */
class ShoppingCartReconstructor implements StateReconstructor<CartState> {
  /** Replays all events from an empty, active cart. */
  reconstruct(events: Event[]): CartState {
    const blank: CartState = {
      id: '',
      items: [],
      isActive: true,
      totalAmount: 0,
      version: 0,
      lastUpdated: new Date()
    }
    return this.replay(blank, events)
  }

  /** Packs the state (without its version field) into a snapshot at `version`. */
  createSnapshot(state: CartState, version: number): Snapshot {
    return {
      aggregateId: state.id,
      data: {
        id: state.id,
        items: state.items,
        isActive: state.isActive,
        totalAmount: state.totalAmount,
        lastUpdated: state.lastUpdated
      },
      version,
      timestamp: new Date()
    }
  }

  /** Restores the snapshot's state, then replays the later events on top. */
  applySnapshot(snapshot: Snapshot, events: Event[]): CartState {
    const restored: CartState = {
      ...snapshot.data,
      version: snapshot.version
    }
    return this.replay(restored, events)
  }

  /** Folds the events over `seed`, in order. */
  private replay(seed: CartState, events: Event[]): CartState {
    let current = seed
    for (const event of events) {
      current = this.applyEvent(current, event)
    }
    return current
  }

  /** Produces the state that results from applying one event. */
  private applyEvent(state: CartState, event: Event): CartState {
    const advanced = { ...state, version: event.version, lastUpdated: event.timestamp }

    if (event.type === 'CART_CREATED') {
      return {
        ...advanced,
        id: event.aggregateId,
        items: [],
        isActive: true,
        totalAmount: 0
      }
    }

    if (event.type === 'ITEM_ADDED') {
      const withAddition = this.addLine(advanced.items, event)
      return {
        ...advanced,
        items: withAddition,
        totalAmount: this.calculateTotalAmount(withAddition)
      }
    }

    if (event.type === 'ITEM_REMOVED') {
      const remaining = advanced.items.filter(line => line.productId !== event.data.productId)
      return {
        ...advanced,
        items: remaining,
        totalAmount: this.calculateTotalAmount(remaining)
      }
    }

    // Checkout and abandonment both just close the cart.
    if (event.type === 'CART_CHECKED_OUT' || event.type === 'CART_ABANDONED') {
      return {
        ...advanced,
        isActive: false
      }
    }

    // Unknown event types still advance version and lastUpdated.
    return advanced
  }

  /** Merges an ITEM_ADDED event into the lines: bump quantity or append a new line. */
  private addLine(lines: CartItem[], event: Event): CartItem[] {
    const alreadyPresent = lines.some(line => line.productId === event.data.productId)
    if (alreadyPresent) {
      return lines.map(line =>
        line.productId === event.data.productId
          ? { ...line, quantity: line.quantity + event.data.quantity }
          : line
      )
    }
    return [
      ...lines,
      {
        productId: event.data.productId,
        productName: event.data.productName,
        quantity: event.data.quantity,
        unitPrice: event.data.unitPrice
      }
    ]
  }

  private calculateTotalAmount(items: CartItem[]): number {
    let total = 0
    for (const line of items) {
      total = total + (line.quantity * line.unitPrice)
    }
    return total
  }
}
// State Reconstruction Service
/**
 * Rebuilds aggregate state from an EnhancedEventStore, using the latest
 * suitable snapshot when one exists, and creates new snapshots on demand.
 */
class StateReconstructionService {
  constructor(
    private eventStore: EnhancedEventStore,
    private snapshotStore: SnapshotStore,
    private snapshotStrategy: SnapshotStrategy
  ) {}

  /**
   * Rebuilds the current state of `aggregateId`: latest snapshot plus the
   * events after it, or a full replay when no snapshot exists.
   */
  async reconstructState<T>(
    aggregateId: string,
    reconstructor: StateReconstructor<T>
  ): Promise<T> {
    const { events, snapshot } = await this.eventStore.getEventsWithSnapshot(aggregateId)
    if (snapshot) {
      console.log(`Found snapshot for ${aggregateId} at version ${snapshot.version}`)
      return reconstructor.applySnapshot(snapshot, events)
    }
    console.log(`No snapshot found for ${aggregateId}, reconstructing from all events`)
    return reconstructor.reconstruct(events)
  }

  /**
   * Rebuilds the state as it was at `targetVersion` (inclusive), starting from
   * the newest snapshot at or below that version when there is one.
   */
  async reconstructStateAtVersion<T>(
    aggregateId: string,
    targetVersion: number,
    reconstructor: StateReconstructor<T>
  ): Promise<T> {
    const upToTarget = (event: Event): boolean => event.version <= targetVersion
    const snapshot = await this.snapshotStore.getSnapshotBeforeVersion(aggregateId, targetVersion)
    if (snapshot) {
      const events = await this.eventStore.getEventsFromVersion(aggregateId, snapshot.version + 1)
      return reconstructor.applySnapshot(snapshot, events.filter(upToTarget))
    }
    const allEvents = await this.eventStore.getEvents(aggregateId)
    return reconstructor.reconstruct(allEvents.filter(upToTarget))
  }

  /**
   * Replays the full stream and stores a snapshot at the last event's version.
   * @throws Error when the aggregate has no events.
   */
  async createSnapshot<T>(
    aggregateId: string,
    reconstructor: StateReconstructor<T>
  ): Promise<void> {
    const events = await this.eventStore.getEvents(aggregateId)
    if (events.length === 0) {
      throw new Error('No events found for aggregate')
    }
    const state = reconstructor.reconstruct(events)
    const latestEvent = events[events.length - 1]
    const snapshot = reconstructor.createSnapshot(state, latestEvent.version)
    await this.snapshotStore.saveSnapshot(snapshot)
    console.log(`Created snapshot for ${aggregateId} at version ${latestEvent.version}`)
  }

  /**
   * Reports how much replay work a reconstruction of `aggregateId` involves.
   *
   * `totalEvents` is the length of the full stored stream. It used to be
   * "events after the snapshot + 1", which under-reported the stream as soon
   * as a snapshot existed.
   */
  async getReconstructionMetrics(aggregateId: string): Promise<{
    totalEvents: number
    hasSnapshot: boolean
    snapshotVersion?: number
    eventsSinceSnapshot: number
  }> {
    const { events, snapshot } = await this.eventStore.getEventsWithSnapshot(aggregateId)
    const allEvents = await this.eventStore.getEvents(aggregateId)
    return {
      totalEvents: allEvents.length,
      hasSnapshot: !!snapshot,
      snapshotVersion: snapshot?.version,
      eventsSinceSnapshot: events.length
    }
  }
}
// Performance Comparison Utility
/**
 * Times a full replay against a snapshot-assisted reconstruction of the same
 * aggregate.
 */
class ReconstructionPerformanceTester {
  /**
   * @returns both durations in milliseconds (from `performance.now()`) and
   * their ratio, full time divided by snapshot time.
   */
  async compareReconstructionMethods(
    aggregateId: string,
    reconstructor: StateReconstructor<any>,
    eventStore: EnhancedEventStore,
    reconstructionService: StateReconstructionService
  ): Promise<{
    fullReconstructionTime: number
    snapshotReconstructionTime: number
    performanceGain: number
  }> {
    // Full replay: load every event and fold them all
    const fullBegin = performance.now()
    const everyEvent = await eventStore.getEvents(aggregateId)
    await reconstructor.reconstruct(everyEvent)
    const fullReconstructionTime = performance.now() - fullBegin

    // Snapshot path: let the service pick snapshot + tail events
    const snapshotBegin = performance.now()
    await reconstructionService.reconstructState(aggregateId, reconstructor)
    const snapshotReconstructionTime = performance.now() - snapshotBegin

    return {
      fullReconstructionTime,
      snapshotReconstructionTime,
      performanceGain: fullReconstructionTime / snapshotReconstructionTime
    }
  }
}
// Usage Example
/**
 * Walk-through of the state reconstruction sample: stores a cart creation
 * plus 25 item additions, snapshots the cart, stores 10 more additions, then
 * prints metrics, the reconstructed state and a timing comparison.
 */
async function stateReconstructionExample() {
  const snapshotStore = new SnapshotStore()
  const snapshotStrategy = new IntervalSnapshotStrategy(10) // Snapshot every 10 events
  const eventStore = new EnhancedEventStore(new Map(), snapshotStore, snapshotStrategy)
  const reconstructor = new ShoppingCartReconstructor()
  const reconstructionService = new StateReconstructionService(eventStore, snapshotStore, snapshotStrategy)
  const aggregateId = 'cart-123'

  // Builds the ITEM_ADDED event for the i-th demo product.
  const itemAdded = (i: number): Event => ({
    id: crypto.randomUUID(),
    aggregateId,
    type: 'ITEM_ADDED',
    data: {
      productId: `prod-${i}`,
      productName: `Product ${i}`,
      quantity: 1,
      unitPrice: 10 * i
    },
    timestamp: new Date(),
    version: i + 1
  })

  // First batch: cart creation followed by products 1..25
  const firstBatch: Event[] = [
    {
      id: crypto.randomUUID(),
      aggregateId,
      type: 'CART_CREATED',
      data: {},
      timestamp: new Date(),
      version: 1
    }
  ]
  for (let i = 1; i <= 25; i++) {
    firstBatch.push(itemAdded(i))
  }
  await eventStore.saveEvents(aggregateId, firstBatch)

  // Snapshot the cart as it stands now
  await reconstructionService.createSnapshot(aggregateId, reconstructor)

  // Second batch, stored after the snapshot: products 26..35
  const secondBatch: Event[] = []
  for (let i = 26; i <= 35; i++) {
    secondBatch.push(itemAdded(i))
  }
  await eventStore.saveEvents(aggregateId, secondBatch)

  // Metrics
  const metrics = await reconstructionService.getReconstructionMetrics(aggregateId)
  console.log('Reconstruction metrics:', metrics)

  // Snapshot-assisted reconstruction
  const cartState = await reconstructionService.reconstructState(aggregateId, reconstructor)
  console.log('Reconstructed cart state:', {
    id: cartState.id,
    itemCount: cartState.items.length,
    totalAmount: cartState.totalAmount,
    isActive: cartState.isActive,
    version: cartState.version
  })

  // Timing comparison between full replay and snapshot path
  const comparison = await new ReconstructionPerformanceTester().compareReconstructionMethods(
    aggregateId,
    reconstructor,
    eventStore,
    reconstructionService
  )
  console.log('Performance comparison:', {
    fullReconstruction: `${comparison.fullReconstructionTime.toFixed(2)}ms`,
    snapshotReconstruction: `${comparison.snapshotReconstructionTime.toFixed(2)}ms`,
    performanceGain: `${comparison.performanceGain.toFixed(2)}x faster`
  })
}
export {
Event,
Snapshot,
StateReconstructor,
SnapshotStrategy,
IntervalSnapshotStrategy,
SnapshotStore,
EnhancedEventStore,
CartState,
ShoppingCartReconstructor,
StateReconstructionService,
stateReconstructionExample
}
💻 Versionnement d'Événements
🔴 complex
⭐⭐⭐⭐⭐
Gestion de l'évolution de schéma et des stratégies de versionnement pour les systèmes d'event sourcing
⏱️ 75 min
🏷️ event-versioning, schema-evolution, data-migration
Prerequisites:
Understanding of event sourcing, Schema management knowledge
// Event Versioning System
/**
 * Schema registered for one version of one event type.
 */
interface EventSchema {
// Event type the schema applies to.
type: string
// Schema version for that event type.
version: number
// Schema definition; its format is not constrained here.
schema: any
}
/**
 * Migration step that converts event data from one schema version to another.
 */
interface EventTransformer {
// Schema version this step accepts as input.
fromVersion: number
// Schema version this step produces.
toVersion: number
// Converts the input and returns the converted value (EventUpgrader passes and stores the event's data).
transform(event: any): any
}
/**
 * Stored event that also records which schema version its data follows.
 */
interface VersionedEvent {
// Unique event identifier.
id: string
// Identifier of the aggregate the event belongs to.
aggregateId: string
// Event type; used to look up schemas and transformers.
type: string
// Event payload, shaped according to eventVersion.
data: any
// Time the event was created.
timestamp: Date
// Presumably the event's position in the aggregate's stream — not used in the visible code; confirm.
version: number
// Schema version of `data`; compared against the registry's target version when upgrading.
eventVersion: number
}
// Schema Registry
/**
 * Registry of event schemas and of the transformers that migrate event data
 * between schema versions, both grouped by event type.
 */
class SchemaRegistry {
  private schemas: Map<string, EventSchema[]> = new Map()
  private transformers: Map<string, EventTransformer[]> = new Map()

  /** Adds a schema for `eventType` at `version`. */
  registerSchema(eventType: string, version: number, schema: any): void {
    const schemas = this.schemas.get(eventType) ?? []
    schemas.push({ type: eventType, version, schema })
    this.schemas.set(eventType, schemas)
  }

  /** Adds a migration step for `eventType`. */
  registerTransformer(eventType: string, transformer: EventTransformer): void {
    const transformers = this.transformers.get(eventType) ?? []
    transformers.push(transformer)
    this.transformers.set(eventType, transformers)
  }

  /**
   * Returns the highest registered schema version for `eventType`.
   * @throws Error when no schema is registered for the type.
   */
  getLatestVersion(eventType: string): number {
    const schemas = this.schemas.get(eventType) ?? []
    if (schemas.length === 0) {
      throw new Error(`No schema found for event type: ${eventType}`)
    }
    // reduce instead of Math.max(...spread): no argument-count limit on long lists.
    return schemas.reduce((highest, s) => (s.version > highest ? s.version : highest), schemas[0].version)
  }

  /** Returns the schema registered at exactly `version`, or null. */
  getSchema(eventType: string, version: number): EventSchema | null {
    const schemas = this.schemas.get(eventType) ?? []
    return schemas.find(s => s.version === version) ?? null
  }

  /** Returns the transformers registered for `eventType` (empty array if none). */
  getTransformers(eventType: string): EventTransformer[] {
    return this.transformers.get(eventType) ?? []
  }

  /**
   * Chains transformers starting at `fromVersion` until `toVersion` is reached
   * or passed. Returns an empty path when `fromVersion >= toVersion`.
   *
   * @throws Error when no transformer starts at the current version, or when
   * a transformer does not move to a higher version (which would otherwise
   * make this loop run forever).
   */
  getTransformationPath(eventType: string, fromVersion: number, toVersion: number): EventTransformer[] {
    const transformers = this.getTransformers(eventType)
    const path: EventTransformer[] = []
    let currentVersion = fromVersion
    while (currentVersion < toVersion) {
      const nextTransformer = transformers.find(t => t.fromVersion === currentVersion)
      if (!nextTransformer) {
        throw new Error(`No transformer found from version ${currentVersion} for event type ${eventType}`)
      }
      // Negated comparison so a NaN toVersion is rejected as well.
      if (!(nextTransformer.toVersion > currentVersion)) {
        throw new Error(
          `Transformer for event type ${eventType} does not advance past version ${currentVersion} (toVersion: ${nextTransformer.toVersion})`
        )
      }
      path.push(nextTransformer)
      currentVersion = nextTransformer.toVersion
    }
    return path
  }
}
// Event Upgrader
/**
 * Brings events up to a newer schema version by running their payloads
 * through the transformer chain supplied by the schema registry.
 */
class EventUpgrader {
  constructor(private schemaRegistry: SchemaRegistry) {}

  /**
   * Upgrades one event to `targetVersion`, or to the latest registered
   * version of its type when no (truthy) target is given.
   *
   * @returns The same event object when it is already at or above the goal
   *   version; otherwise a shallow copy with transformed `data` and updated
   *   `eventVersion`. The input event is not modified.
   * @throws Whatever the registry or a transformer throws.
   */
  async upgradeEvent(event: VersionedEvent, targetVersion?: number): Promise<VersionedEvent> {
    const goalVersion = targetVersion
      ? targetVersion
      : this.schemaRegistry.getLatestVersion(event.type)

    // Already current (or newer): hand the event back untouched
    if (event.eventVersion >= goalVersion) {
      return event
    }

    const steps = this.schemaRegistry.getTransformationPath(
      event.type,
      event.eventVersion,
      goalVersion
    )

    // Fold the payload through each step, tracking the version reached so far
    return steps.reduce<VersionedEvent>(
      (current, step) => ({
        ...current,
        data: step.transform(current.data),
        eventVersion: step.toVersion
      }),
      { ...event }
    )
  }

  /**
   * Upgrades each event to the latest version of its type, one after another.
   * An event whose upgrade throws is logged and passed through unchanged, so
   * the result always has one entry per input event, in the same order.
   */
  async upgradeEvents(events: VersionedEvent[]): Promise<VersionedEvent[]> {
    const results: VersionedEvent[] = []

    for (const original of events) {
      let outcome = original
      try {
        outcome = await this.upgradeEvent(original)
      } catch (error) {
        console.error(`Failed to upgrade event ${original.id}:`, error)
        // outcome stays as the original event
      }
      results.push(outcome)
    }

    return results
  }
}
// Event Validation
/**
 * Validates event payloads against the schema registered for their type and
 * schema version.
 *
 * This is a deliberately small subset of JSON Schema (in production, use a
 * proper JSON schema validator). It understands `type` (including `integer`,
 * `array`, `null` and arrays of type names), `required`, numeric
 * `minimum`/`maximum`, and nested object `properties`.
 */
class EventValidator {
  constructor(private schemaRegistry: SchemaRegistry) {}

  /**
   * Validates `event.data` against the schema for `event.type` at
   * `event.eventVersion`.
   *
   * @returns `isValid: true` with no errors, or `isValid: false` with one
   *   message per problem. An unregistered schema is reported as an error
   *   rather than thrown.
   */
  validateEvent(event: VersionedEvent): { isValid: boolean; errors: string[] } {
    const schema = this.schemaRegistry.getSchema(event.type, event.eventVersion)
    if (!schema) {
      return {
        isValid: false,
        errors: [`No schema found for event type ${event.type} version ${event.eventVersion}`]
      }
    }
    return this.validateAgainstSchema(event.data, schema.schema)
  }

  /** Validates a whole payload; only object schemas with `properties` constrain it. */
  private validateAgainstSchema(data: any, schema: any): { isValid: boolean; errors: string[] } {
    const errors: string[] = []

    if (schema?.type === 'object' && schema.properties) {
      if (this.matchesType(data, 'object')) {
        errors.push(...this.validateObject(data, schema))
      } else {
        // Report null/primitive payloads instead of crashing on `key in data`
        errors.push(`Expected event data to be an object, got ${this.describeType(data)}`)
      }
    }

    return {
      isValid: errors.length === 0,
      errors
    }
  }

  /** Checks declared properties and required keys of one object value. */
  private validateObject(data: any, schema: any): string[] {
    const errors: string[] = []
    const properties: Record<string, any> = schema.properties || {}
    const required: string[] = Array.isArray(schema.required) ? schema.required : []

    for (const [key, propSchema] of Object.entries(properties)) {
      if (key in data) {
        const propErrors = this.validateProperty(data[key], propSchema)
        errors.push(...propErrors.map(e => `${key}: ${e}`))
      } else if (required.includes(key)) {
        errors.push(`Missing required property: ${key}`)
      }
    }

    // Required keys that have no entry under `properties` must still be present
    for (const key of required) {
      const declared = Object.prototype.hasOwnProperty.call(properties, key)
      if (!declared && !(key in data)) {
        errors.push(`Missing required property: ${key}`)
      }
    }

    return errors
  }

  /** Validates a single value against its property schema. */
  private validateProperty(value: any, schema: any): string[] {
    const errors: string[] = []
    if (!schema) {
      return errors
    }

    if (schema.type) {
      const allowed: string[] = Array.isArray(schema.type) ? schema.type : [schema.type]
      if (!allowed.some(t => this.matchesType(value, t))) {
        errors.push(`Expected type ${schema.type}, got ${this.describeType(value)}`)
        // Range and nested checks are meaningless on a value of the wrong type
        return errors
      }
    }

    if (typeof value === 'number') {
      if (typeof schema.minimum === 'number' && value < schema.minimum) {
        errors.push(`Value ${value} is less than minimum ${schema.minimum}`)
      }
      if (typeof schema.maximum === 'number' && value > schema.maximum) {
        errors.push(`Value ${value} is greater than maximum ${schema.maximum}`)
      }
    }

    if (schema.properties && this.matchesType(value, 'object')) {
      errors.push(...this.validateObject(value, schema))
    }

    return errors
  }

  /**
   * Tests a value against a JSON-Schema type name. `typeof` alone cannot
   * express `integer`, `array` or `null`, and reports null and arrays as
   * `object`, so those cases are handled explicitly.
   */
  private matchesType(value: any, type: string): boolean {
    switch (type) {
      case 'integer':
        return typeof value === 'number' && Number.isInteger(value)
      case 'array':
        return Array.isArray(value)
      case 'null':
        return value === null
      case 'object':
        return typeof value === 'object' && value !== null && !Array.isArray(value)
      default:
        return typeof value === type
    }
  }

  /** Names a value's type for error messages, telling null and arrays apart. */
  private describeType(value: any): string {
    if (value === null) return 'null'
    if (Array.isArray(value)) return 'array'
    return typeof value
  }
}
// Example Event Schemas
// V1: ItemAdded event with simple structure
/**
 * Schema for version 1 of the ItemAdded payload: product id, quantity and
 * unit price, all mandatory.
 */
const ItemAddedV1Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' }
  },
  required: [
    'productId',
    'quantity',
    'unitPrice'
  ]
}
// V2: ItemAdded event with additional productName field and optional discount
/**
 * Schema for version 2 of the ItemAdded payload. Compared with version 1 it
 * adds a mandatory `productName` and an optional `discount` declared with
 * bounds 0 to 1.
 */
const ItemAddedV2Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    productName: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' },
    discount: {
      type: 'number',
      minimum: 0,
      maximum: 1
    }
  },
  required: [
    'productId',
    'productName',
    'quantity',
    'unitPrice'
  ]
}
// V3: ItemAdded event with category and metadata
/**
 * Schema for version 3 of the ItemAdded payload. Compared with version 2 it
 * adds a mandatory `category` and an optional free-form `metadata` object.
 */
const ItemAddedV3Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    productName: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' },
    discount: {
      type: 'number',
      minimum: 0,
      maximum: 1
    },
    category: { type: 'string' },
    metadata: { type: 'object' }
  },
  required: [
    'productId',
    'productName',
    'quantity',
    'unitPrice',
    'category'
  ]
}
// Event Transformers
// V1 to V2: Add productName based on productId lookup
/**
 * Upgrades an ItemAdded payload from schema version 1 to version 2 by filling
 * in `productName` and defaulting `discount`.
 */
class ItemAddedV1ToV2Transformer implements EventTransformer {
  fromVersion = 1
  toVersion = 2

  // Stand-in for a product catalog lookup (a real implementation would query one)
  private productNames: Record<string, string> = {
    'prod-1': 'Laptop',
    'prod-2': 'Mouse',
    'prod-3': 'Keyboard'
  }

  /**
   * @param event The version 1 payload.
   * @returns A copy of the payload with `productName` (or 'Unknown Product'
   *   when the id is not in the table) and `discount` (0 when absent or falsy).
   */
  transform(event: any): any {
    const catalogName = this.productNames[event.productId]
    const productName = catalogName || 'Unknown Product'
    const discount = event.discount || 0

    return { ...event, productName, discount }
  }
}
// V2 to V3: Add category and metadata
/**
 * Upgrades an ItemAdded payload from schema version 2 to version 3 by
 * deriving a `category` and attaching migration `metadata`.
 */
class ItemAddedV2ToV3Transformer implements EventTransformer {
  fromVersion = 2
  toVersion = 3

  /**
   * Picks a category from substrings of the product id. Rules are tried in
   * order and the first match wins; ids matching nothing are 'General'.
   */
  private getProductCategory(productId: string): string {
    const rules: Array<{ keywords: string[]; category: string }> = [
      { keywords: ['laptop'], category: 'Electronics' },
      { keywords: ['mouse', 'keyboard'], category: 'Accessories' }
    ]

    for (const rule of rules) {
      if (rule.keywords.some(keyword => productId.includes(keyword))) {
        return rule.category
      }
    }
    return 'General'
  }

  /**
   * @param event The version 2 payload.
   * @returns A copy of the payload with `category` set and `metadata`
   *   replaced by a record of the migration (source and current time).
   */
  transform(event: any): any {
    const category = this.getProductCategory(event.productId)
    const metadata = {
      source: 'legacy-system',
      migratedAt: new Date().toISOString()
    }

    return { ...event, category, metadata }
  }
}
// V3 to V4: Remove discount, add pricing breakdown
/**
 * Upgrades an ItemAdded payload from schema version 3 to version 4. The
 * `discount` field is dropped from the top level and replaced by computed
 * `finalPrice` / `discountAmount` plus a pricing breakdown under `metadata`.
 */
class ItemAddedV3ToV4Transformer implements EventTransformer {
  fromVersion = 3
  toVersion = 4

  /**
   * @param event The version 3 payload.
   * @returns A new version 4 payload. `discountAmount` is
   *   unitPrice * quantity * discount; `finalPrice` is the unit price minus
   *   the per-unit discount. A missing or falsy discount counts as 0.
   */
  transform(event: any): any {
    const discountRate = event.discount || 0
    const discountAmount = event.unitPrice * event.quantity * discountRate
    const finalPrice = event.unitPrice - (event.unitPrice * discountRate)

    const pricing = {
      originalUnitPrice: event.unitPrice,
      discountPercentage: discountRate,
      discountAmount,
      finalUnitPrice: finalPrice
    }

    // Only the listed fields carry over; anything else on the V3 payload
    // (including `discount`) is left behind.
    return {
      productId: event.productId,
      productName: event.productName,
      quantity: event.quantity,
      unitPrice: event.unitPrice,
      finalPrice,
      discountAmount,
      category: event.category,
      metadata: { ...event.metadata, pricing }
    }
  }
}
/**
 * Schema for version 4 of the ItemAdded payload. `discount` is gone; the
 * payload carries `finalPrice` and `discountAmount` instead, and `metadata`
 * gains a typed shape including a `pricing` breakdown. `discountAmount` and
 * `metadata` are optional.
 */
const ItemAddedV4Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    productName: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' },
    finalPrice: { type: 'number' },
    discountAmount: { type: 'number' },
    category: { type: 'string' },
    metadata: {
      type: 'object',
      properties: {
        source: { type: 'string' },
        migratedAt: { type: 'string' },
        pricing: {
          type: 'object',
          properties: {
            originalUnitPrice: { type: 'number' },
            discountPercentage: { type: 'number' },
            discountAmount: { type: 'number' },
            finalUnitPrice: { type: 'number' }
          }
        }
      }
    }
  },
  required: [
    'productId',
    'productName',
    'quantity',
    'unitPrice',
    'finalPrice',
    'category'
  ]
}
// Versioned Event Store
/**
 * In-memory event store that validates events on write and can serve or
 * migrate them at newer schema versions.
 */
class VersionedEventStore {
  private events: Map<string, VersionedEvent[]> = new Map()

  constructor(
    private schemaRegistry: SchemaRegistry,
    private eventUpgrader: EventUpgrader,
    private eventValidator: EventValidator
  ) {}

  /**
   * Appends an event to its aggregate's stream after validating it against
   * the schema for its type and `eventVersion`.
   *
   * @throws Error listing the validation messages when the event is invalid;
   *   nothing is stored in that case.
   */
  async saveEvent(event: VersionedEvent): Promise<void> {
    const validation = this.eventValidator.validateEvent(event)
    if (!validation.isValid) {
      throw new Error(`Event validation failed: ${validation.errors.join(', ')}`)
    }

    const aggregateEvents = this.events.get(event.aggregateId) || []
    aggregateEvents.push(event)
    this.events.set(event.aggregateId, aggregateEvents)
  }

  /**
   * Returns an aggregate's events in stored order.
   *
   * @param aggregateId Aggregate whose stream to read.
   * @param targetVersion When given (and non-zero), every event below this
   *   schema version is upgraded to it on the fly; stored events are not
   *   changed. The same target applies to all event types in the stream.
   * @returns A fresh array; mutating it does not affect the store.
   * @throws Whatever the upgrader throws, e.g. when no transformation path exists.
   */
  async getEvents(aggregateId: string, targetVersion?: number): Promise<VersionedEvent[]> {
    const events = this.events.get(aggregateId) || []
    if (!targetVersion) {
      // Copy so callers cannot mutate the stream, and later saves do not
      // silently grow an array a caller is still holding.
      return [...events]
    }

    const upgradedEvents: VersionedEvent[] = []
    for (const event of events) {
      const upgraded = await this.eventUpgrader.upgradeEvent(event, targetVersion)
      upgradedEvents.push(upgraded)
    }
    return upgradedEvents
  }

  /**
   * Returns an aggregate's events, each upgraded to the latest schema version
   * registered for its own event type. Stored events are not changed.
   *
   * @throws Error when an event's type has no registered schema, or when the
   *   upgrader cannot build a transformation path.
   */
  async getLatestEvents(aggregateId: string): Promise<VersionedEvent[]> {
    const events = this.events.get(aggregateId) || []

    const upgradedEvents: VersionedEvent[] = []
    for (const event of events) {
      // Resolve the target per event type: different types evolve
      // independently, so one shared maximum would ask some types for a
      // version they do not have.
      const latestVersion = this.schemaRegistry.getLatestVersion(event.type)
      const upgraded = await this.eventUpgrader.upgradeEvent(event, latestVersion)
      upgradedEvents.push(upgraded)
    }
    return upgradedEvents
  }

  /**
   * Upgrades an aggregate's stored events in place to the latest schema
   * version of their types.
   *
   * An upgraded event that fails validation is reported in `errors` and the
   * stored event is left as it was, so the store never holds an event that
   * `saveEvent` would have rejected.
   *
   * @returns `totalEvents` in the stream, `upgradedEvents` whose stored
   *   schema version actually changed, and one `errors` entry per event
   *   that failed validation.
   */
  async migrateEvents(aggregateId: string): Promise<{
    totalEvents: number
    upgradedEvents: number
    errors: string[]
  }> {
    const events = this.events.get(aggregateId) || []
    const errors: string[] = []
    let upgradedCount = 0

    const candidates = await this.eventUpgrader.upgradeEvents(events)

    const migratedEvents = candidates.map((candidate, index) => {
      // upgradeEvents yields one result per input, in order
      const original = events[index] ?? candidate

      const validation = this.eventValidator.validateEvent(candidate)
      if (!validation.isValid) {
        errors.push(`Event ${candidate.id}: ${validation.errors.join(', ')}`)
        return original
      }

      // Count only events whose schema version really moved
      if (candidate.eventVersion !== original.eventVersion) {
        upgradedCount++
      }
      return candidate
    })

    this.events.set(aggregateId, migratedEvents)

    return {
      totalEvents: events.length,
      upgradedEvents: upgradedCount,
      errors
    }
  }
}
// Event Versioning Analytics
/**
 * Reporting helpers around event versioning. Two of the three reports return
 * fixed sample data for demonstration; only the migration report talks to the
 * event store.
 */
class EventVersioningAnalytics {
  constructor(
    private eventStore: VersionedEventStore,
    private schemaRegistry: SchemaRegistry
  ) {}

  /**
   * Reports how many events exist per schema version for each event type.
   * Demo implementation: `aggregateId` is ignored and sample figures are
   * returned instead of analysing the store.
   */
  async getVersionDistribution(aggregateId?: string): Promise<Array<{
    eventType: string
    versionDistribution: Record<number, number>
    totalEvents: number
  }>> {
    const row = (
      eventType: string,
      versionDistribution: Record<number, number>,
      totalEvents: number
    ) => ({ eventType, versionDistribution, totalEvents })

    return [
      row('ITEM_ADDED', { 1: 10, 2: 25, 3: 15 }, 50),
      row('ITEM_REMOVED', { 1: 5, 2: 8 }, 13)
    ]
  }

  /**
   * Runs the store's migration for an aggregate and summarises the outcome.
   * NOTE: this delegates to `eventStore.migrateEvents`, which performs the
   * migration rather than a dry run.
   */
  async getMigrationReport(aggregateId: string): Promise<{
    eventsMigrated: number
    eventsWithErrors: number
    errorDetails: string[]
  }> {
    const { upgradedEvents, errors } = await this.eventStore.migrateEvents(aggregateId)

    return {
      eventsMigrated: upgradedEvents,
      eventsWithErrors: errors.length,
      errorDetails: errors
    }
  }

  /**
   * Lists when each schema version was introduced and what it changed.
   * Demo implementation: `eventType` is ignored and a fixed three-entry
   * timeline is returned.
   */
  async getSchemaEvolutionTimeline(eventType: string): Promise<Array<{
    version: number
    introducedAt: Date
    changes: string[]
  }>> {
    const milestone = (version: number, isoDate: string, changes: string[]) => ({
      version,
      introducedAt: new Date(isoDate),
      changes
    })

    return [
      milestone(1, '2023-01-01', ['Initial version', 'Basic item data']),
      milestone(2, '2023-06-01', ['Added productName field', 'Added optional discount']),
      milestone(3, '2024-01-01', ['Added category field', 'Added metadata object'])
    ]
  }
}
// Setup and Usage Example
/**
 * Wires up the event versioning example: a schema registry loaded with the
 * four ITEM_ADDED schemas and their three transformers, plus an upgrader,
 * validator, store and analytics built on top of it.
 *
 * @returns The event store, the analytics helper and the schema registry.
 */
async function setupEventVersioning() {
  const schemaRegistry = new SchemaRegistry()

  // Schemas are listed in version order, so position + 1 is the version number
  const itemAddedSchemas = [
    ItemAddedV1Schema,
    ItemAddedV2Schema,
    ItemAddedV3Schema,
    ItemAddedV4Schema
  ]
  itemAddedSchemas.forEach((schema, position) => {
    schemaRegistry.registerSchema('ITEM_ADDED', position + 1, schema)
  })

  const itemAddedTransformers = [
    new ItemAddedV1ToV2Transformer(),
    new ItemAddedV2ToV3Transformer(),
    new ItemAddedV3ToV4Transformer()
  ]
  for (const transformer of itemAddedTransformers) {
    schemaRegistry.registerTransformer('ITEM_ADDED', transformer)
  }

  const eventStore = new VersionedEventStore(
    schemaRegistry,
    new EventUpgrader(schemaRegistry),
    new EventValidator(schemaRegistry)
  )
  const analytics = new EventVersioningAnalytics(eventStore, schemaRegistry)

  return { eventStore, analytics, schemaRegistry }
}
/**
 * End-to-end demo of event versioning. Saves three ITEM_ADDED events written
 * at schema versions 1, 2 and 3, then prints them as stored, upgraded to the
 * latest version, and upgraded to version 2, followed by a migration report,
 * the version distribution and the schema evolution timeline.
 */
async function eventVersioningExample() {
  const { eventStore, analytics } = await setupEventVersioning()

  const aggregateId = 'cart-123'

  // One sample event per schema generation
  const v1Event: VersionedEvent = {
    id: crypto.randomUUID(),
    aggregateId,
    type: 'ITEM_ADDED',
    data: {
      productId: 'prod-1',
      quantity: 1,
      unitPrice: 999.99
    },
    timestamp: new Date('2023-05-01'),
    version: 1,
    eventVersion: 1
  }
  const v2Event: VersionedEvent = {
    id: crypto.randomUUID(),
    aggregateId,
    type: 'ITEM_ADDED',
    data: {
      productId: 'prod-2',
      productName: 'Wireless Mouse',
      quantity: 2,
      unitPrice: 29.99,
      discount: 0.1
    },
    timestamp: new Date('2023-07-01'),
    version: 2,
    eventVersion: 2
  }
  const v3Event: VersionedEvent = {
    id: crypto.randomUUID(),
    aggregateId,
    type: 'ITEM_ADDED',
    data: {
      productId: 'prod-3',
      productName: 'Mechanical Keyboard',
      quantity: 1,
      unitPrice: 149.99,
      discount: 0.05,
      category: 'Accessories',
      metadata: {
        source: 'web-app',
        addedBy: 'user-456'
      }
    },
    timestamp: new Date('2024-02-01'),
    version: 3,
    eventVersion: 3
  }
  const events: VersionedEvent[] = [v1Event, v2Event, v3Event]

  // Prints one line per event with its schema version and payload
  const printEvents = (list: VersionedEvent[]): void => {
    for (const event of list) {
      console.log(`Event ${event.id}: version ${event.eventVersion}, data:`, event.data)
    }
  }

  // Saving validates each event; a failure is logged and the loop carries on
  for (const event of events) {
    try {
      await eventStore.saveEvent(event)
      console.log(`Saved event ${event.id} with version ${event.eventVersion}`)
    } catch (error) {
      console.error(`Failed to save event ${event.id}:`, error)
    }
  }

  console.log('\n=== Original Events ===')
  printEvents(await eventStore.getEvents(aggregateId))

  console.log('\n=== Upgraded Events (Latest Version) ===')
  printEvents(await eventStore.getLatestEvents(aggregateId))

  console.log('\n=== Events Upgraded to V2 ===')
  printEvents(await eventStore.getEvents(aggregateId, 2))

  console.log('\n=== Migration Report ===')
  const migrationReport = await analytics.getMigrationReport(aggregateId)
  console.log('Migration report:', migrationReport)

  console.log('\n=== Version Distribution ===')
  for (const dist of await analytics.getVersionDistribution()) {
    console.log(`Event type ${dist.eventType}:`, dist.versionDistribution)
  }

  console.log('\n=== Schema Evolution Timeline ===')
  for (const entry of await analytics.getSchemaEvolutionTimeline('ITEM_ADDED')) {
    console.log(`Version ${entry.version} (${entry.introducedAt.toISOString().split('T')[0]}):`, entry.changes)
  }
}
// Exports of the event versioning example: the shared event/schema/transformer
// types, the registry, upgrader, validator, store and analytics classes, the
// three ITEM_ADDED transformers, and the setup and demo functions. The
// ItemAdded*Schema constants are not part of this list.
export {
VersionedEvent,
EventSchema,
EventTransformer,
SchemaRegistry,
EventUpgrader,
EventValidator,
VersionedEventStore,
EventVersioningAnalytics,
ItemAddedV1ToV2Transformer,
ItemAddedV2ToV3Transformer,
ItemAddedV3ToV4Transformer,
setupEventVersioning,
eventVersioningExample
}