🎯 Рекомендуемые коллекции
Сбалансированные коллекции примеров кода из различных категорий, которые вы можете исследовать
Примеры Event Sourcing
Комплексные паттерны event sourcing, включая хранение событий, архитектуру CQRS, восстановление состояния и управление версиями событий
💻 Паттерн Хранения Событий
🟡 intermediate
⭐⭐⭐
Базовая реализация хранения событий с event store и корнем агрегата
⏱️ 30 min
🏷️ event-storage, aggregate, domain-events
Prerequisites:
Understanding of Domain-Driven Design, Basic knowledge of event patterns
// Event Store Implementation
/** A single domain event as kept in an aggregate's event stream. */
interface Event {
  /** Unique identifier of the event. */
  id: string;
  /** Identifier of the aggregate the event belongs to. */
  aggregateId: string;
  /** Event type name, e.g. `'ITEM_ADDED'`. */
  type: string;
  /** Event payload; its shape depends on `type`. */
  data: any;
  /** Moment the event was created or stored. */
  timestamp: Date;
  /** Sequence number of the event within its aggregate's stream. */
  version: number;
}
/**
 * In-memory event store keeping one append-only list of events per aggregate.
 */
class EventStore {
  private events: Map<string, Event[]> = new Map();

  /**
   * Appends `events` to the stream of `aggregateId`.
   *
   * The store assigns a fresh `id`, the current time as `timestamp` and a
   * sequential 1-based `version` to every appended event, overriding whatever
   * the caller put in those fields.
   */
  async saveEvents(aggregateId: string, events: Event[]): Promise<void> {
    const existingEvents = this.events.get(aggregateId) ?? [];
    const newEvents = events.map((event, index) => ({
      ...event,
      id: crypto.randomUUID(),
      timestamp: new Date(),
      version: existingEvents.length + index + 1,
    }));
    this.events.set(aggregateId, [...existingEvents, ...newEvents]);
  }

  /**
   * Returns every stored event of the aggregate (empty when none exist).
   *
   * A copy of the list is returned so that callers adding to or removing
   * from the result cannot alter the stored stream.
   */
  async getEvents(aggregateId: string): Promise<Event[]> {
    return [...(this.events.get(aggregateId) ?? [])];
  }

  /** Returns the events whose `version` is greater than or equal to `fromVersion`. */
  async getEventsFromVersion(aggregateId: string, fromVersion: number): Promise<Event[]> {
    const stored = this.events.get(aggregateId) ?? [];
    return stored.filter((event) => event.version >= fromVersion);
  }
}
// Aggregate Root
/**
 * Base class for event-sourced aggregates.
 *
 * Subclasses record changes through `apply`, which routes the event to the
 * handler returned by `getEventHandler` and queues it as uncommitted.
 */
abstract class AggregateRoot {
  private _id: string;
  private _version: number = 0;
  private _uncommittedEvents: Event[] = [];

  constructor(id: string) {
    this._id = id;
  }

  /** Identifier given to the aggregate at construction. */
  get id(): string {
    return this._id;
  }

  /** Version of the most recently applied or replayed event (0 when none). */
  get version(): number {
    return this._version;
  }

  /** Applies a new event to the aggregate and queues it for persistence. */
  protected apply(event: Event): void {
    this.applyChange(event, true);
    this._uncommittedEvents.push(event);
  }

  /** Runs the handler for the event, if any, and bumps the version for new events. */
  private applyChange(event: Event, isNew: boolean): void {
    this.getEventHandler(event.type)?.call(this, event.data);
    if (!isNew) {
      return;
    }
    this._version = event.version;
  }

  /** Returns the function that handles `eventType`, or `null` when there is none. */
  protected abstract getEventHandler(eventType: string): Function | null;

  /** Clears the queue of events that have not been persisted yet. */
  async markChangesAsCommitted(): Promise<void> {
    this._uncommittedEvents = [];
  }

  /** Returns a copy of the queue of events that have not been persisted yet. */
  getUncommittedEvents(): Event[] {
    return this._uncommittedEvents.slice();
  }

  /** Replays stored events in order; nothing is added to the uncommitted queue. */
  async loadFromHistory(events: Event[]): Promise<void> {
    for (let i = 0; i < events.length; i++) {
      const past = events[i];
      this.applyChange(past, false);
      this._version = past.version;
    }
  }
}
// Shopping Cart Example
/** One product line held in a shopping cart. */
interface CartItem {
  productId: string;
  productName: string;
  /** Number of units of the product. */
  quantity: number;
  /** Price of a single unit. */
  unitPrice: number;
}

/**
 * Payload carried by cart events. Every field is optional because each
 * event type fills in only the fields it needs.
 */
interface CartEventData {
  productId?: string;
  productName?: string;
  quantity?: number;
  unitPrice?: number;
  items?: CartItem[];
}
/**
 * Event-sourced shopping cart.
 *
 * Commands (`addItem`, `removeItem`, `checkout`) validate the request and
 * record an event; cart state only changes inside the event handlers, which
 * also run when history is replayed.
 */
class ShoppingCart extends AggregateRoot {
  private items: CartItem[] = [];
  private isActive: boolean = true;

  constructor(id: string) {
    super(id);
  }

  /**
   * Records that a product was added to the cart.
   * @throws Error if the cart has already been checked out.
   */
  addItem(productId: string, productName: string, quantity: number, unitPrice: number): void {
    this.assertActive('Cannot add items to inactive cart');
    this.record('ITEM_ADDED', { productId, productName, quantity, unitPrice });
  }

  /**
   * Records that a product was removed from the cart.
   * @throws Error if the cart has already been checked out or the product is not in it.
   */
  removeItem(productId: string): void {
    // Same guard as addItem: a checked-out cart must not change any more.
    this.assertActive('Cannot remove items from inactive cart');
    if (!this.items.some((item) => item.productId === productId)) {
      throw new Error('Item not found in cart');
    }
    this.record('ITEM_REMOVED', { productId });
  }

  /**
   * Records the checkout of the cart, which deactivates it.
   * @throws Error if the cart has already been checked out or is empty.
   */
  checkout(): void {
    // Without this guard the cart could be checked out more than once.
    this.assertActive('Cannot checkout inactive cart');
    if (this.items.length === 0) {
      throw new Error('Cannot checkout empty cart');
    }
    this.record('CART_CHECKED_OUT', { items: [...this.items] });
  }

  protected getEventHandler(eventType: string): Function | null {
    // A switch instead of a lookup on an object literal: a lookup would also
    // resolve names inherited from Object.prototype (e.g. 'constructor').
    switch (eventType) {
      case 'ITEM_ADDED':
        return this.handleItemAdded.bind(this);
      case 'ITEM_REMOVED':
        return this.handleItemRemoved.bind(this);
      case 'CART_CHECKED_OUT':
        return this.handleCartCheckedOut.bind(this);
      default:
        return null;
    }
  }

  /** Throws an Error with `message` when the cart is no longer active. */
  private assertActive(message: string): void {
    if (!this.isActive) {
      throw new Error(message);
    }
  }

  /** Builds the next event of this cart and applies it. */
  private record(type: string, data: CartEventData): void {
    const event: Event = {
      id: crypto.randomUUID(),
      aggregateId: this.id,
      type,
      data,
      timestamp: new Date(),
      version: this.version + 1,
    };
    this.apply(event);
  }

  /** Increases the quantity of an existing line or appends a new one. */
  private handleItemAdded(data: CartEventData): void {
    const existingItem = this.items.find((item) => item.productId === data.productId);
    if (existingItem) {
      existingItem.quantity += data.quantity!;
      return;
    }
    this.items.push({
      productId: data.productId!,
      productName: data.productName!,
      quantity: data.quantity!,
      unitPrice: data.unitPrice!,
    });
  }

  /** Drops every line for the removed product. */
  private handleItemRemoved(data: CartEventData): void {
    this.items = this.items.filter((item) => item.productId !== data.productId);
  }

  /** Marks the cart as inactive. */
  private handleCartCheckedOut(): void {
    this.isActive = false;
  }

  /** Returns a copy of the list of cart lines. */
  getItems(): CartItem[] {
    return [...this.items];
  }

  /** Sum of `quantity * unitPrice` over all lines. */
  getTotalPrice(): number {
    return this.items.reduce((total, item) => total + item.quantity * item.unitPrice, 0);
  }

  /** `false` once the cart has been checked out. */
  isCartActive(): boolean {
    return this.isActive;
  }
}
// Usage Example
/**
 * Demo: fills a cart, persists its events, then checks it out and persists
 * again, logging the cart state along the way.
 */
async function example() {
  const store = new EventStore();
  const cart = new ShoppingCart('cart-123');

  // Persist whatever the cart has recorded so far, then clear its queue.
  const persistPending = async (): Promise<void> => {
    await store.saveEvents(cart.id, cart.getUncommittedEvents());
    await cart.markChangesAsCommitted();
  };

  cart.addItem('prod-1', 'Laptop', 1, 999.99);
  cart.addItem('prod-2', 'Mouse', 2, 29.99);
  await persistPending();

  console.log('Cart items:', cart.getItems());
  console.log('Total price:', cart.getTotalPrice());

  cart.checkout();
  await persistPending();

  console.log('Cart is active:', cart.isCartActive());
}
export { Event, EventStore, AggregateRoot, ShoppingCart, example }
💻 Архитектура CQRS
🔴 complex
⭐⭐⭐⭐
Реализация Command Query Responsibility Segregation с разделенными моделями чтения и записи
⏱️ 45 min
🏷️ cqrs, command-query-separation, read-write-separation
Prerequisites:
Understanding of event sourcing, Domain-Driven Design knowledge
// CQRS Base Classes
/** A request to change state, addressed to one aggregate. */
interface Command {
  id: string;
  aggregateId: string;
  timestamp: Date;
}

/** A request to read state. */
interface Query {
  id: string;
  parameters: Record<string, any>;
}

/** Outcome reported for a command. */
interface CommandResult {
  success: boolean;
  message?: string;
  data?: any;
}

/** Outcome reported for a query; `data` carries the result, `error` the failure text. */
interface QueryResult<T = any> {
  success: boolean;
  data?: T;
  error?: string;
}
// Command Handler Interface
/** Handles commands of type `T`. */
interface ICommandHandler<T extends Command> {
  handle(command: T): Promise<CommandResult>;
}

/** Handles queries of type `T`, producing results of type `R`. */
interface IQueryHandler<T extends Query, R> {
  handle(query: T): Promise<QueryResult<R>>;
}
// Command Bus
/**
 * Routes commands to handlers.
 *
 * Handlers are registered under a string key and looked up by the name of
 * the dispatched command's constructor.
 */
class CommandBus {
  private handlers: Map<string, ICommandHandler<any>> = new Map();

  /** Registers (or replaces) the handler for `commandType`. */
  register<T extends Command>(commandType: string, handler: ICommandHandler<T>): void {
    this.handlers.set(commandType, handler);
  }

  /**
   * Runs the handler registered for the command. A missing handler or an
   * error thrown by the handler is reported as an unsuccessful result
   * instead of being thrown.
   */
  async dispatch<T extends Command>(command: T): Promise<CommandResult> {
    const commandName = command.constructor.name;
    const handler = this.handlers.get(commandName);
    if (!handler) {
      return { success: false, message: `No handler found for command: ${commandName}` };
    }

    try {
      const outcome = await handler.handle(command);
      return outcome;
    } catch (error) {
      const message = error instanceof Error ? error.message : 'Unknown error';
      return { success: false, message };
    }
  }
}
// Query Bus
/**
 * Routes queries to handlers.
 *
 * Handlers are registered under a string key and looked up by the name of
 * the dispatched query's constructor.
 */
class QueryBus {
  private handlers: Map<string, IQueryHandler<any, any>> = new Map();

  /** Registers (or replaces) the handler for `queryType`. */
  register<T extends Query, R>(queryType: string, handler: IQueryHandler<T, R>): void {
    this.handlers.set(queryType, handler);
  }

  /**
   * Runs the handler registered for the query. A missing handler or an
   * error thrown by the handler is reported as an unsuccessful result
   * instead of being thrown.
   */
  async dispatch<T extends Query, R>(query: T): Promise<QueryResult<R>> {
    const queryName = query.constructor.name;
    const handler = this.handlers.get(queryName);
    if (!handler) {
      return { success: false, error: `No handler found for query: ${queryName}` };
    }

    try {
      const outcome = await handler.handle(query);
      return outcome;
    } catch (failure) {
      const error = failure instanceof Error ? failure.message : 'Unknown error';
      return { success: false, error };
    }
  }
}
// Domain Events
/** An event recorded by an aggregate of the write model. */
interface DomainEvent {
  id: string;
  aggregateId: string;
  /** Event type name, e.g. `'ORDER_CANCELLED'`. */
  type: string;
  /** Event payload; its shape depends on `type`. */
  data: any;
  timestamp: Date;
  version: number;
}

/** Delivers recorded domain events to whoever is interested in them. */
interface EventPublisher {
  publish(events: DomainEvent[]): Promise<void>;
}
// Console Event Publisher (for demo)
/** Demo publisher that only writes the events to the console. */
class ConsoleEventPublisher implements EventPublisher {
  /** Logs the number of events, then one line per event in order. */
  async publish(events: DomainEvent[]): Promise<void> {
    console.log('Publishing events:', events.length);
    for (let i = 0; i < events.length; i++) {
      const { type, aggregateId } = events[i];
      console.log(`Event: ${type}, Aggregate: ${aggregateId}`);
    }
  }
}
// Order Management Commands
/**
 * Command asking for a new order to be created. A fresh aggregate id is
 * generated for the order when the command is constructed.
 */
class CreateOrderCommand implements Command {
  customerId: string;
  items: OrderItem[];
  id: string;
  aggregateId: string;
  timestamp: Date;

  constructor(customerId: string, items: OrderItem[]) {
    this.customerId = customerId;
    this.items = items;
    this.id = crypto.randomUUID();
    this.aggregateId = crypto.randomUUID();
    this.timestamp = new Date();
  }
}
/** Command asking for the items of an existing order to be replaced. */
class UpdateOrderCommand implements Command {
  orderId: string;
  items: OrderItem[];
  id: string;
  aggregateId: string;
  timestamp: Date;

  constructor(orderId: string, items: OrderItem[]) {
    this.orderId = orderId;
    this.items = items;
    this.id = crypto.randomUUID();
    // The command targets the order itself.
    this.aggregateId = orderId;
    this.timestamp = new Date();
  }
}
/** Command asking for an existing order to be cancelled, with a reason. */
class CancelOrderCommand implements Command {
  orderId: string;
  reason: string;
  id: string;
  aggregateId: string;
  timestamp: Date;

  constructor(orderId: string, reason: string) {
    this.orderId = orderId;
    this.reason = reason;
    this.id = crypto.randomUUID();
    // The command targets the order itself.
    this.aggregateId = orderId;
    this.timestamp = new Date();
  }
}
// Order Management Queries
/** Query for the summary of a single order. */
class GetOrderByIdQuery implements Query {
  orderId: string;
  id: string;
  parameters: Record<string, any>;

  constructor(orderId: string) {
    this.orderId = orderId;
    this.id = crypto.randomUUID();
    this.parameters = { orderId: orderId };
  }
}
/** Query for the summaries of all orders of one customer. */
class GetOrdersByCustomerQuery implements Query {
  customerId: string;
  id: string;
  parameters: Record<string, any>;

  constructor(customerId: string) {
    this.customerId = customerId;
    this.id = crypto.randomUUID();
    this.parameters = { customerId: customerId };
  }
}
/** One product line of an order. */
interface OrderItem {
  productId: string;
  productName: string;
  /** Number of units ordered. */
  quantity: number;
  /** Price of a single unit. */
  unitPrice: number;
}
// Write Model - Order Aggregate
/**
 * Write-model order aggregate.
 *
 * Every state change updates the order, refreshes `updatedAt` and records a
 * domain event that stays queued until `markEventsAsCommitted` is called.
 *
 * NOTE(review): constructing an Order records no event, so a freshly created
 * order has nothing to publish — confirm whether an "order created" event is
 * expected by consumers.
 */
class Order {
  private events: DomainEvent[] = [];
  // Version for the next recorded event; keeps counting across commits so
  // that every event of this order gets a distinct, increasing version.
  private nextEventVersion = 1;

  constructor(
    public readonly id: string,
    public readonly customerId: string,
    public items: OrderItem[],
    public status: 'PENDING' | 'CONFIRMED' | 'CANCELLED' | 'COMPLETED' = 'PENDING',
    public createdAt: Date = new Date(),
    public updatedAt: Date = new Date()
  ) {}

  /**
   * Replaces the order's items.
   * @throws Error unless the order is PENDING.
   */
  updateItems(newItems: OrderItem[]): void {
    if (this.status !== 'PENDING') {
      throw new Error('Cannot update order in current status');
    }
    const oldItems = this.items;
    this.items = newItems;
    this.record('ORDER_ITEMS_UPDATED', { oldItems, newItems });
  }

  /**
   * Cancels the order.
   * @throws Error if the order is already COMPLETED.
   */
  cancel(reason: string): void {
    if (this.status === 'COMPLETED') {
      throw new Error('Cannot cancel a completed order');
    }
    this.status = 'CANCELLED';
    this.record('ORDER_CANCELLED', { reason });
  }

  /**
   * Confirms the order.
   * @throws Error unless the order is PENDING.
   */
  confirm(): void {
    if (this.status !== 'PENDING') {
      throw new Error('Order must be in PENDING status to be confirmed');
    }
    this.status = 'CONFIRMED';
    this.record('ORDER_CONFIRMED', { confirmedAt: new Date() });
  }

  /** Returns a copy of the queue of events that have not been committed yet. */
  getUncommittedEvents(): DomainEvent[] {
    return [...this.events];
  }

  /** Clears the queue of uncommitted events. */
  markEventsAsCommitted(): void {
    this.events = [];
  }

  /** Queues a new event with the next version and refreshes `updatedAt`. */
  private record(type: string, data: unknown): void {
    this.events.push({
      id: crypto.randomUUID(),
      aggregateId: this.id,
      type,
      data,
      timestamp: new Date(),
      version: this.nextEventVersion,
    });
    this.nextEventVersion += 1;
    this.updatedAt = new Date();
  }
}
// Read Model - Order Summary
/** Read-model projection of an order. */
interface OrderSummary {
  id: string;
  customerId: string;
  itemCount: number;
  totalAmount: number;
  status: string;
  createdAt: Date;
  updatedAt: Date;
}
// Read Model Repository
/** In-memory storage for order summaries, keyed by order id. */
class OrderReadRepository {
  private orders: Map<string, OrderSummary> = new Map();

  /** Stores the summary, replacing any previous one for the same order. */
  async save(orderSummary: OrderSummary): Promise<void> {
    this.orders.set(orderSummary.id, orderSummary);
  }

  /** Returns the stored summary, or `null` when the order is unknown. */
  async findById(orderId: string): Promise<OrderSummary | null> {
    const found = this.orders.get(orderId);
    return found ? found : null;
  }

  /** Returns the summaries of all orders belonging to the customer. */
  async findByCustomerId(customerId: string): Promise<OrderSummary[]> {
    const matches: OrderSummary[] = [];
    for (const summary of this.orders.values()) {
      if (summary.customerId === customerId) {
        matches.push(summary);
      }
    }
    return matches;
  }
}
// Write Model Repository
/** In-memory storage for order aggregates, keyed by order id. */
class OrderWriteRepository {
  private orders: Map<string, Order> = new Map();

  /** Stores the order, replacing any previous one with the same id. */
  async save(order: Order): Promise<void> {
    this.orders.set(order.id, order);
  }

  /** Returns the stored order, or `null` when the id is unknown. */
  async findById(orderId: string): Promise<Order | null> {
    const found = this.orders.get(orderId);
    return found ? found : null;
  }
}
// Command Handlers
/**
 * Handles CreateOrderCommand: stores the new order, writes its read-model
 * summary, then publishes the order's uncommitted events.
 */
class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private orderRepository: OrderWriteRepository,
    private readRepository: OrderReadRepository,
    private eventPublisher: EventPublisher
  ) {}

  async handle(command: CreateOrderCommand): Promise<CommandResult> {
    const { aggregateId, customerId, items } = command;
    const order = new Order(aggregateId, customerId, items);
    await this.orderRepository.save(order);

    // Project the order into the read model.
    const totalAmount = order.items.reduce(
      (runningTotal, line) => runningTotal + (line.quantity * line.unitPrice),
      0
    );
    const summary: OrderSummary = {
      id: order.id,
      customerId: order.customerId,
      itemCount: order.items.length,
      totalAmount,
      status: order.status,
      createdAt: order.createdAt,
      updatedAt: order.updatedAt,
    };
    await this.readRepository.save(summary);

    // Publish whatever the order recorded, then clear its queue.
    const pendingEvents = order.getUncommittedEvents();
    await this.eventPublisher.publish(pendingEvents);
    order.markEventsAsCommitted();

    return {
      success: true,
      message: 'Order created successfully',
      data: { orderId: order.id },
    };
  }
}
/**
 * Handles CancelOrderCommand: cancels the stored order, mirrors the new
 * status into the read model and publishes the order's uncommitted events.
 */
class CancelOrderHandler implements ICommandHandler<CancelOrderCommand> {
  constructor(
    private orderRepository: OrderWriteRepository,
    private readRepository: OrderReadRepository,
    private eventPublisher: EventPublisher
  ) {}

  /**
   * Returns an unsuccessful result when the order does not exist or when any
   * step after the lookup throws.
   */
  async handle(command: CancelOrderCommand): Promise<CommandResult> {
    const order = await this.orderRepository.findById(command.orderId);
    if (!order) {
      return { success: false, message: 'Order not found' };
    }

    try {
      order.cancel(command.reason);
      await this.orderRepository.save(order);

      // Mirror the change into the read model when a summary exists.
      const summary = await this.readRepository.findById(command.orderId);
      if (summary) {
        summary.status = order.status;
        summary.updatedAt = order.updatedAt;
        await this.readRepository.save(summary);
      }

      const pendingEvents = order.getUncommittedEvents();
      await this.eventPublisher.publish(pendingEvents);
      order.markEventsAsCommitted();

      return { success: true, message: 'Order cancelled successfully' };
    } catch (error) {
      const message = error instanceof Error ? error.message : 'Failed to cancel order';
      return { success: false, message };
    }
  }
}
// Query Handlers
/** Answers GetOrderByIdQuery from the read repository. */
class GetOrderByIdHandler implements IQueryHandler<GetOrderByIdQuery, OrderSummary | null> {
  constructor(private readRepository: OrderReadRepository) {}

  /** Resolves with the summary (or `null`); repository errors become an unsuccessful result. */
  async handle(query: GetOrderByIdQuery): Promise<QueryResult<OrderSummary | null>> {
    try {
      const summary = await this.readRepository.findById(query.orderId);
      return { success: true, data: summary };
    } catch (failure) {
      const error = failure instanceof Error ? failure.message : 'Failed to get order';
      return { success: false, error };
    }
  }
}
/** Answers GetOrdersByCustomerQuery from the read repository. */
class GetOrdersByCustomerHandler implements IQueryHandler<GetOrdersByCustomerQuery, OrderSummary[]> {
  constructor(private readRepository: OrderReadRepository) {}

  /** Resolves with the customer's summaries; repository errors become an unsuccessful result. */
  async handle(query: GetOrdersByCustomerQuery): Promise<QueryResult<OrderSummary[]>> {
    try {
      const summaries = await this.readRepository.findByCustomerId(query.customerId);
      return { success: true, data: summaries };
    } catch (failure) {
      const error = failure instanceof Error ? failure.message : 'Failed to get orders';
      return { success: false, error };
    }
  }
}
// Application Setup and Usage
/**
 * Builds the in-memory repositories and publisher, wires the command and
 * query handlers to their buses, and returns both buses.
 */
async function setupApplication() {
  const orderWriteRepo = new OrderWriteRepository();
  const orderReadRepo = new OrderReadRepository();
  const eventPublisher = new ConsoleEventPublisher();

  const commandBus = new CommandBus();
  commandBus.register(
    'CreateOrderCommand',
    new CreateOrderHandler(orderWriteRepo, orderReadRepo, eventPublisher)
  );
  commandBus.register(
    'CancelOrderCommand',
    new CancelOrderHandler(orderWriteRepo, orderReadRepo, eventPublisher)
  );

  const queryBus = new QueryBus();
  queryBus.register('GetOrderByIdQuery', new GetOrderByIdHandler(orderReadRepo));
  queryBus.register('GetOrdersByCustomerQuery', new GetOrdersByCustomerHandler(orderReadRepo));

  return { commandBus, queryBus };
}
// Usage Example
/**
 * Demo: creates an order, reads it back by id and by customer, then cancels
 * it, logging every result. Nothing after creation runs if creation fails.
 */
async function cqrsExample() {
  const { commandBus, queryBus } = await setupApplication();

  const createResult = await commandBus.dispatch(
    new CreateOrderCommand('customer-123', [
      { productId: 'prod-1', productName: 'Laptop', quantity: 1, unitPrice: 999.99 },
      { productId: 'prod-2', productName: 'Mouse', quantity: 2, unitPrice: 29.99 },
    ])
  );
  console.log('Create order result:', createResult);
  if (!createResult.success) {
    return;
  }

  const orderId = createResult.data.orderId;

  const orderResult = await queryBus.dispatch(new GetOrderByIdQuery(orderId));
  console.log('Order details:', orderResult.data);

  const customerOrdersResult = await queryBus.dispatch(new GetOrdersByCustomerQuery('customer-123'));
  console.log('Customer orders:', customerOrdersResult.data);

  const cancelResult = await commandBus.dispatch(
    new CancelOrderCommand(orderId, 'Customer requested cancellation')
  );
  console.log('Cancel order result:', cancelResult);
}
export {
Command,
Query,
CommandBus,
QueryBus,
CommandResult,
QueryResult,
CreateOrderCommand,
CancelOrderCommand,
GetOrderByIdQuery,
GetOrdersByCustomerQuery,
cqrsExample
}
💻 Восстановление Состояния
🔴 complex
⭐⭐⭐⭐
Восстановление состояния из потоков событий с оптимизацией снимков
⏱️ 60 min
🏷️ state-reconstruction, snapshot, performance-optimization
Prerequisites:
Understanding of event sourcing, Knowledge of stream processing
// State Reconstruction Engine
/** A single event of an aggregate's stream. */
interface Event {
  id: string;
  aggregateId: string;
  /** Event type name, e.g. `'ITEM_ADDED'`. */
  type: string;
  /** Event payload; its shape depends on `type`. */
  data: any;
  timestamp: Date;
  /** Sequence number of the event within its aggregate's stream. */
  version: number;
}

/** Saved state of an aggregate as of a given event version. */
interface Snapshot {
  aggregateId: string;
  /** The captured state; its shape is decided by the reconstructor. */
  data: any;
  /** Version of the last event reflected in `data`. */
  version: number;
  timestamp: Date;
}

/** Knows how to build state of type `T` from events and snapshots. */
interface StateReconstructor<T> {
  /** Builds the state from the given events alone. */
  reconstruct(events: Event[]): T;
  /** Captures `state` as a snapshot labelled with `version`. */
  createSnapshot(state: T, version: number): Snapshot;
  /** Builds the state from a snapshot plus the events that follow it. */
  applySnapshot(snapshot: Snapshot, events: Event[]): T;
}

/** Decides when a snapshot should be taken. */
interface SnapshotStrategy {
  shouldTakeSnapshot(aggregateId: string, version: number): boolean;
  getSnapshotInterval(): number;
}
/** Requests a snapshot whenever the version is a multiple of the interval. */
class IntervalSnapshotStrategy implements SnapshotStrategy {
  private interval: number;

  /** @param interval Number of versions between snapshots (default 100). */
  constructor(interval: number = 100) {
    this.interval = interval;
  }

  shouldTakeSnapshot(aggregateId: string, version: number): boolean {
    const remainder = version % this.interval;
    return remainder === 0;
  }

  getSnapshotInterval(): number {
    return this.interval;
  }
}
/**
 * Requests a snapshot whenever the version is a multiple of the threshold.
 *
 * No count of events since the last snapshot is kept, so the check is the
 * same modulo test an interval strategy would make.
 */
class EventCountSnapshotStrategy implements SnapshotStrategy {
  private eventCountThreshold: number;

  /** @param eventCountThreshold Number of versions between snapshots (default 50). */
  constructor(eventCountThreshold: number = 50) {
    this.eventCountThreshold = eventCountThreshold;
  }

  shouldTakeSnapshot(aggregateId: string, version: number): boolean {
    // This would need to track event count since last snapshot
    const remainder = version % this.eventCountThreshold;
    return remainder === 0;
  }

  getSnapshotInterval(): number {
    return this.eventCountThreshold;
  }
}
// Snapshot Store
/** In-memory snapshot storage holding a list of snapshots per aggregate. */
class SnapshotStore {
  private snapshots: Map<string, Snapshot[]> = new Map();

  /**
   * Stores a snapshot, replacing any stored snapshot of the same aggregate
   * and version.
   *
   * Snapshots at other versions are kept: older ones are what
   * `getSnapshotBeforeVersion` serves, and saving an older snapshot must not
   * discard newer ones. Use `deleteSnapshotsBeforeVersion` to prune.
   */
  async saveSnapshot(snapshot: Snapshot): Promise<void> {
    const existing = this.snapshots.get(snapshot.aggregateId) ?? [];
    const others = existing.filter((s) => s.version !== snapshot.version);
    this.snapshots.set(snapshot.aggregateId, [...others, snapshot]);
  }

  /** Returns the stored snapshot with the highest version, or `null` when there is none. */
  async getLatestSnapshot(aggregateId: string): Promise<Snapshot | null> {
    return SnapshotStore.highestVersion(this.snapshots.get(aggregateId) ?? []);
  }

  /**
   * Returns the highest-version snapshot whose version is less than or
   * equal to `version`, or `null` when there is none.
   */
  async getSnapshotBeforeVersion(aggregateId: string, version: number): Promise<Snapshot | null> {
    const stored = this.snapshots.get(aggregateId) ?? [];
    return SnapshotStore.highestVersion(stored.filter((s) => s.version <= version));
  }

  /** Removes every stored snapshot whose version is lower than `version`. */
  async deleteSnapshotsBeforeVersion(aggregateId: string, version: number): Promise<void> {
    const stored = this.snapshots.get(aggregateId) ?? [];
    this.snapshots.set(aggregateId, stored.filter((s) => s.version >= version));
  }

  /** Picks the snapshot with the highest version from a list (`null` for an empty list). */
  private static highestVersion(candidates: Snapshot[]): Snapshot | null {
    if (candidates.length === 0) {
      return null;
    }
    return candidates.reduce((latest, current) => (current.version > latest.version ? current : latest));
  }
}
// Event Store with Snapshot Integration
/**
 * In-memory event store that consults a snapshot strategy on every save and
 * can return an aggregate's latest snapshot together with the events that
 * follow it.
 */
class EnhancedEventStore {
  constructor(
    private events: Map<string, Event[]> = new Map(),
    private snapshotStore: SnapshotStore = new SnapshotStore(),
    private snapshotStrategy: SnapshotStrategy = new IntervalSnapshotStrategy(100)
  ) {}

  /**
   * Appends `events` to the stream of `aggregateId`.
   *
   * The store assigns a fresh `id`, the current time as `timestamp` and a
   * sequential 1-based `version` to every appended event, overriding whatever
   * the caller put in those fields. When the strategy asks for a snapshot at
   * any of the newly assigned versions, the opportunity is logged.
   */
  async saveEvents(aggregateId: string, events: Event[]): Promise<void> {
    const existingEvents = this.events.get(aggregateId) ?? [];
    const lastVersion = existingEvents.length;
    const newEvents = events.map((event, index) => ({
      ...event,
      id: crypto.randomUUID(),
      timestamp: new Date(),
      version: lastVersion + index + 1,
    }));
    this.events.set(aggregateId, [...existingEvents, ...newEvents]);

    // Ask the strategy about every version written by this batch, not just
    // the last one: a batch may step over the version at which a snapshot is
    // due (e.g. 26 events with an interval of 10). An empty batch asks nothing.
    const finalVersion = lastVersion + newEvents.length;
    const snapshotDue = newEvents.some((event) =>
      this.snapshotStrategy.shouldTakeSnapshot(aggregateId, event.version)
    );
    if (snapshotDue) {
      // This would typically reconstruct the state and create a snapshot
      console.log(`Snapshot opportunity for aggregate ${aggregateId} at version ${finalVersion}`);
    }
  }

  /**
   * Returns every stored event of the aggregate (empty when none exist).
   * A copy is returned so callers cannot alter the stored stream through it.
   */
  async getEvents(aggregateId: string): Promise<Event[]> {
    return [...(this.events.get(aggregateId) ?? [])];
  }

  /** Returns the events whose `version` is greater than or equal to `fromVersion`. */
  async getEventsFromVersion(aggregateId: string, fromVersion: number): Promise<Event[]> {
    const stored = this.events.get(aggregateId) ?? [];
    return stored.filter((event) => event.version >= fromVersion);
  }

  /**
   * Returns the latest snapshot of the aggregate together with the events
   * after it, or all events and `null` when no snapshot exists.
   */
  async getEventsWithSnapshot(aggregateId: string): Promise<{ events: Event[], snapshot: Snapshot | null }> {
    const snapshot = await this.snapshotStore.getLatestSnapshot(aggregateId);
    const events = snapshot
      ? await this.getEventsFromVersion(aggregateId, snapshot.version + 1)
      : await this.getEvents(aggregateId);
    return { events, snapshot };
  }
}
// Shopping Cart State
/** State of a shopping cart as rebuilt from its events. */
interface CartState {
  id: string;
  items: CartItem[];
  isActive: boolean;
  /** Sum of `quantity * unitPrice` over `items`. */
  totalAmount: number;
  /** Version of the last event applied to this state. */
  version: number;
  /** Timestamp of the last event applied to this state. */
  lastUpdated: Date;
}

/** One product line held in a shopping cart. */
interface CartItem {
  productId: string;
  productName: string;
  quantity: number;
  unitPrice: number;
}
// Shopping Cart State Reconstructor
/**
 * Rebuilds shopping-cart state from cart events, optionally starting from a
 * snapshot. States are never mutated: every event produces a new state object.
 */
class ShoppingCartReconstructor implements StateReconstructor<CartState> {
  /** Folds the events, in order, over an empty active cart. */
  reconstruct(events: Event[]): CartState {
    const initial: CartState = {
      id: '',
      items: [],
      isActive: true,
      totalAmount: 0,
      version: 0,
      lastUpdated: new Date(),
    };
    return events.reduce((state, event) => this.applyEvent(state, event), initial);
  }

  /**
   * Captures the state as a snapshot labelled with `version`.
   * The cart lines are copied so that later changes to `state.items` cannot
   * alter the snapshot.
   */
  createSnapshot(state: CartState, version: number): Snapshot {
    return {
      aggregateId: state.id,
      data: {
        id: state.id,
        items: state.items.map((item) => ({ ...item })),
        isActive: state.isActive,
        totalAmount: state.totalAmount,
        lastUpdated: state.lastUpdated,
      },
      version,
      timestamp: new Date(),
    };
  }

  /**
   * Restores the state held by the snapshot and applies the given events on
   * top of it. The cart lines are copied out of the snapshot so that the
   * returned state never shares them with the stored snapshot.
   */
  applySnapshot(snapshot: Snapshot, events: Event[]): CartState {
    const snapshotItems: CartItem[] = snapshot.data.items ?? [];
    const restored: CartState = {
      ...snapshot.data,
      items: snapshotItems.map((item) => ({ ...item })),
      version: snapshot.version,
    };
    return events.reduce((state, event) => this.applyEvent(state, event), restored);
  }

  /** Returns the state that results from applying one event to `state`. */
  private applyEvent(state: CartState, event: Event): CartState {
    const newState: CartState = {
      ...state,
      // Streams that do not start with CART_CREATED would otherwise leave
      // the id empty; take it from the first event seen instead.
      id: state.id || event.aggregateId,
      version: event.version,
      lastUpdated: event.timestamp,
    };

    switch (event.type) {
      case 'CART_CREATED':
        return {
          ...newState,
          id: event.aggregateId,
          items: [],
          isActive: true,
          totalAmount: 0,
        };

      case 'ITEM_ADDED': {
        const { productId, productName, quantity, unitPrice } = event.data;
        const alreadyInCart = newState.items.some((item) => item.productId === productId);
        const updatedItems: CartItem[] = alreadyInCart
          ? newState.items.map((item) =>
              item.productId === productId ? { ...item, quantity: item.quantity + quantity } : item
            )
          : [...newState.items, { productId, productName, quantity, unitPrice }];
        return {
          ...newState,
          items: updatedItems,
          totalAmount: this.calculateTotalAmount(updatedItems),
        };
      }

      case 'ITEM_REMOVED': {
        const remainingItems = newState.items.filter((item) => item.productId !== event.data.productId);
        return {
          ...newState,
          items: remainingItems,
          totalAmount: this.calculateTotalAmount(remainingItems),
        };
      }

      // Both events deactivate the cart.
      case 'CART_CHECKED_OUT':
      case 'CART_ABANDONED':
        return { ...newState, isActive: false };

      default:
        // Unknown event types only advance version and lastUpdated.
        return newState;
    }
  }

  /** Sum of `quantity * unitPrice` over the given lines. */
  private calculateTotalAmount(items: CartItem[]): number {
    return items.reduce((total, item) => total + item.quantity * item.unitPrice, 0);
  }
}
// State Reconstruction Service
/**
 * Rebuilds aggregate state from the event store, using stored snapshots as a
 * starting point where one is available.
 */
class StateReconstructionService {
  constructor(
    private eventStore: EnhancedEventStore,
    private snapshotStore: SnapshotStore,
    private snapshotStrategy: SnapshotStrategy
  ) {}

  /**
   * Rebuilds the current state of the aggregate: from the latest snapshot
   * plus the events after it, or from all events when no snapshot exists.
   */
  async reconstructState<T>(
    aggregateId: string,
    reconstructor: StateReconstructor<T>
  ): Promise<T> {
    const { events, snapshot } = await this.eventStore.getEventsWithSnapshot(aggregateId);
    if (snapshot) {
      console.log(`Found snapshot for ${aggregateId} at version ${snapshot.version}`);
      return reconstructor.applySnapshot(snapshot, events);
    }
    console.log(`No snapshot found for ${aggregateId}, reconstructing from all events`);
    return reconstructor.reconstruct(events);
  }

  /**
   * Rebuilds the state as it was at `targetVersion`, starting from the
   * newest snapshot at or below that version when there is one.
   */
  async reconstructStateAtVersion<T>(
    aggregateId: string,
    targetVersion: number,
    reconstructor: StateReconstructor<T>
  ): Promise<T> {
    const upToTarget = (event: Event): boolean => event.version <= targetVersion;
    const snapshot = await this.snapshotStore.getSnapshotBeforeVersion(aggregateId, targetVersion);
    if (snapshot) {
      const laterEvents = await this.eventStore.getEventsFromVersion(aggregateId, snapshot.version + 1);
      return reconstructor.applySnapshot(snapshot, laterEvents.filter(upToTarget));
    }
    const allEvents = await this.eventStore.getEvents(aggregateId);
    return reconstructor.reconstruct(allEvents.filter(upToTarget));
  }

  /**
   * Rebuilds the state from all events and saves it as a snapshot at the
   * version of the last event.
   * @throws Error when the aggregate has no events.
   */
  async createSnapshot<T>(
    aggregateId: string,
    reconstructor: StateReconstructor<T>
  ): Promise<void> {
    const events = await this.eventStore.getEvents(aggregateId);
    if (events.length === 0) {
      throw new Error('No events found for aggregate');
    }
    const state = reconstructor.reconstruct(events);
    const latestEvent = events[events.length - 1];
    const snapshot = reconstructor.createSnapshot(state, latestEvent.version);
    await this.snapshotStore.saveSnapshot(snapshot);
    console.log(`Created snapshot for ${aggregateId} at version ${latestEvent.version}`);
  }

  /**
   * Reports how much work a reconstruction of the aggregate involves.
   *
   * `totalEvents` is the number of events stored for the aggregate;
   * `eventsSinceSnapshot` is how many of them come after the latest snapshot
   * (all of them when there is no snapshot).
   */
  async getReconstructionMetrics(aggregateId: string): Promise<{
    totalEvents: number
    hasSnapshot: boolean
    snapshotVersion?: number
    eventsSinceSnapshot: number
  }> {
    const { events, snapshot } = await this.eventStore.getEventsWithSnapshot(aggregateId);
    // Count the stored stream itself: the events returned alongside a
    // snapshot are only the ones after it.
    const allEvents = await this.eventStore.getEvents(aggregateId);
    return {
      totalEvents: allEvents.length,
      hasSnapshot: Boolean(snapshot),
      snapshotVersion: snapshot?.version,
      eventsSinceSnapshot: events.length,
    };
  }
}
// Performance Comparison Utility
/** Times a full replay against a snapshot-assisted reconstruction. */
class ReconstructionPerformanceTester {
  /**
   * Measures, in `performance.now()` units, how long it takes to load and
   * replay all events versus calling `reconstructState` on the service.
   * `performanceGain` is the first duration divided by the second.
   */
  async compareReconstructionMethods(
    aggregateId: string,
    reconstructor: StateReconstructor<any>,
    eventStore: EnhancedEventStore,
    reconstructionService: StateReconstructionService
  ): Promise<{
    fullReconstructionTime: number
    snapshotReconstructionTime: number
    performanceGain: number
  }> {
    // Full replay: load every event and rebuild from scratch.
    const fullStartedAt = performance.now();
    const everyEvent = await eventStore.getEvents(aggregateId);
    await reconstructor.reconstruct(everyEvent);
    const fullReconstructionTime = performance.now() - fullStartedAt;

    // Snapshot-assisted reconstruction through the service.
    const snapshotStartedAt = performance.now();
    await reconstructionService.reconstructState(aggregateId, reconstructor);
    const snapshotReconstructionTime = performance.now() - snapshotStartedAt;

    return {
      fullReconstructionTime,
      snapshotReconstructionTime,
      performanceGain: fullReconstructionTime / snapshotReconstructionTime,
    };
  }
}
// Usage Example
/**
 * Demo: stores a cart-created event and 25 item events, snapshots the cart,
 * stores 10 more item events, then logs reconstruction metrics, the rebuilt
 * state and a timing comparison.
 */
async function stateReconstructionExample() {
  const snapshotStore = new SnapshotStore();
  const snapshotStrategy = new IntervalSnapshotStrategy(10); // Snapshot every 10 events
  const eventStore = new EnhancedEventStore(new Map(), snapshotStore, snapshotStrategy);
  const reconstructor = new ShoppingCartReconstructor();
  const reconstructionService = new StateReconstructionService(eventStore, snapshotStore, snapshotStrategy);
  const aggregateId = 'cart-123';

  // Builds the ITEM_ADDED event for the i-th demo product.
  const itemAdded = (i: number): Event => ({
    id: crypto.randomUUID(),
    aggregateId,
    type: 'ITEM_ADDED',
    data: {
      productId: `prod-${i}`,
      productName: `Product ${i}`,
      quantity: 1,
      unitPrice: 10 * i,
    },
    timestamp: new Date(),
    version: i + 1,
  });

  // First batch: cart creation followed by products 1..25.
  const events: Event[] = [
    {
      id: crypto.randomUUID(),
      aggregateId,
      type: 'CART_CREATED',
      data: {},
      timestamp: new Date(),
      version: 1,
    },
  ];
  for (let i = 1; i <= 25; i++) {
    events.push(itemAdded(i));
  }
  await eventStore.saveEvents(aggregateId, events);

  // Snapshot the cart as it stands now.
  await reconstructionService.createSnapshot(aggregateId, reconstructor);

  // Second batch, stored after the snapshot: products 26..35.
  const moreEvents: Event[] = [];
  for (let i = 26; i <= 35; i++) {
    moreEvents.push(itemAdded(i));
  }
  await eventStore.saveEvents(aggregateId, moreEvents);

  const metrics = await reconstructionService.getReconstructionMetrics(aggregateId);
  console.log('Reconstruction metrics:', metrics);

  const cartState = await reconstructionService.reconstructState(aggregateId, reconstructor);
  console.log('Reconstructed cart state:', {
    id: cartState.id,
    itemCount: cartState.items.length,
    totalAmount: cartState.totalAmount,
    isActive: cartState.isActive,
    version: cartState.version,
  });

  const performanceTester = new ReconstructionPerformanceTester();
  const comparison = await performanceTester.compareReconstructionMethods(
    aggregateId,
    reconstructor,
    eventStore,
    reconstructionService
  );
  console.log('Performance comparison:', {
    fullReconstruction: `${comparison.fullReconstructionTime.toFixed(2)}ms`,
    snapshotReconstruction: `${comparison.snapshotReconstructionTime.toFixed(2)}ms`,
    performanceGain: `${comparison.performanceGain.toFixed(2)}x faster`,
  });
}
export {
Event,
Snapshot,
StateReconstructor,
SnapshotStrategy,
IntervalSnapshotStrategy,
SnapshotStore,
EnhancedEventStore,
CartState,
ShoppingCartReconstructor,
StateReconstructionService,
stateReconstructionExample
}
💻 Управление Версиями Событий
🔴 complex
⭐⭐⭐⭐⭐
Управление эволюцией схемы и стратегиями версионирования для систем event sourcing
⏱️ 75 min
🏷️ event-versioning, schema-evolution, data-migration
Prerequisites:
Understanding of event sourcing, Schema management knowledge
// Event Versioning System
/** A schema registered for one version of an event type. */
interface EventSchema {
  type: string;
  version: number;
  /** The schema definition itself; its format is not fixed by this interface. */
  schema: any;
}

/** Converts event data from one schema version to another. */
interface EventTransformer {
  fromVersion: number;
  toVersion: number;
  transform(event: any): any;
}

/** An event that also records the schema version of its `data`. */
interface VersionedEvent {
  id: string;
  aggregateId: string;
  type: string;
  data: any;
  timestamp: Date;
  /** Sequence number of the event within its aggregate's stream. */
  version: number;
  /** Schema version of `data`. */
  eventVersion: number;
}
// Schema Registry
/** Keeps the schemas and version-to-version transformers of each event type. */
class SchemaRegistry {
  private schemas: Map<string, EventSchema[]> = new Map();
  private transformers: Map<string, EventTransformer[]> = new Map();

  /** Registers a schema for the given event type and version. */
  registerSchema(eventType: string, version: number, schema: any): void {
    const schemas = this.schemas.get(eventType) || [];
    schemas.push({ type: eventType, version, schema });
    this.schemas.set(eventType, schemas);
  }

  /** Registers a transformer for the given event type. */
  registerTransformer(eventType: string, transformer: EventTransformer): void {
    const transformers = this.transformers.get(eventType) || [];
    transformers.push(transformer);
    this.transformers.set(eventType, transformers);
  }

  /**
   * Returns the highest schema version registered for the event type.
   * @throws Error when no schema is registered for it.
   */
  getLatestVersion(eventType: string): number {
    const schemas = this.schemas.get(eventType) || [];
    if (schemas.length === 0) {
      throw new Error(`No schema found for event type: ${eventType}`);
    }
    return Math.max(...schemas.map((s) => s.version));
  }

  /** Returns the schema registered for the type and version, or `null`. */
  getSchema(eventType: string, version: number): EventSchema | null {
    const schemas = this.schemas.get(eventType) || [];
    return schemas.find((s) => s.version === version) || null;
  }

  /** Returns the transformers registered for the event type (empty when none). */
  getTransformers(eventType: string): EventTransformer[] {
    return this.transformers.get(eventType) || [];
  }

  /**
   * Chains transformers, starting at `fromVersion`, until `toVersion` is
   * reached. Returns an empty path when `fromVersion` is already at or above
   * `toVersion`.
   * @throws Error when no transformer leads upwards from a version on the way.
   */
  getTransformationPath(eventType: string, fromVersion: number, toVersion: number): EventTransformer[] {
    const transformers = this.getTransformers(eventType);
    const path: EventTransformer[] = [];
    let currentVersion = fromVersion;

    while (currentVersion < toVersion) {
      // Only steps that raise the version are usable: a transformer that
      // stays on the same version or goes down would make this loop spin
      // forever.
      const nextTransformer = transformers.find(
        (t) => t.fromVersion === currentVersion && t.toVersion > currentVersion
      );
      if (!nextTransformer) {
        throw new Error(`No transformer found from version ${currentVersion} for event type ${eventType}`);
      }
      path.push(nextTransformer);
      currentVersion = nextTransformer.toVersion;
    }

    return path;
  }
}
// Event Upgrader
/**
 * Brings events up to a newer schema version by running their payload through the
 * transformer chain supplied by the schema registry.
 */
class EventUpgrader {
  constructor(private schemaRegistry: SchemaRegistry) {}

  /**
   * Upgrades a single event.
   *
   * @param event Event to upgrade; it is never mutated.
   * @param targetVersion Schema version to reach. When omitted (or falsy) the registry's
   *   latest version for the event type is used.
   * @returns The same event object when it is already at or above the goal version,
   *   otherwise a shallow copy carrying the transformed `data` and the new `eventVersion`.
   */
  async upgradeEvent(event: VersionedEvent, targetVersion?: number): Promise<VersionedEvent> {
    const goalVersion = targetVersion
      ? targetVersion
      : this.schemaRegistry.getLatestVersion(event.type)

    const needsUpgrade = !(event.eventVersion >= goalVersion)
    if (!needsUpgrade) {
      return event
    }

    const steps = this.schemaRegistry.getTransformationPath(event.type, event.eventVersion, goalVersion)

    // Fold the payload through every step, stamping the version each step produces.
    return steps.reduce<VersionedEvent>(
      (current, step) => ({
        ...current,
        data: step.transform(current.data),
        eventVersion: step.toVersion
      }),
      { ...event }
    )
  }

  /**
   * Upgrades every event to the latest version of its type, one after another.
   * An event whose upgrade throws is logged and passed through unchanged.
   */
  async upgradeEvents(events: VersionedEvent[]): Promise<VersionedEvent[]> {
    const results: VersionedEvent[] = []
    for (const original of events) {
      let outcome: VersionedEvent = original
      try {
        outcome = await this.upgradeEvent(original)
      } catch (error) {
        console.error(`Failed to upgrade event ${original.id}:`, error)
        // Best effort: fall back to the untouched original.
        outcome = original
      }
      results.push(outcome)
    }
    return results
  }
}
// Event Validation
/**
 * Validates event payloads against the schema registered for their type and schema version.
 * Only a small subset of JSON Schema is understood: top-level `type: 'object'`, `properties`,
 * `required`, and a per-property `type`. Other keywords (e.g. `minimum`/`maximum`, nested
 * `properties`) are ignored.
 */
class EventValidator {
  constructor(private schemaRegistry: SchemaRegistry) {}

  /**
   * Validates `event.data` against the schema for `event.type` / `event.eventVersion`.
   * @returns `isValid: false` with a single error when no such schema is registered,
   *   otherwise the outcome of the schema check.
   */
  validateEvent(event: VersionedEvent): { isValid: boolean; errors: string[] } {
    const schema = this.schemaRegistry.getSchema(event.type, event.eventVersion)
    if (!schema) {
      return {
        isValid: false,
        errors: [`No schema found for event type ${event.type} version ${event.eventVersion}`]
      }
    }
    return this.validateAgainstSchema(event.data, schema.schema)
  }

  /**
   * Checks `data` against an object schema. Schemas whose `type` is not `'object'` impose
   * no constraints here.
   */
  private validateAgainstSchema(data: any, schema: any): { isValid: boolean; errors: string[] } {
    const errors: string[] = []
    // Simple validation logic (in production, use a proper JSON schema validator)
    if (schema.type === 'object') {
      // The `in` operator throws on null and primitives, so reject non-object payloads up front.
      if (!this.matchesType(data, 'object')) {
        errors.push(`Expected event data to be an object, got ${this.describeType(data)}`)
        return { isValid: false, errors }
      }

      const properties: Record<string, any> = schema.properties || {}
      const required: string[] = Array.isArray(schema.required) ? schema.required : []

      for (const [key, propSchema] of Object.entries(properties)) {
        if (required.includes(key) && !(key in data)) {
          errors.push(`Missing required property: ${key}`)
        }
        if (key in data) {
          const propErrors = this.validateProperty(data[key], propSchema)
          errors.push(...propErrors.map(e => `${key}: ${e}`))
        }
      }

      // Required keys that have no entry under `properties` were never visited above.
      for (const key of required) {
        const declared = Object.prototype.hasOwnProperty.call(properties, key)
        if (!declared && !(key in data)) {
          errors.push(`Missing required property: ${key}`)
        }
      }
    }
    return {
      isValid: errors.length === 0,
      errors
    }
  }

  /** Checks a single property value against the `type` declared in its schema, if any. */
  private validateProperty(value: any, schema: any): string[] {
    const errors: string[] = []
    if (schema.type && !this.matchesType(value, schema.type)) {
      errors.push(`Expected type ${schema.type}, got ${this.describeType(value)}`)
    }
    return errors
  }

  /**
   * Tells whether `value` is of the named type. `array`, `integer`, `null` and `object`
   * follow JSON Schema meaning (a plain `typeof` cannot express them); any other name is
   * compared with `typeof`.
   */
  private matchesType(value: any, expected: string): boolean {
    switch (expected) {
      case 'array':
        return Array.isArray(value)
      case 'null':
        return value === null
      case 'integer':
        return typeof value === 'number' && Number.isInteger(value)
      case 'object':
        return value !== null && typeof value === 'object' && !Array.isArray(value)
      default:
        return typeof value === expected
    }
  }

  /** Names a value's type for error messages, distinguishing `null` and arrays from objects. */
  private describeType(value: any): string {
    if (value === null) return 'null'
    if (Array.isArray(value)) return 'array'
    return typeof value
  }
}
// Example Event Schemas
// V1: ItemAdded event with simple structure
/**
 * Schema for version 1 of the ItemAdded payload: product id, quantity and unit price,
 * all mandatory.
 */
const ItemAddedV1Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' }
  },
  required: ['productId', 'quantity', 'unitPrice']
}
// V2: ItemAdded event with additional productName field and optional discount
/**
 * Schema for version 2 of the ItemAdded payload: the version 1 fields plus a mandatory
 * `productName` and an optional `discount` declared with bounds 0..1.
 */
const ItemAddedV2Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    productName: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' },
    // Not listed under `required`, so it may be absent.
    discount: { type: 'number', minimum: 0, maximum: 1 }
  },
  required: ['productId', 'productName', 'quantity', 'unitPrice']
}
// V3: ItemAdded event with category and metadata
/**
 * Schema for version 3 of the ItemAdded payload: the version 2 fields plus a mandatory
 * `category` and an optional free-form `metadata` object.
 */
const ItemAddedV3Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    productName: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' },
    discount: { type: 'number', minimum: 0, maximum: 1 },
    category: { type: 'string' },
    // Not listed under `required`, so it may be absent.
    metadata: { type: 'object' }
  },
  required: ['productId', 'productName', 'quantity', 'unitPrice', 'category']
}
// Event Transformers
// V1 to V2: Add productName based on productId lookup
/**
 * Upgrades an ItemAdded payload from schema version 1 to 2 by filling in `productName`
 * (from a built-in lookup table) and defaulting `discount`.
 */
class ItemAddedV1ToV2Transformer implements EventTransformer {
  fromVersion = 1
  toVersion = 2

  // In a real implementation, this would query a product catalog
  private productNames: Record<string, string> = {
    'prod-1': 'Laptop',
    'prod-2': 'Mouse',
    'prod-3': 'Keyboard'
  }

  /**
   * @param event Version 1 payload; it is not mutated.
   * @returns A copy of the payload with `productName` set to the looked-up name (or
   *   'Unknown Product') and `discount` set to the existing truthy value or 0.
   */
  transform(event: any): any {
    const resolvedName = this.productNames[event.productId] || 'Unknown Product'
    const resolvedDiscount = event.discount || 0
    return {
      ...event,
      productName: resolvedName,
      discount: resolvedDiscount
    }
  }
}
// V2 to V3: Add category and metadata
/**
 * Upgrades an ItemAdded payload from schema version 2 to 3 by deriving a `category`
 * from the product id and attaching migration `metadata`.
 */
class ItemAddedV2ToV3Transformer implements EventTransformer {
  fromVersion = 2
  toVersion = 3

  /**
   * Simple categorization based on substrings of the product id. Rules are tried in
   * order and the first match wins; ids matching nothing fall into 'General'.
   */
  private getProductCategory(productId: string): string {
    const rules: Array<[string[], string]> = [
      [['laptop'], 'Electronics'],
      [['mouse', 'keyboard'], 'Accessories']
    ]
    for (const [fragments, category] of rules) {
      if (fragments.some(fragment => productId.includes(fragment))) {
        return category
      }
    }
    return 'General'
  }

  /**
   * @param event Version 2 payload; it is not mutated.
   * @returns A copy of the payload with `category` set and `metadata` replaced by
   *   `{ source: 'legacy-system', migratedAt: <current time as ISO string> }`.
   */
  transform(event: any): any {
    const category = this.getProductCategory(event.productId)
    const metadata = {
      source: 'legacy-system',
      migratedAt: new Date().toISOString()
    }
    return { ...event, category, metadata }
  }
}
// V3 to V4: Remove discount, add pricing breakdown
/**
 * Upgrades an ItemAdded payload from schema version 3 to 4: the top-level `discount`
 * is dropped and replaced by computed price fields plus a `pricing` entry in `metadata`.
 */
class ItemAddedV3ToV4Transformer implements EventTransformer {
  fromVersion = 3
  toVersion = 4

  /**
   * @param event Version 3 payload; it is not mutated.
   * @returns A new payload containing only the version 4 fields. `discountAmount` is
   *   `unitPrice * quantity * discount`; `finalPrice` is `unitPrice` reduced by the
   *   discount fraction. A missing or falsy `discount` counts as 0.
   */
  transform(event: any): any {
    const { productId, productName, quantity, unitPrice, category } = event
    const rate = event.discount || 0
    const discountAmount = unitPrice * quantity * rate
    const finalPrice = unitPrice - (unitPrice * rate)

    const pricing = {
      originalUnitPrice: unitPrice,
      discountPercentage: rate,
      discountAmount,
      finalUnitPrice: finalPrice
    }

    return {
      productId,
      productName,
      quantity,
      unitPrice,
      finalPrice,
      discountAmount,
      category,
      // Existing metadata entries are kept; `pricing` is added or overwritten.
      metadata: { ...event.metadata, pricing }
    }
  }
}
/**
 * Schema for version 4 of the ItemAdded payload: `discount` is gone, `finalPrice` is
 * mandatory, `discountAmount` is optional, and `metadata` describes an optional
 * `pricing` breakdown.
 */
const ItemAddedV4Schema = {
  type: 'object',
  properties: {
    productId: { type: 'string' },
    productName: { type: 'string' },
    quantity: { type: 'number' },
    unitPrice: { type: 'number' },
    finalPrice: { type: 'number' },
    discountAmount: { type: 'number' },
    category: { type: 'string' },
    metadata: {
      type: 'object',
      properties: {
        source: { type: 'string' },
        migratedAt: { type: 'string' },
        pricing: {
          type: 'object',
          properties: {
            originalUnitPrice: { type: 'number' },
            discountPercentage: { type: 'number' },
            discountAmount: { type: 'number' },
            finalUnitPrice: { type: 'number' }
          }
        }
      }
    }
  },
  required: ['productId', 'productName', 'quantity', 'unitPrice', 'finalPrice', 'category']
}
// Versioned Event Store
/**
 * In-memory event store, grouped by aggregate id, that validates events on write and can
 * upgrade them to newer schema versions on read or in place.
 */
class VersionedEventStore {
  private events: Map<string, VersionedEvent[]> = new Map()

  constructor(
    private schemaRegistry: SchemaRegistry,
    private eventUpgrader: EventUpgrader,
    private eventValidator: EventValidator
  ) {}

  /**
   * Validates the event against its own schema version and appends it to its aggregate.
   * @throws Error listing the validation errors when the event is invalid; nothing is stored then.
   */
  async saveEvent(event: VersionedEvent): Promise<void> {
    // Validate event against its schema
    const validation = this.eventValidator.validateEvent(event)
    if (!validation.isValid) {
      throw new Error(`Event validation failed: ${validation.errors.join(', ')}`)
    }
    const aggregateEvents = this.events.get(event.aggregateId) || []
    aggregateEvents.push(event)
    this.events.set(event.aggregateId, aggregateEvents)
  }

  /**
   * Returns the aggregate's events, optionally upgraded to `targetVersion`.
   *
   * @param targetVersion Schema version to upgrade each event to. When omitted (or falsy)
   *   the events are returned as stored.
   * @returns A new array on every call, so callers cannot alter the store through it.
   *   Upgrade failures reject the returned promise.
   */
  async getEvents(aggregateId: string, targetVersion?: number): Promise<VersionedEvent[]> {
    const events = this.events.get(aggregateId) || []
    if (!targetVersion) {
      // Copy so that later saves (which push in place) and caller mutations stay isolated.
      return [...events]
    }
    // Upgrade events to target version
    const upgradedEvents: VersionedEvent[] = []
    for (const event of events) {
      const upgraded = await this.eventUpgrader.upgradeEvent(event, targetVersion)
      upgradedEvents.push(upgraded)
    }
    return upgradedEvents
  }

  /**
   * Returns the aggregate's events, each upgraded to the latest schema version registered
   * for its own event type. The stored events are left untouched.
   *
   * The target is resolved per event: using one shared maximum across all types would ask
   * types with fewer versions to upgrade past their latest schema, which fails.
   */
  async getLatestEvents(aggregateId: string): Promise<VersionedEvent[]> {
    const events = this.events.get(aggregateId) || []
    const upgradedEvents: VersionedEvent[] = []
    for (const event of events) {
      const latestVersion = this.schemaRegistry.getLatestVersion(event.type)
      upgradedEvents.push(await this.eventUpgrader.upgradeEvent(event, latestVersion))
    }
    return upgradedEvents
  }

  /**
   * Upgrades the aggregate's stored events in place and reports the outcome.
   *
   * All events returned by the upgrader replace the stored ones, including any that fail
   * validation afterwards (those are listed in `errors`).
   *
   * @returns `totalEvents`: number of stored events; `upgradedEvents`: number of events
   *   whose schema version changed and that pass validation; `errors`: one message per
   *   event that fails validation after the upgrade attempt.
   */
  async migrateEvents(aggregateId: string): Promise<{
    totalEvents: number
    upgradedEvents: number
    errors: string[]
  }> {
    const events = this.events.get(aggregateId) || []
    const errors: string[] = []
    let upgradedCount = 0
    const upgradedEvents = await this.eventUpgrader.upgradeEvents(events)
    // Validate upgraded events
    upgradedEvents.forEach((event, index) => {
      const validation = this.eventValidator.validateEvent(event)
      if (!validation.isValid) {
        errors.push(`Event ${event.id}: ${validation.errors.join(', ')}`)
      } else if (event.eventVersion !== events[index]?.eventVersion) {
        // The upgrader keeps input order, so the same index holds the pre-upgrade event.
        upgradedCount++
      }
    })
    // Store upgraded events
    this.events.set(aggregateId, upgradedEvents)
    return {
      totalEvents: events.length,
      upgradedEvents: upgradedCount,
      errors
    }
  }
}
// Event Versioning Analytics
/**
 * Reporting helpers around the versioned event store. Two of the three reports return
 * fixed sample data; only the migration report talks to the store.
 */
class EventVersioningAnalytics {
  constructor(
    private eventStore: VersionedEventStore,
    private schemaRegistry: SchemaRegistry
  ) {}

  /**
   * Returns a per-event-type count of events by schema version.
   * Demo implementation: the result is hard-coded sample data and `aggregateId` is ignored.
   */
  async getVersionDistribution(aggregateId?: string): Promise<Array<{
    eventType: string
    versionDistribution: Record<number, number>
    totalEvents: number
  }>> {
    // This would analyze all events in the store
    // For demo purposes, return sample data
    const sample = (
      eventType: string,
      versionDistribution: Record<number, number>,
      totalEvents: number
    ) => ({ eventType, versionDistribution, totalEvents })

    return [
      sample('ITEM_ADDED', { 1: 10, 2: 25, 3: 15 }, 50),
      sample('ITEM_REMOVED', { 1: 5, 2: 8 }, 13)
    ]
  }

  /**
   * Runs the store's `migrateEvents` for the aggregate and summarises its result.
   * Note that this performs the migration; it is not a read-only report.
   */
  async getMigrationReport(aggregateId: string): Promise<{
    eventsMigrated: number
    eventsWithErrors: number
    errorDetails: string[]
  }> {
    const { upgradedEvents, errors } = await this.eventStore.migrateEvents(aggregateId)
    return {
      eventsMigrated: upgradedEvents,
      eventsWithErrors: errors.length,
      errorDetails: errors
    }
  }

  /**
   * Returns when each schema version was introduced and what changed.
   * Demo implementation: the timeline is hard-coded and `eventType` is ignored.
   */
  async getSchemaEvolutionTimeline(eventType: string): Promise<Array<{
    version: number
    introducedAt: Date
    changes: string[]
  }>> {
    // This would track when each schema version was introduced
    const history: Array<[number, string, string[]]> = [
      [1, '2023-01-01', ['Initial version', 'Basic item data']],
      [2, '2023-06-01', ['Added productName field', 'Added optional discount']],
      [3, '2024-01-01', ['Added category field', 'Added metadata object']]
    ]
    return history.map(([version, introducedOn, changes]) => ({
      version,
      introducedAt: new Date(introducedOn),
      changes
    }))
  }
}
// Setup and Usage Example
/**
 * Wires up the event-versioning example: a schema registry loaded with the four
 * ITEM_ADDED schemas and the three transformers between them, plus an upgrader,
 * validator, event store and analytics object built on top of it.
 *
 * @returns The event store, the analytics object and the schema registry.
 */
async function setupEventVersioning() {
  const schemaRegistry = new SchemaRegistry()

  // Register schemas: list position + 1 is the schema version.
  const itemAddedSchemas = [ItemAddedV1Schema, ItemAddedV2Schema, ItemAddedV3Schema, ItemAddedV4Schema]
  itemAddedSchemas.forEach((schema, index) => {
    schemaRegistry.registerSchema('ITEM_ADDED', index + 1, schema)
  })

  // Register transformers, in ascending version order.
  const itemAddedTransformers: EventTransformer[] = [
    new ItemAddedV1ToV2Transformer(),
    new ItemAddedV2ToV3Transformer(),
    new ItemAddedV3ToV4Transformer()
  ]
  for (const transformer of itemAddedTransformers) {
    schemaRegistry.registerTransformer('ITEM_ADDED', transformer)
  }

  const eventStore = new VersionedEventStore(
    schemaRegistry,
    new EventUpgrader(schemaRegistry),
    new EventValidator(schemaRegistry)
  )
  const analytics = new EventVersioningAnalytics(eventStore, schemaRegistry)

  return { eventStore, analytics, schemaRegistry }
}
/**
 * End-to-end demo of the event-versioning pieces. It saves three ITEM_ADDED events
 * written against schema versions 1, 2 and 3, then prints them as stored, upgraded to
 * the latest version and upgraded to version 2, followed by the migration report, the
 * version distribution and the schema evolution timeline. All output goes to the console.
 */
async function eventVersioningExample() {
  const { eventStore, analytics } = await setupEventVersioning()

  // Create sample events with different versions
  const aggregateId = 'cart-123'

  // Builds an ITEM_ADDED event; in this demo the stream version and the schema version coincide.
  const itemAdded = (schemaVersion: number, occurredOn: string, data: any): VersionedEvent => ({
    id: crypto.randomUUID(),
    aggregateId,
    type: 'ITEM_ADDED',
    data,
    timestamp: new Date(occurredOn),
    version: schemaVersion,
    eventVersion: schemaVersion
  })

  // Prints one line per event with its id, schema version and payload.
  const printEvents = (list: VersionedEvent[]): void => {
    for (const item of list) {
      console.log(`Event ${item.id}: version ${item.eventVersion}, data:`, item.data)
    }
  }

  const events: VersionedEvent[] = [
    // V1 schema
    itemAdded(1, '2023-05-01', {
      productId: 'prod-1',
      quantity: 1,
      unitPrice: 999.99
    }),
    // V2 schema
    itemAdded(2, '2023-07-01', {
      productId: 'prod-2',
      productName: 'Wireless Mouse',
      quantity: 2,
      unitPrice: 29.99,
      discount: 0.1
    }),
    // V3 schema
    itemAdded(3, '2024-02-01', {
      productId: 'prod-3',
      productName: 'Mechanical Keyboard',
      quantity: 1,
      unitPrice: 149.99,
      discount: 0.05,
      category: 'Accessories',
      metadata: {
        source: 'web-app',
        addedBy: 'user-456'
      }
    })
  ]

  // Save events (validation will occur); a failed save is logged and the loop continues.
  for (const pending of events) {
    try {
      await eventStore.saveEvent(pending)
      console.log(`Saved event ${pending.id} with version ${pending.eventVersion}`)
    } catch (error) {
      console.error(`Failed to save event ${pending.id}:`, error)
    }
  }

  // Get events in their original versions
  console.log('\n=== Original Events ===')
  printEvents(await eventStore.getEvents(aggregateId))

  // Get events upgraded to latest version
  console.log('\n=== Upgraded Events (Latest Version) ===')
  printEvents(await eventStore.getLatestEvents(aggregateId))

  // Get events upgraded to specific version
  console.log('\n=== Events Upgraded to V2 ===')
  printEvents(await eventStore.getEvents(aggregateId, 2))

  // Migration report
  console.log('\n=== Migration Report ===')
  const migrationReport = await analytics.getMigrationReport(aggregateId)
  console.log('Migration report:', migrationReport)

  // Version distribution
  console.log('\n=== Version Distribution ===')
  for (const dist of await analytics.getVersionDistribution()) {
    console.log(`Event type ${dist.eventType}:`, dist.versionDistribution)
  }

  // Schema evolution timeline
  console.log('\n=== Schema Evolution Timeline ===')
  for (const entry of await analytics.getSchemaEvolutionTimeline('ITEM_ADDED')) {
    const introducedOn = entry.introducedAt.toISOString().split('T')[0]
    console.log(`Version ${entry.version} (${introducedOn}):`, entry.changes)
  }
}
// Public surface of the event-versioning example: the three type declarations, the core
// classes, the ITEM_ADDED transformers, and the two entry-point functions.
export {
VersionedEvent,
EventSchema,
EventTransformer,
SchemaRegistry,
EventUpgrader,
EventValidator,
VersionedEventStore,
EventVersioningAnalytics,
ItemAddedV1ToV2Transformer,
ItemAddedV2ToV3Transformer,
ItemAddedV3ToV4Transformer,
setupEventVersioning,
eventVersioningExample
}