🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples de Patterns de Systèmes Distribués
Patterns complets de systèmes distribués couvrant la cohérence, les algorithmes de consensus, le sharding et les mécanismes de tolérance aux pannes
💻 Modèles de Cohérence Distribuée
🔴 complex
⭐⭐⭐⭐⭐
Implémentation de différents modèles de cohérence et patterns de coordination
⏱️ 80 min
🏷️ distributed-consistency, strong-consistency, eventual-consistency, vector-clocks
Prerequisites:
Distributed systems fundamentals, CAP theorem understanding
// Distributed Consistency Models Implementation
// Consistency Model Types
/**
 * Consistency guarantees modelled in this file, ordered roughly from
 * strongest (every read observes the latest write) to weakest.
 * Only STRONG, EVENTUAL and CAUSAL have manager implementations below;
 * SEQUENTIAL and WEAK are listed for completeness.
 */
enum ConsistencyModel {
STRONG = 'strong',
SEQUENTIAL = 'sequential',
CAUSAL = 'causal',
EVENTUAL = 'eventual',
WEAK = 'weak'
}
/**
 * One versioned key/value record as stored on a single node.
 */
interface DataItem {
key: string
value: any
// Per-key revision. Managers differ: StrongConsistencyManager uses an
// incrementing counter, CausalConsistencyManager uses Date.now().
version: number
timestamp: Date
// Id of the node that produced this revision.
nodeId: string
}
/**
 * An entry in a manager's operation/audit log.
 */
interface Operation {
id: string
type: 'READ' | 'WRITE' | 'DELETE'
key: string
// Present for WRITE operations only.
value?: any
timestamp: Date
nodeId: string
}
/**
 * Uniform result envelope returned by every read/write across all managers.
 */
interface ConsistencyResult {
success: boolean
value?: any
version?: number
// Populated only when success is false.
errorMessage?: string
}
// Strong Consistency Implementation (Linearizability)
/**
 * In-memory key/value store that serializes every read and write per key
 * through an exclusive lock table, approximating linearizability within a
 * single process. Each access is appended to an operation log for auditing.
 *
 * NOTE(review): the lock is cooperative (a plain Map) — safe here because
 * JS is single-threaded and check+set happens without an intervening await.
 */
class StrongConsistencyManager {
  private data: Map<string, DataItem> = new Map()
  private operationLog: Operation[] = []
  private lockTable: Map<string, string> = new Map() // key -> nodeId

  /**
   * Writes `value` under `key` on behalf of `nodeId`, bumping the version.
   * Fails (success=false) if the per-key lock cannot be acquired in time.
   */
  async write(key: string, value: any, nodeId: string): Promise<ConsistencyResult> {
    // Acquire lock for strong consistency
    const lockAcquired = await this.acquireLock(key, nodeId)
    if (!lockAcquired) {
      return { success: false, errorMessage: 'Could not acquire lock for write operation' }
    }
    try {
      const currentData = this.data.get(key)
      const newVersion = (currentData?.version || 0) + 1
      const newDataItem: DataItem = {
        key,
        value,
        version: newVersion,
        timestamp: new Date(),
        nodeId
      }
      this.logOperation('WRITE', key, nodeId, value)
      this.data.set(key, newDataItem)
      console.log(`[STRONG] Write operation for key ${key} by ${nodeId} - Version: ${newVersion}`)
      return { success: true, value, version: newVersion }
    } finally {
      await this.releaseLock(key, nodeId)
    }
  }

  /**
   * Reads `key` under the per-key lock (shorter 5 s timeout than writes).
   * FIX: the read is now recorded in the operation log even when the key is
   * missing — previously misses left gaps in the audit history.
   */
  async read(key: string, nodeId: string): Promise<ConsistencyResult> {
    // For strong consistency, read from primary with lock
    const lockAcquired = await this.acquireLock(key, nodeId, 5000) // 5 second timeout
    if (!lockAcquired) {
      return { success: false, errorMessage: 'Could not acquire lock for read operation' }
    }
    try {
      this.logOperation('READ', key, nodeId)
      const dataItem = this.data.get(key)
      if (!dataItem) {
        return { success: false, errorMessage: 'Key not found' }
      }
      console.log(`[STRONG] Read operation for key ${key} by ${nodeId} - Version: ${dataItem.version}`)
      return {
        success: true,
        value: dataItem.value,
        version: dataItem.version
      }
    } finally {
      await this.releaseLock(key, nodeId)
    }
  }

  // Appends one entry to the audit log (shared by read and write paths).
  private logOperation(type: Operation['type'], key: string, nodeId: string, value?: any): void {
    this.operationLog.push({
      id: crypto.randomUUID(),
      type,
      key,
      value,
      timestamp: new Date(),
      nodeId
    })
  }

  /**
   * Polls every 100 ms until the key is unlocked (or held by `nodeId`),
   * giving up after `timeout` ms. Check-then-set has no await between them,
   * so acquisition is atomic on the event loop.
   */
  private async acquireLock(key: string, nodeId: string, timeout: number = 30000): Promise<boolean> {
    const startTime = Date.now()
    while (Date.now() - startTime < timeout) {
      const currentLock = this.lockTable.get(key)
      if (!currentLock || currentLock === nodeId) {
        this.lockTable.set(key, nodeId)
        return true
      }
      // Wait before retrying
      await new Promise(resolve => setTimeout(resolve, 100))
    }
    return false
  }

  // Releases the lock only if `nodeId` is the current holder.
  private async releaseLock(key: string, nodeId: string): Promise<void> {
    const currentLock = this.lockTable.get(key)
    if (currentLock === nodeId) {
      this.lockTable.delete(key)
    }
  }

  /** Returns a defensive copy of the full audit log. */
  getOperationHistory(): Operation[] {
    return [...this.operationLog]
  }
}
// Eventual Consistency Implementation with Gossip Protocol
/**
 * One eventually-consistent replica. Local writes succeed immediately and
 * are tagged with this node's vector clock; a periodic gossip task merges
 * state with a random peer, resolving conflicts via vector-clock comparison
 * with last-writer-wins as the tiebreak for concurrent updates.
 *
 * FIX: the original stored the object literal for a write and then started
 * the next statement with `(newDataItem as any)` and no semicolon in
 * between — ASI parsed `{...}\n(...)` as a call expression, so write()
 * threw a TypeError at runtime. Statements are now explicitly terminated.
 * FIX: the gossip setInterval handle was discarded, making the timer
 * impossible to cancel; it is now kept and exposed via stop().
 *
 * NOTE(review): peer exchange is simulated in-process
 * (getSimulatedNodeData); no real network I/O happens here.
 */
class EventualConsistencyManager {
  private nodeData: Map<string, Map<string, DataItem>> = new Map(); // nodeId -> (key -> data)
  private vectorClocks: Map<string, Map<string, number>> = new Map(); // nodeId -> (nodeId -> counter)
  private gossipInterval: number = 5000; // 5 seconds
  // Handle for the periodic gossip task so it can be cancelled.
  private gossipTimer: ReturnType<typeof setInterval> | null = null;

  constructor(private nodeId: string, private knownNodes: string[]) {
    this.initializeNode();
    this.startGossipProtocol();
  }

  /** Cancels the periodic gossip task. Call when shutting the node down. */
  stop(): void {
    if (this.gossipTimer !== null) {
      clearInterval(this.gossipTimer);
      this.gossipTimer = null;
    }
  }

  // Creates this node's data map and a zeroed vector clock covering every
  // known peer plus itself.
  private initializeNode(): void {
    this.nodeData.set(this.nodeId, new Map());
    this.vectorClocks.set(this.nodeId, new Map());
    const nodeClock = this.vectorClocks.get(this.nodeId)!;
    for (const node of this.knownNodes) {
      nodeClock.set(node, 0);
    }
    nodeClock.set(this.nodeId, 0);
  }

  /**
   * Applies a write locally: bumps this node's vector-clock entry, stores
   * the item with a snapshot of the clock attached, and returns immediately
   * (propagation happens later via gossip).
   */
  async write(key: string, value: any): Promise<ConsistencyResult> {
    const nodeData = this.nodeData.get(this.nodeId)!;
    const nodeClock = this.vectorClocks.get(this.nodeId)!;
    // Increment vector clock for this node
    const currentCounter = nodeClock.get(this.nodeId) || 0;
    nodeClock.set(this.nodeId, currentCounter + 1);
    const newDataItem: DataItem = {
      key,
      value,
      version: currentCounter + 1,
      timestamp: new Date(),
      nodeId: this.nodeId
    };
    // Attach a snapshot of the clock for later conflict resolution.
    // (vectorClock is not part of DataItem, hence the `as any` escape hatch.)
    (newDataItem as any).vectorClock = new Map(nodeClock);
    nodeData.set(key, newDataItem);
    console.log(`[EVENTUAL] Local write for key ${key} on node ${this.nodeId} - VC: ${JSON.stringify(Object.fromEntries(nodeClock))}`);
    return { success: true, value, version: currentCounter + 1 };
  }

  /** Reads from local state only — may return stale data by design. */
  async read(key: string): Promise<ConsistencyResult> {
    const nodeData = this.nodeData.get(this.nodeId)!;
    const dataItem = nodeData.get(key);
    if (!dataItem) {
      return { success: false, errorMessage: 'Key not found' };
    }
    console.log(`[EVENTUAL] Read key ${key} on node ${this.nodeId} - Version: ${dataItem.version}`);
    return {
      success: true,
      value: dataItem.value,
      version: dataItem.version
    };
  }

  // Schedules the recurring gossip round; the handle is kept for stop().
  private async startGossipProtocol(): Promise<void> {
    this.gossipTimer = setInterval(async () => {
      await this.performGossip();
    }, this.gossipInterval);
  }

  // One gossip round: pick a random known peer and exchange state with it.
  private async performGossip(): Promise<void> {
    if (this.knownNodes.length === 0) return;
    // Select random node to gossip with
    const targetNode = this.knownNodes[Math.floor(Math.random() * this.knownNodes.length)];
    try {
      await this.exchangeDataWithNode(targetNode);
    } catch (error) {
      console.log(`Gossip with node ${targetNode} failed:`, error);
    }
  }

  // Merges the (simulated) peer's data into local state, adopting unknown
  // keys outright and resolving clashes through resolveConflict.
  private async exchangeDataWithNode(targetNodeId: string): Promise<void> {
    console.log(`[GOSSIP] Exchanging data between ${this.nodeId} and ${targetNodeId}`);
    // Simulate data exchange with another node
    const targetNodeData = this.getSimulatedNodeData(targetNodeId);
    const myNodeData = this.nodeData.get(this.nodeId)!;
    // Merge data from target node
    for (const [key, targetData] of targetNodeData.entries()) {
      const myData = myNodeData.get(key);
      if (!myData) {
        // Key doesn't exist locally, adopt it
        myNodeData.set(key, targetData);
        console.log(`[GOSSIP] Adopted key ${key} from node ${targetNodeId}`);
      } else {
        // Resolve conflicts using vector clocks
        const resolved = this.resolveConflict(myData, targetData);
        if (resolved !== myData) {
          myNodeData.set(key, resolved);
          console.log(`[GOSSIP] Resolved conflict for key ${key}`);
        }
      }
    }
  }

  /**
   * Picks the winning revision: causally-greater vector clock wins; for
   * concurrent clocks (or items missing a clock) fall back to
   * last-writer-wins on the wall-clock timestamp.
   */
  private resolveConflict(data1: DataItem, data2: DataItem): DataItem {
    const vc1 = (data1 as any).vectorClock as Map<string, number>;
    const vc2 = (data2 as any).vectorClock as Map<string, number>;
    if (!vc1 || !vc2) {
      // Fallback to timestamp
      return data1.timestamp >= data2.timestamp ? data1 : data2;
    }
    // Compare vector clocks
    const result = this.compareVectorClocks(vc1, vc2);
    if (result === 'concurrent') {
      // Conflict - choose the one with higher timestamp (last-writer-wins)
      return data1.timestamp >= data2.timestamp ? data1 : data2;
    } else if (result === 'greater') {
      return data1;
    } else {
      return data2;
    }
  }

  /**
   * Standard component-wise vector-clock comparison; missing entries count
   * as 0. 'concurrent' means neither clock dominates the other.
   */
  private compareVectorClocks(vc1: Map<string, number>, vc2: Map<string, number>): 'greater' | 'less' | 'equal' | 'concurrent' {
    let greater = false;
    let less = false;
    const allKeys = new Set([...vc1.keys(), ...vc2.keys()]);
    for (const key of allKeys) {
      const v1 = vc1.get(key) || 0;
      const v2 = vc2.get(key) || 0;
      if (v1 > v2) greater = true;
      if (v1 < v2) less = true;
      if (greater && less) return 'concurrent';
    }
    if (greater && !less) return 'greater';
    if (!greater && less) return 'less';
    return 'equal';
  }

  // Stand-in for a network fetch: randomly fabricates peer data.
  private getSimulatedNodeData(nodeId: string): Map<string, DataItem> {
    // Simulate getting data from another node
    const simulatedData = new Map<string, DataItem>();
    // Add some sample data for simulation
    if (Math.random() > 0.5) {
      simulatedData.set('shared_key_1', {
        key: 'shared_key_1',
        value: `value_from_${nodeId}`,
        version: Math.floor(Math.random() * 10),
        timestamp: new Date(Date.now() - Math.random() * 10000),
        nodeId
      });
    }
    return simulatedData;
  }

  /** Shallow copy of this node's local key/value map. */
  getLocalData(): Map<string, DataItem> {
    return new Map(this.nodeData.get(this.nodeId));
  }
}
// Causal Consistency Implementation
/**
 * Key/value store enforcing causal ordering: a write may declare explicit
 * dependencies (operation ids), and is buffered until every declared
 * dependency has been applied. When a write completes, any buffered writes
 * that were waiting on it are released (recursively).
 */
class CausalConsistencyManager {
  private data: Map<string, DataItem> = new Map();
  private dependencyGraph: Map<string, Set<string>> = new Map(); // operation -> dependencies
  private pendingOperations: Map<string, Operation> = new Map();

  /**
   * Applies the write immediately when all `dependencies` are satisfied;
   * otherwise buffers it and reports success=false. A dependency counts as
   * satisfied when it is not currently buffered.
   */
  async write(key: string, value: any, dependencies: string[] = []): Promise<ConsistencyResult> {
    const opId = crypto.randomUUID();
    const now = new Date();
    const op: Operation = {
      id: opId,
      type: 'WRITE',
      key,
      value,
      timestamp: now,
      nodeId: 'causal-node'
    };
    if (!this.checkDependencies(dependencies)) {
      // Buffer until processPendingOperations() releases it.
      this.pendingOperations.set(opId, op);
      this.dependencyGraph.set(opId, new Set(dependencies));
      console.log(`[CAUSAL] Write for key ${key} pending dependencies: ${dependencies.join(', ')}`);
      return { success: false, errorMessage: 'Dependencies not satisfied' };
    }
    const item: DataItem = {
      key,
      value,
      version: Date.now(), // timestamp doubles as the version
      timestamp: now,
      nodeId: 'causal-node'
    };
    this.data.set(key, item);
    this.dependencyGraph.delete(opId);
    // Release any buffered writes that were waiting on this operation.
    await this.processPendingOperations(opId);
    console.log(`[CAUSAL] Write for key ${key} executed successfully`);
    return { success: true, value, version: item.version };
  }

  /** Reads the locally applied value for `key`, if any. */
  async read(key: string): Promise<ConsistencyResult> {
    const item = this.data.get(key);
    if (item === undefined) {
      return { success: false, errorMessage: 'Key not found' };
    }
    console.log(`[CAUSAL] Read key ${key} - Version: ${item.version}`);
    return { success: true, value: item.value, version: item.version };
  }

  // True when none of the listed dependency ids is still buffered.
  private checkDependencies(dependencies: string[]): boolean {
    for (const dep of dependencies) {
      if (this.pendingOperations.has(dep)) return false;
    }
    return true;
  }

  /**
   * Removes `completedOperationId` from every waiter's dependency set and
   * executes (then recursively unblocks) writes whose sets became empty.
   */
  private async processPendingOperations(completedOperationId: string): Promise<void> {
    const ready: string[] = [];
    for (const [opId, unmet] of this.dependencyGraph.entries()) {
      if (!unmet.has(completedOperationId)) continue;
      unmet.delete(completedOperationId);
      if (unmet.size === 0) ready.push(opId);
    }
    for (const opId of ready) {
      const op = this.pendingOperations.get(opId);
      if (op === undefined) continue;
      this.pendingOperations.delete(opId);
      this.dependencyGraph.delete(opId);
      if (op.type === 'WRITE' && op.value !== undefined) {
        this.data.set(op.key, {
          key: op.key,
          value: op.value,
          version: Date.now(),
          timestamp: op.timestamp,
          nodeId: op.nodeId
        });
        console.log(`[CAUSAL] Executed pending write for key ${op.key}`);
      }
      // An executed write may in turn unblock further buffered writes.
      await this.processPendingOperations(opId);
    }
  }

  /** Number of writes currently buffered on unmet dependencies. */
  getPendingOperationsCount(): number {
    return this.pendingOperations.size;
  }

  /** Defensive copy of the id -> unmet-dependency-set graph. */
  getDependencyGraph(): Map<string, Set<string>> {
    return new Map(this.dependencyGraph);
  }
}
// Consistency Performance Analyzer
/**
 * Collects per-operation latency samples keyed by consistency-model name
 * and produces an aggregate report (average/read/write latency, success
 * rate) plus heuristic tuning recommendations.
 */
class ConsistencyPerformanceAnalyzer {
  // model name -> bounded list (max 1000, oldest dropped) of samples.
  private metrics: Map<string, Array<{
    operation: string
    latency: number
    timestamp: Date
    success: boolean
  }>> = new Map()

  /**
   * Records one sample.
   * @param consistencyModel bucket name, e.g. 'STRONG' | 'EVENTUAL' | 'CAUSAL'
   * @param operation 'READ' or 'WRITE' (drives the per-kind breakdown)
   * @param latency elapsed milliseconds
   * @param success whether the operation succeeded
   */
  recordMetric(consistencyModel: string, operation: string, latency: number, success: boolean): void {
    if (!this.metrics.has(consistencyModel)) {
      this.metrics.set(consistencyModel, [])
    }
    const modelMetrics = this.metrics.get(consistencyModel)!
    modelMetrics.push({
      operation,
      latency,
      timestamp: new Date(),
      success
    })
    // Keep only last 1000 metrics per model
    if (modelMetrics.length > 1000) {
      modelMetrics.shift()
    }
  }

  /**
   * Aggregates all recorded samples into per-model statistics and derives
   * recommendations. Models with no samples are omitted.
   */
  getPerformanceReport(): {
    models: Array<{
      name: string
      avgLatency: number
      successRate: number
      totalOperations: number
      readLatency: number
      writeLatency: number
    }>
    recommendations: string[]
  } {
    const models: Array<{
      name: string
      avgLatency: number
      successRate: number
      totalOperations: number
      readLatency: number
      writeLatency: number
    }> = []
    for (const [modelName, metrics] of this.metrics.entries()) {
      if (metrics.length === 0) continue
      const successfulOps = metrics.filter(m => m.success)
      const readOps = metrics.filter(m => m.operation === 'READ')
      const writeOps = metrics.filter(m => m.operation === 'WRITE')
      const avgLatency = metrics.reduce((sum, m) => sum + m.latency, 0) / metrics.length
      const successRate = (successfulOps.length / metrics.length) * 100
      const readLatency = readOps.length > 0 ? readOps.reduce((sum, m) => sum + m.latency, 0) / readOps.length : 0
      const writeLatency = writeOps.length > 0 ? writeOps.reduce((sum, m) => sum + m.latency, 0) / writeOps.length : 0
      models.push({
        name: modelName,
        avgLatency,
        successRate,
        totalOperations: metrics.length,
        readLatency,
        writeLatency
      })
    }
    const recommendations = this.generateRecommendations(models)
    return { models, recommendations }
  }

  // Heuristic advice derived from the aggregated stats.
  // FIX: parameter was `any[]`, losing all type checking; it is now the
  // same shape the report produces. Also guards the empty-models case,
  // where Math.max(...[]) would evaluate to -Infinity.
  private generateRecommendations(models: Array<{
    name: string
    avgLatency: number
    successRate: number
    totalOperations: number
    readLatency: number
    writeLatency: number
  }>): string[] {
    const recommendations: string[] = []
    // Analyze latency patterns
    if (models.length > 0) {
      const maxLatency = Math.max(...models.map(m => m.avgLatency))
      if (maxLatency > 1000) { // 1 second
        recommendations.push('High latency detected. Consider optimizing network communication or using caching.')
      }
    }
    // Analyze success rates
    const lowSuccessModels = models.filter(m => m.successRate < 95)
    if (lowSuccessModels.length > 0) {
      recommendations.push('Low success rates detected. Check for network failures or implement retry mechanisms.')
    }
    // Compare read vs write latencies
    const highWriteLatencyModels = models.filter(m => m.writeLatency > m.readLatency * 2)
    if (highWriteLatencyModels.length > 0) {
      recommendations.push('Write operations significantly slower than reads. Consider write-optimizations or eventual consistency for writes.')
    }
    // Model-specific recommendations
    const strongModel = models.find(m => m.name.includes('STRONG'))
    const eventualModel = models.find(m => m.name.includes('EVENTUAL'))
    if (strongModel && eventualModel) {
      if (strongModel.avgLatency > eventualModel.avgLatency * 3) {
        recommendations.push('Strong consistency shows high latency penalty. Evaluate if eventual consistency could be sufficient for your use case.')
      }
    }
    if (recommendations.length === 0) {
      recommendations.push('Performance metrics look good. Continue monitoring for any degradation.')
    }
    return recommendations
  }
}
// Usage Example
/**
 * End-to-end demo: exercises the strong, eventual and causal managers in
 * sequence, then prints a latency/success report. Takes several seconds of
 * wall time (6 s gossip wait plus simulated delays).
 * NOTE(review): the two EventualConsistencyManager instances start gossip
 * intervals that this function never clears, so a process running this
 * example will not exit on its own.
 */
async function consistencyModelsExample() {
console.log('=== Distributed Consistency Models ===\n')
// Initialize different consistency managers
const strongManager = new StrongConsistencyManager()
const eventualManager1 = new EventualConsistencyManager('node-1', ['node-2', 'node-3'])
const eventualManager2 = new EventualConsistencyManager('node-2', ['node-1', 'node-3'])
const causalManager = new CausalConsistencyManager()
const analyzer = new ConsistencyPerformanceAnalyzer()
console.log('1. Testing Strong Consistency')
// Test strong consistency with concurrent operations
const strongStartTime = Date.now()
await strongManager.write('key1', 'value1', 'node-1')
await strongManager.read('key1', 'node-2')
await strongManager.write('key1', 'value2', 'node-2')
const strongResult = await strongManager.read('key1', 'node-1')
const strongLatency = Date.now() - strongStartTime
console.log(`Strong consistency result: ${JSON.stringify(strongResult)}`)
// Halving the total elapsed time is a rough per-op estimate, not a measurement.
analyzer.recordMetric('STRONG', 'WRITE', strongLatency / 2, true)
analyzer.recordMetric('STRONG', 'READ', strongLatency / 2, true)
console.log('\n2. Testing Eventual Consistency')
// Test eventual consistency
const eventualStartTime = Date.now()
await eventualManager1.write('key1', 'value1')
await eventualManager2.write('key2', 'value2')
// Wait for gossip to propagate (longer than the 5 s gossip interval)
await new Promise(resolve => setTimeout(resolve, 6000))
const read1 = await eventualManager1.read('key1')
const read2 = await eventualManager2.read('key2')
const eventualLatency = Date.now() - eventualStartTime
console.log(`Eventual consistency results: node1=${JSON.stringify(read1)}, node2=${JSON.stringify(read2)}`)
// NOTE(review): this latency includes the fixed 6 s sleep above.
analyzer.recordMetric('EVENTUAL', 'WRITE', eventualLatency / 2, true)
analyzer.recordMetric('EVENTUAL', 'READ', eventualLatency / 2, true)
// Show data convergence
console.log('\nNode 1 data:', Array.from(eventualManager1.getLocalData().keys()))
console.log('Node 2 data:', Array.from(eventualManager2.getLocalData().keys()))
console.log('\n3. Testing Causal Consistency')
// Test causal consistency with dependencies
const causalStartTime = Date.now()
await causalManager.write('key1', 'value1') // First operation
// NOTE(review): 'op-1' is a hand-written id, but the manager generates UUID
// operation ids; an unknown dependency counts as satisfied, so this write
// executes immediately rather than actually waiting on the first one.
await causalManager.write('key2', 'value2', ['op-1']) // Depends on first operation
const causalResult = await causalManager.read('key2')
const causalLatency = Date.now() - causalStartTime
console.log(`Causal consistency result: ${JSON.stringify(causalResult)}`)
console.log(`Pending operations: ${causalManager.getPendingOperationsCount()}`)
analyzer.recordMetric('CAUSAL', 'WRITE', causalLatency / 2, true)
analyzer.recordMetric('CAUSAL', 'READ', causalLatency / 2, true)
// Test concurrent operations with potential conflicts
console.log('\n4. Testing Concurrent Operations')
const concurrentPromises = []
// Strong consistency - should serialize
for (let i = 0; i < 5; i++) {
concurrentPromises.push(strongManager.write(`concurrent_key_${i}`, `value_${i}`, `node-${i}`))
}
// NOTE(review): the timer starts after the writes were already launched,
// slightly undercounting the true elapsed time.
const concurrentStartTime = Date.now()
await Promise.all(concurrentPromises)
const concurrentLatency = Date.now() - concurrentStartTime
console.log(`Concurrent strong operations completed in ${concurrentLatency}ms`)
// Performance analysis
console.log('\n5. Performance Analysis Report')
// Add some more sample metrics (synthetic, random latencies)
for (let i = 0; i < 10; i++) {
analyzer.recordMetric('STRONG', 'READ', Math.random() * 200 + 50, true)
analyzer.recordMetric('STRONG', 'WRITE', Math.random() * 500 + 200, true)
analyzer.recordMetric('EVENTUAL', 'READ', Math.random() * 50 + 10, true)
analyzer.recordMetric('EVENTUAL', 'WRITE', Math.random() * 100 + 20, true)
analyzer.recordMetric('CAUSAL', 'READ', Math.random() * 150 + 30, true)
analyzer.recordMetric('CAUSAL', 'WRITE', Math.random() * 300 + 100, true)
}
const report = analyzer.getPerformanceReport()
console.log('\nPerformance by Model:')
report.models.forEach(model => {
console.log(` ${model.name}:`)
console.log(` Average Latency: ${model.avgLatency.toFixed(2)}ms`)
console.log(` Success Rate: ${model.successRate.toFixed(1)}%`)
console.log(` Read Latency: ${model.readLatency.toFixed(2)}ms`)
console.log(` Write Latency: ${model.writeLatency.toFixed(2)}ms`)
console.log(` Total Operations: ${model.totalOperations}`)
})
console.log('\nRecommendations:')
report.recommendations.forEach(rec => {
console.log(` • ${rec}`)
})
}
export {
ConsistencyModel,
DataItem,
Operation,
ConsistencyResult,
StrongConsistencyManager,
EventualConsistencyManager,
CausalConsistencyManager,
ConsistencyPerformanceAnalyzer,
consistencyModelsExample
}
💻 Algorithmes de Consensus (Raft, PBFT)
🔴 complex
⭐⭐⭐⭐⭐
Implémentation des algorithmes de consensus Raft et PBFT pour accord distribué
⏱️ 90 min
🏷️ consensus-algorithms, raft, pbft, byzantine-fault-tolerance
Prerequisites:
Advanced distributed systems knowledge, Understanding of consensus protocols
// Consensus Algorithms Implementation
// Node Types and States
/** Raft server roles; a node is in exactly one of these states at a time. */
enum NodeState {
FOLLOWER = 'FOLLOWER',
CANDIDATE = 'CANDIDATE',
LEADER = 'LEADER'
}
/**
 * RPC kinds exchanged between RaftNode instances. HEARTBEAT is modelled as
 * its own type here (in the Raft paper it is an empty AppendEntries).
 */
enum RaftMessageType {
APPEND_ENTRIES = 'APPEND_ENTRIES',
APPEND_ENTRIES_RESPONSE = 'APPEND_ENTRIES_RESPONSE',
REQUEST_VOTE = 'REQUEST_VOTE',
REQUEST_VOTE_RESPONSE = 'REQUEST_VOTE_RESPONSE',
HEARTBEAT = 'HEARTBEAT'
}
/**
 * PBFT protocol phases (pre-prepare / prepare / commit) plus the
 * view-change messages used to replace a faulty primary.
 */
enum PBFTMessageType {
PRE_PREPARE = 'PRE_PREPARE',
PREPARE = 'PREPARE',
COMMIT = 'COMMIT',
VIEW_CHANGE = 'VIEW_CHANGE',
NEW_VIEW = 'NEW_VIEW'
}
/**
 * Envelope for all Raft RPCs. `data` carries the per-type payload
 * (entries/prevLogIndex/prevLogTerm/leaderCommit for AppendEntries,
 * candidateId/lastLogIndex/lastLogTerm for RequestVote, success or
 * voteGranted flags for responses).
 */
interface RaftMessage {
type: RaftMessageType
// Sender's current term; receivers step down when it exceeds their own.
term: number
senderId: string
receiverId: string
data?: any
}
/**
 * Envelope for PBFT protocol messages.
 */
interface PBFTMessage {
type: PBFTMessageType
view: number
sequenceNumber: number
senderId: string
// Digest of the client operation this message refers to.
digest: string
data?: any
// NOTE(review): declared but never populated in the code visible here.
signatures?: string[]
}
/**
 * One replicated Raft log entry. `index` is 1-based: the first entry
 * appended gets index 1 (see clientRequest, which uses log.length + 1).
 */
interface LogEntry {
term: number
index: number
command: any
timestamp: Date
}
// Raft Consensus Algorithm Implementation
/**
 * A single member of a Raft cluster. Outbound RPCs are delivered through
 * the injected `messageHandler`; inbound RPCs arrive via handleMessage().
 *
 * FIXES over the original:
 * - handleRequestVote compared `message.term` against the voter's last log
 *   term; the Raft §5.4.1 up-to-date check must use the candidate's
 *   `lastLogTerm` instead. Fixed.
 * - The election timer was never reset on heartbeats/AppendEntries, so
 *   followers would start elections even with a healthy leader. Fixed.
 * - Votes were never counted, so becomeLeader() was unreachable. Votes are
 *   now tracked per term and the candidate promotes itself on a majority.
 * - Stale-term (message.term < currentTerm) AppendEntries/heartbeats are
 *   now rejected/ignored instead of being applied.
 * - Added stop() so the election/heartbeat timers can be cancelled.
 */
class RaftNode {
  private state: NodeState = NodeState.FOLLOWER
  private currentTerm: number = 0
  private votedFor: string | null = null
  private log: LogEntry[] = []
  private commitIndex: number = 0
  // NOTE(review): tracked but never advanced in this implementation.
  private lastApplied: number = 0
  // Leader state
  private leaderId: string | null = null
  private nextIndex: Map<string, number> = new Map()
  private matchIndex: Map<string, number> = new Map()
  // Candidate state: peers that granted us their vote in the current term.
  private votesReceived: Set<string> = new Set()
  // Timing
  private electionTimeout: number
  private heartbeatInterval: number
  private lastHeartbeat: number = Date.now()
  private electionTimer: NodeJS.Timeout | null = null
  private heartbeatTimer: NodeJS.Timeout | null = null

  constructor(
    public nodeId: string,
    public clusterNodes: string[],
    private messageHandler: (message: RaftMessage) => void
  ) {
    // Randomized timeout so nodes rarely start elections simultaneously.
    this.electionTimeout = Math.random() * 3000 + 5000 // 5-8 seconds
    this.heartbeatInterval = 1000 // 1 second
    this.initializeNode()
  }

  private initializeNode(): void {
    // Initialize nextIndex for all nodes
    for (const nodeId of this.clusterNodes) {
      if (nodeId !== this.nodeId) {
        this.nextIndex.set(nodeId, 1)
        this.matchIndex.set(nodeId, 0)
      }
    }
    this.startElectionTimer()
  }

  /** Cancels all timers. Call when tearing the node down. */
  stop(): void {
    if (this.electionTimer) {
      clearTimeout(this.electionTimer)
      this.electionTimer = null
    }
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer)
      this.heartbeatTimer = null
    }
  }

  /**
   * Client entry point: appends `command` to the leader's log and tries to
   * replicate it to a majority. Non-leaders refuse and report the known
   * leader so the client can redirect.
   */
  async clientRequest(command: any): Promise<{
    success: boolean
    message?: string
    term?: number
  }> {
    if (this.state !== NodeState.LEADER) {
      return {
        success: false,
        message: `Not a leader. Current leader: ${this.leaderId || 'unknown'}`,
        term: this.currentTerm
      }
    }
    const logEntry: LogEntry = {
      term: this.currentTerm,
      index: this.log.length + 1, // 1-based log indexing
      command,
      timestamp: new Date()
    }
    this.log.push(logEntry)
    console.log(`[RAFT-${this.nodeId}] Client request appended to log at index ${logEntry.index}`)
    // Replicate to followers
    const success = await this.replicateLog()
    if (success) {
      return { success: true, term: this.currentTerm }
    } else {
      return { success: false, message: 'Failed to replicate to majority', term: this.currentTerm }
    }
  }

  // Pushes outstanding entries (or a heartbeat) to every follower and
  // reports whether a majority (including self) acknowledged.
  private async replicateLog(): Promise<boolean> {
    let successCount = 1 // Count self
    const majority = Math.floor(this.clusterNodes.length / 2) + 1
    for (const followerId of this.clusterNodes) {
      if (followerId === this.nodeId) continue
      const nextIdx = this.nextIndex.get(followerId) || 1
      const entries = this.log.slice(nextIdx - 1)
      if (entries.length === 0) {
        // Follower is up to date — send heartbeat
        this.sendHeartbeat(followerId)
        successCount++
      } else {
        // Send append entries
        const success = await this.sendAppendEntries(followerId, entries)
        if (success) successCount++
      }
    }
    return successCount >= majority
  }

  private sendHeartbeat(followerId: string): void {
    const message: RaftMessage = {
      type: RaftMessageType.HEARTBEAT,
      term: this.currentTerm,
      senderId: this.nodeId,
      receiverId: followerId,
      data: { leaderId: this.nodeId }
    }
    this.messageHandler(message)
  }

  /**
   * Sends AppendEntries carrying `entries` to one follower. On success,
   * advances nextIndex/matchIndex and tries to commit; on failure, backs
   * nextIndex off by one for the consistency-check retry.
   */
  private async sendAppendEntries(followerId: string, entries: LogEntry[]): Promise<boolean> {
    const prevLogIndex = (this.nextIndex.get(followerId) || 1) - 1
    const prevLogTerm = prevLogIndex > 0 ? this.log[prevLogIndex - 1].term : 0
    const message: RaftMessage = {
      type: RaftMessageType.APPEND_ENTRIES,
      term: this.currentTerm,
      senderId: this.nodeId,
      receiverId: followerId,
      data: {
        entries,
        prevLogIndex,
        prevLogTerm,
        leaderCommit: this.commitIndex
      }
    }
    // Simulate sending message and getting response
    const response = await this.simulateMessageExchange(message)
    if (response && response.data?.success) {
      const nextIdx = (this.nextIndex.get(followerId) || 1) + entries.length
      this.nextIndex.set(followerId, nextIdx)
      this.matchIndex.set(followerId, nextIdx - 1)
      // Try to commit entries
      this.updateCommitIndex()
      return true
    } else {
      // Decrement nextIndex and retry
      this.nextIndex.set(followerId, Math.max(1, (this.nextIndex.get(followerId) || 1) - 1))
      return false
    }
  }

  // Stand-in for the network: 50-150 ms delay, 90% delivery success.
  private async simulateMessageExchange(message: RaftMessage): Promise<RaftMessage | null> {
    await new Promise(resolve => setTimeout(resolve, Math.random() * 100 + 50))
    // 90% success rate
    if (Math.random() > 0.1) {
      return {
        type: RaftMessageType.APPEND_ENTRIES_RESPONSE,
        term: message.term,
        senderId: message.receiverId,
        receiverId: message.senderId,
        data: { success: true }
      }
    }
    return null
  }

  /** Inbound RPC dispatcher. A higher term always demotes us to follower. */
  handleMessage(message: RaftMessage): void {
    if (message.term > this.currentTerm) {
      this.currentTerm = message.term
      this.votedFor = null
      this.state = NodeState.FOLLOWER
      this.leaderId = null
    }
    switch (message.type) {
      case RaftMessageType.APPEND_ENTRIES:
        this.handleAppendEntries(message)
        break
      case RaftMessageType.APPEND_ENTRIES_RESPONSE:
        this.handleAppendEntriesResponse(message)
        break
      case RaftMessageType.REQUEST_VOTE:
        this.handleRequestVote(message)
        break
      case RaftMessageType.REQUEST_VOTE_RESPONSE:
        this.handleRequestVoteResponse(message)
        break
      case RaftMessageType.HEARTBEAT:
        this.handleHeartbeat(message)
        break
    }
  }

  private handleAppendEntries(message: RaftMessage): void {
    // FIX: reject RPCs from stale leaders instead of applying them.
    if (message.term < this.currentTerm) {
      this.sendMessage({
        type: RaftMessageType.APPEND_ENTRIES_RESPONSE,
        term: this.currentTerm,
        senderId: this.nodeId,
        receiverId: message.senderId,
        data: { success: false }
      })
      return
    }
    const { entries, prevLogIndex, prevLogTerm, leaderCommit } = message.data || {}
    // Consistency check: our log must contain an entry at prevLogIndex
    // whose term matches prevLogTerm.
    if (prevLogIndex > 0) {
      if (this.log.length < prevLogIndex || this.log[prevLogIndex - 1].term !== prevLogTerm) {
        // Reject append entries
        this.sendMessage({
          type: RaftMessageType.APPEND_ENTRIES_RESPONSE,
          term: this.currentTerm,
          senderId: this.nodeId,
          receiverId: message.senderId,
          data: { success: false }
        })
        return
      }
    }
    // Accept: truncate any conflicting suffix and append the new entries
    this.log = this.log.slice(0, prevLogIndex)
    this.log.push(...entries)
    // Update commit index
    if (leaderCommit > this.commitIndex) {
      this.commitIndex = Math.min(leaderCommit, this.log.length)
    }
    // A valid AppendEntries proves a live leader — defer our own election.
    this.lastHeartbeat = Date.now()
    this.leaderId = message.senderId
    this.startElectionTimer() // FIX: timer was never reset here
    // Send success response
    this.sendMessage({
      type: RaftMessageType.APPEND_ENTRIES_RESPONSE,
      term: this.currentTerm,
      senderId: this.nodeId,
      receiverId: message.senderId,
      data: { success: true }
    })
    console.log(`[RAFT-${this.nodeId}] Appended ${entries.length} entries from leader ${message.senderId}`)
  }

  private handleHeartbeat(message: RaftMessage): void {
    // FIX: ignore heartbeats from stale leaders.
    if (message.term < this.currentTerm) return
    this.lastHeartbeat = Date.now()
    this.leaderId = message.senderId
    this.state = NodeState.FOLLOWER
    this.startElectionTimer() // FIX: timer was never reset here
  }

  private handleRequestVote(message: RaftMessage): void {
    const { candidateId, lastLogIndex, lastLogTerm } = message.data || {}
    let voteGranted = false
    if (this.votedFor === null || this.votedFor === candidateId) {
      // Check if candidate's log is at least as up-to-date as ours
      const ourLastLog = this.log[this.log.length - 1]
      const ourLastTerm = ourLastLog ? ourLastLog.term : 0
      const ourLastIndex = this.log.length
      // FIX: the comparison must use the candidate's lastLogTerm, not the
      // RPC's current term (Raft §5.4.1 election restriction).
      if (lastLogTerm > ourLastTerm ||
        (lastLogTerm === ourLastTerm && lastLogIndex >= ourLastIndex)) {
        voteGranted = true
        this.votedFor = candidateId
        // Granting a vote also defers our own election.
        this.startElectionTimer()
      }
    }
    this.sendMessage({
      type: RaftMessageType.REQUEST_VOTE_RESPONSE,
      term: this.currentTerm,
      senderId: this.nodeId,
      receiverId: message.senderId,
      data: { voteGranted }
    })
    console.log(`[RAFT-${this.nodeId}] Voted for ${candidateId}: ${voteGranted}`)
  }

  private handleAppendEntriesResponse(message: RaftMessage): void {
    if (this.state !== NodeState.LEADER) return
    const { success } = message.data || {}
    if (success) {
      // Update follower's match index
      const nextIdx = (this.nextIndex.get(message.senderId) || 1)
      this.matchIndex.set(message.senderId, nextIdx - 1)
    } else {
      // Decrement next index and retry
      this.nextIndex.set(message.senderId, Math.max(1, (this.nextIndex.get(message.senderId) || 1) - 1))
    }
  }

  private handleRequestVoteResponse(message: RaftMessage): void {
    if (this.state !== NodeState.CANDIDATE) return
    // Ignore responses from a different (stale) election.
    if (message.term !== this.currentTerm) return
    const { voteGranted } = message.data || {}
    if (voteGranted) {
      // FIX: actually count votes and promote on majority; previously the
      // vote was only logged and becomeLeader() was never called.
      this.votesReceived.add(message.senderId)
      console.log(`[RAFT-${this.nodeId}] Received vote from ${message.senderId}`)
      const majority = Math.floor(this.clusterNodes.length / 2) + 1
      if (this.votesReceived.size >= majority) {
        this.becomeLeader()
      }
    }
  }

  // Advances commitIndex to the highest log index replicated on a majority.
  private updateCommitIndex(): void {
    for (let n = this.log.length; n > this.commitIndex; n--) {
      let count = 0
      for (const nodeId of this.clusterNodes) {
        if (nodeId === this.nodeId) {
          count++
        } else {
          const matchIdx = this.matchIndex.get(nodeId) || 0
          if (matchIdx >= n) count++
        }
      }
      if (count >= Math.floor(this.clusterNodes.length / 2) + 1) {
        this.commitIndex = n
        console.log(`[RAFT-${this.nodeId}] Committed log entry ${n}`)
        break
      }
    }
  }

  // (Re)arms the one-shot election timer.
  private startElectionTimer(): void {
    if (this.electionTimer) {
      clearTimeout(this.electionTimer)
    }
    this.electionTimer = setTimeout(() => {
      this.startElection()
    }, this.electionTimeout)
  }

  // Election timeout fired: become candidate, vote for self, solicit votes.
  private startElection(): void {
    // A leader must not start elections against itself.
    if (this.state === NodeState.LEADER) return
    console.log(`[RAFT-${this.nodeId}] Starting election for term ${this.currentTerm + 1}`)
    this.currentTerm++
    this.state = NodeState.CANDIDATE
    this.votedFor = this.nodeId
    this.votesReceived = new Set([this.nodeId]) // count our own vote
    // Request votes from all nodes
    const lastLog = this.log[this.log.length - 1]
    const lastLogIndex = this.log.length
    const lastLogTerm = lastLog ? lastLog.term : 0
    for (const nodeId of this.clusterNodes) {
      if (nodeId === this.nodeId) continue
      this.sendMessage({
        type: RaftMessageType.REQUEST_VOTE,
        term: this.currentTerm,
        senderId: this.nodeId,
        receiverId: nodeId,
        data: {
          candidateId: this.nodeId,
          lastLogIndex,
          lastLogTerm
        }
      })
    }
    // Restart election timer in case this election fails to conclude
    this.startElectionTimer()
  }

  // Transition to leader: reset per-follower indices, start heartbeats.
  private becomeLeader(): void {
    console.log(`[RAFT-${this.nodeId}] Became leader for term ${this.currentTerm}`)
    this.state = NodeState.LEADER
    this.leaderId = this.nodeId
    // A leader has no use for the election timer.
    if (this.electionTimer) {
      clearTimeout(this.electionTimer)
      this.electionTimer = null
    }
    // Initialize leader state
    for (const nodeId of this.clusterNodes) {
      if (nodeId !== this.nodeId) {
        this.nextIndex.set(nodeId, this.log.length + 1)
        this.matchIndex.set(nodeId, 0)
      }
    }
    // Start sending heartbeats
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer)
    }
    this.heartbeatTimer = setInterval(() => {
      if (this.state === NodeState.LEADER) {
        this.sendHeartbeats()
      }
    }, this.heartbeatInterval)
  }

  private sendHeartbeats(): void {
    for (const nodeId of this.clusterNodes) {
      if (nodeId !== this.nodeId) {
        this.sendHeartbeat(nodeId)
      }
    }
  }

  private sendMessage(message: RaftMessage): void {
    this.messageHandler(message)
  }

  /** Snapshot of the node's externally observable state. */
  getState(): {
    state: NodeState
    term: number
    leaderId: string | null
    logLength: number
    commitIndex: number
  } {
    return {
      state: this.state,
      term: this.currentTerm,
      leaderId: this.leaderId,
      logLength: this.log.length,
      commitIndex: this.commitIndex
    }
  }
}
// PBFT (Practical Byzantine Fault Tolerance) Implementation
/**
 * Single PBFT replica. With `f` tolerated Byzantine faults, commit requires
 * 2f + 1 matching COMMIT votes (see clientRequest).
 * NOTE(review): the primary is fixed to the first configured replica; the
 * view-change path that would rotate it is not visible in this chunk.
 */
class PBFTNode {
// Current view number (changes when the primary is replaced).
private view: number = 0
// Monotonic sequence number the primary assigns to each request.
private sequenceNumber: number = 0
private primary: string
private replicas: string[]
// digest -> protocol messages recorded for that request.
private log: Map<string, PBFTMessage[]> = new Map()
// digest -> replica ids whose PREPARE has been seen.
private prepared: Map<string, Set<string>> = new Map()
// digest -> replica ids whose COMMIT has been seen.
private committed: Map<string, Set<string>> = new Map()
// requestId -> original client operation awaiting consensus.
private pendingRequests: Map<string, any> = new Map()
constructor(
public nodeId: string,
public clusterNodes: string[],
private f: number = 1 // Number of Byzantine faults tolerated
) {
this.replicas = [...clusterNodes]
// Fixed primary: first replica in configuration order.
this.primary = this.replicas[0]
}
/**
 * Entry point for a client operation. On the primary this kicks off the
 * three-phase protocol via PRE-PREPARE; on a backup it only logs intent
 * (the actual forwarding to the primary is not implemented here).
 * NOTE(review): completion is detected by polling the COMMIT count once
 * after a fixed 2 s delay rather than by an event — requests that reach
 * quorum later than that are reported as failed.
 */
async clientRequest(operation: any): Promise<{
success: boolean
result?: any
message?: string
}> {
const digest = this.calculateDigest(operation)
const requestId = `req_${Date.now()}_${Math.random()}`
this.pendingRequests.set(requestId, operation)
if (this.nodeId === this.primary) {
// Primary starts the PBFT protocol
await this.startPrePrepare(digest, operation, requestId)
} else {
// Forward to primary
console.log(`[PBFT-${this.nodeId}] Forwarding request to primary ${this.primary}`)
}
// Wait for completion (simplified)
return new Promise((resolve) => {
setTimeout(() => {
const committed = this.committed.get(digest)
// 2f + 1 matching COMMITs are required for Byzantine quorum.
if (committed && committed.size >= 2 * this.f + 1) {
resolve({
success: true,
result: `Operation ${operation} executed successfully`,
message: 'PBFT consensus reached'
})
} else {
resolve({
success: false,
message: 'PBFT consensus not reached'
})
}
}, 2000)
})
}
private async startPrePrepare(digest: string, operation: any, requestId: string): Promise<void> {
const message: PBFTMessage = {
type: PBFTMessageType.PRE_PREPARE,
view: this.view,
sequenceNumber: ++this.sequenceNumber,
senderId: this.nodeId,
digest,
data: { operation, requestId }
}
console.log(`[PBFT-${this.nodeId}] Sending PRE-PREPARE for sequence ${this.sequenceNumber}`)
// Broadcast to all replicas
for (const replicaId of this.replicas) {
if (replicaId !== this.nodeId) {
await this.sendPBFTMessage(replicaId, message)
}
}
// Store in log
this.addToLog(digest, message)
}
async handlePBFTMessage(message: PBFTMessage): Promise<void> {
const logKey = `${message.view}_${message.sequenceNumber}_${message.digest}`
switch (message.type) {
case PBFTMessageType.PRE_PREPARE:
await this.handlePrePrepare(message, logKey)
break
case PBFTMessageType.PREPARE:
await this.handlePrepare(message, logKey)
break
case PBFTMessageType.COMMIT:
await this.handleCommit(message, logKey)
break
case PBFTMessageType.VIEW_CHANGE:
await this.handleViewChange(message)
break
}
}
private async handlePrePrepare(message: PBFTMessage, logKey: string): Promise<void> {
if (this.nodeId === this.primary) {
console.log(`[PBFT-${this.nodeId}] Ignoring own PRE-PREPARE`)
return
}
console.log(`[PBFT-${this.nodeId}] Received PRE-PREPARE from primary ${message.senderId}`)
// Validate pre-prepare message
if (!this.validatePrePrepare(message)) {
console.log(`[PBFT-${this.nodeId}] Invalid PRE-PREPARE message`)
return
}
// Store and broadcast prepare
this.addToLog(message.digest, message)
const prepareMessage: PBFTMessage = {
type: PBFTMessageType.PREPARE,
view: message.view,
sequenceNumber: message.sequenceNumber,
senderId: this.nodeId,
digest: message.digest
}
// Broadcast prepare to all replicas
for (const replicaId of this.replicas) {
if (replicaId !== this.nodeId) {
await this.sendPBFTMessage(replicaId, prepareMessage)
}
}
}
private async handlePrepare(message: PBFTMessage, logKey: string): Promise<void> {
console.log(`[PBFT-${this.nodeId}] Received PREPARE from ${message.senderId}`)
// Add to prepared set
if (!this.prepared.has(message.digest)) {
this.prepared.set(message.digest, new Set())
}
this.prepared.get(message.digest)!.add(message.senderId)
// Check if we have 2f prepares
const prepares = this.prepared.get(message.digest)!
if (prepares.size >= 2 * this.f) {
console.log(`[PBFT-${this.nodeId}] Quorum reached for PREPARE phase`)
// Move to commit phase
const commitMessage: PBFTMessage = {
type: PBFTMessageType.COMMIT,
view: message.view,
sequenceNumber: message.sequenceNumber,
senderId: this.nodeId,
digest: message.digest
}
// Broadcast commit
for (const replicaId of this.replicas) {
if (replicaId !== this.nodeId) {
await this.sendPBFTMessage(replicaId, commitMessage)
}
}
}
}
private async handleCommit(message: PBFTMessage, logKey: string): Promise<void> {
console.log(`[PBFT-${this.nodeId}] Received COMMIT from ${message.senderId}`)
// Add to committed set
if (!this.committed.has(message.digest)) {
this.committed.set(message.digest, new Set())
}
this.committed.get(message.digest)!.add(message.senderId)
// Check if we have 2f+1 commits
const commits = this.committed.get(message.digest)!
if (commits.size >= 2 * this.f + 1) {
console.log(`[PBFT-${this.nodeId}] Quorum reached for COMMIT phase - EXECUTING`)
// Execute the operation
const logMessages = this.log.get(message.digest) || []
const prePrepare = logMessages.find(m => m.type === PBFTMessageType.PRE_PREPARE)
if (prePrepare && prePrepare.data?.operation) {
this.executeOperation(prePrepare.data.operation)
}
}
}
private async handleViewChange(message: PBFTMessage): Promise<void> {
console.log(`[PBFT-${this.nodeId}] Received VIEW_CHANGE from ${message.senderId}`)
// Simplified view change handling
// In real implementation, would collect view-change messages and select new primary
if (this.view < message.view) {
this.view = message.view
this.primary = this.replicas[this.view % this.replicas.length]
console.log(`[PBFT-${this.nodeId}] View changed to ${this.view}, new primary: ${this.primary}`)
}
}
private validatePrePrepare(message: PBFTMessage): boolean {
// Simplified validation
return message.senderId === this.primary && message.view === this.view
}
private addToLog(digest: string, message: PBFTMessage): void {
if (!this.log.has(digest)) {
this.log.set(digest, [])
}
this.log.get(digest)!.push(message)
}
private executeOperation(operation: any): void {
console.log(`[PBFT-${this.nodeId}] Executing operation: ${operation}`)
// In real implementation, would execute the actual operation
}
private calculateDigest(data: any): string {
// Simple hash function for demo
return btoa(JSON.stringify(data)).slice(0, 16)
}
private async sendPBFTMessage(targetId: string, message: PBFTMessage): Promise<void> {
// Simulate network delay
await new Promise(resolve => setTimeout(resolve, Math.random() * 100 + 50))
console.log(`[PBFT-${this.nodeId}] Sent ${message.type} to ${targetId}`)
// In real implementation, would send over network
}
getViewChange(): void {
console.log(`[PBFT-${this.nodeId}] Initiating view change`)
this.view++
const viewChangeMessage: PBFTMessage = {
type: PBFTMessageType.VIEW_CHANGE,
view: this.view,
sequenceNumber: 0,
senderId: this.nodeId,
digest: ''
}
// Broadcast view change
for (const replicaId of this.replicas) {
if (replicaId !== this.nodeId) {
this.sendPBFTMessage(replicaId, viewChangeMessage)
}
}
}
getConsensusStats(): {
view: number
sequenceNumber: number
primary: string
preparedCount: number
committedCount: number
} {
return {
view: this.view,
sequenceNumber: this.sequenceNumber,
primary: this.primary,
preparedCount: this.prepared.size,
committedCount: this.committed.size
}
}
}
// Consensus Performance Analyzer
// Collects latency/success samples per (algorithm, operation) pair and
// produces an aggregate comparison of Raft vs PBFT.
class ConsensusAnalyzer {
  private metrics: Map<string, Array<{
    algorithm: string
    operation: string
    latency: number
    success: boolean
    timestamp: Date
    nodes: number
  }>> = new Map()

  /** Record one consensus-operation sample under the `${algorithm}_${operation}` bucket. */
  recordMetric(
    algorithm: string,
    operation: string,
    latency: number,
    success: boolean,
    nodeCount: number
  ): void {
    const bucketKey = `${algorithm}_${operation}`
    let bucket = this.metrics.get(bucketKey)
    if (bucket === undefined) {
      bucket = []
      this.metrics.set(bucketKey, bucket)
    }
    bucket.push({
      algorithm,
      operation,
      latency,
      success,
      timestamp: new Date(),
      nodes: nodeCount
    })
  }

  /**
   * Compare recorded Raft vs PBFT WRITE metrics for the given cluster sizes
   * and attach a textual recommendation.
   */
  compareAlgorithms(nodeCounts: number[]): {
    raft: { avgLatency: number; throughput: number; successRate: number }
    pbft: { avgLatency: number; throughput: number; successRate: number }
    recommendation: string
  } {
    const raft = this.getAlgorithmMetrics('RAFT', nodeCounts)
    const pbft = this.getAlgorithmMetrics('PBFT', nodeCounts)
    const verdict =
      raft.avgLatency < pbft.avgLatency
        ? 'Raft shows better performance for this configuration'
        : 'PBFT provides better fault tolerance despite higher latency'
    const sizing = nodeCounts.some((n) => n > 10)
      ? '. Consider Raft for larger clusters due to lower overhead'
      : '. PBFT is suitable for smaller clusters requiring Byzantine fault tolerance'
    return {
      raft,
      pbft,
      recommendation: verdict + sizing
    }
  }

  /** Aggregate the WRITE samples of one algorithm across the requested cluster sizes. */
  private getAlgorithmMetrics(algorithm: string, nodeCounts: number[]): {
    avgLatency: number
    throughput: number
    successRate: number
  } {
    // Only WRITE samples are aggregated, matching how the demo records them.
    const samples = nodeCounts.flatMap((nodeCount) =>
      (this.metrics.get(`${algorithm}_WRITE`) ?? []).filter((m) => m.nodes === nodeCount)
    )
    const total = samples.length
    const succeeded = samples.filter((m) => m.success).length
    const latencySum = samples.reduce((sum, m) => sum + m.latency, 0)
    // NOTE(review): "throughput" is computed as the same percentage as
    // successRate — it is not an ops/sec figure. Preserved as-is; confirm the
    // intended semantics with callers before changing it.
    return {
      avgLatency: total > 0 ? latencySum / total : 0,
      throughput: total > 0 ? (succeeded / total) * 100 : 0,
      successRate: total > 0 ? (succeeded / total) * 100 : 0
    }
  }
}
// Usage Example
/**
 * End-to-end demo: elects a Raft leader, submits a request to it, runs a PBFT
 * client request, simulates inter-replica traffic, then compares both
 * algorithms via ConsensusAnalyzer.
 * NOTE(review): relies on real timers (10s election wait + 2s PBFT timeout),
 * so a full run takes well over 12 seconds of wall-clock time.
 */
async function consensusAlgorithmsExample() {
  console.log('=== Consensus Algorithms (Raft & PBFT) ===\n')
  // Setup Raft cluster
  console.log('1. Setting up Raft Cluster')
  const raftNodes: RaftNode[] = []
  // In-process "network": route each message to the node it addresses.
  const raftMessageHandler = (message: RaftMessage) => {
    const targetNode = raftNodes.find(n => n.nodeId === message.receiverId)
    if (targetNode) {
      targetNode.handleMessage(message)
    }
  }
  // Five nodes sharing one membership list and the in-process transport.
  for (let i = 1; i <= 5; i++) {
    const node = new RaftNode(`raft-node-${i}`,
      ['raft-node-1', 'raft-node-2', 'raft-node-3', 'raft-node-4', 'raft-node-5'],
      raftMessageHandler
    )
    raftNodes.push(node)
  }
  // Wait for election to complete
  await new Promise(resolve => setTimeout(resolve, 10000))
  console.log('Raft cluster states:')
  raftNodes.forEach(node => {
    const state = node.getState()
    console.log(` ${node.nodeId}: ${state.state} (term: ${state.term}, leader: ${state.leaderId})`)
  })
  // Test Raft operations
  console.log('\n2. Testing Raft Operations')
  // Only the elected leader accepts client requests.
  const leaderNode = raftNodes.find(n => n.getState().state === NodeState.LEADER)
  if (leaderNode) {
    const raftStartTime = Date.now()
    const raftResult = await leaderNode.clientRequest({ action: 'set', key: 'test', value: 'raft_value' })
    const raftLatency = Date.now() - raftStartTime
    console.log(`Raft result: ${JSON.stringify(raftResult)}, Latency: ${raftLatency}ms`)
  } else {
    console.log('No Raft leader elected yet')
  }
  // Setup PBFT cluster
  console.log('\n3. Setting up PBFT Cluster')
  // 4 replicas = 3f+1 with f=1.
  const pbftNodes: PBFTNode[] = []
  const pbftNodesList = ['pbft-node-1', 'pbft-node-2', 'pbft-node-3', 'pbft-node-4']
  for (const nodeId of pbftNodesList) {
    const node = new PBFTNode(nodeId, pbftNodesList, 1) // f=1, can tolerate 1 Byzantine fault
    pbftNodes.push(node)
  }
  console.log('PBFT cluster stats:')
  pbftNodes.forEach(node => {
    const stats = node.getConsensusStats()
    console.log(` ${node.nodeId}: view=${stats.view}, primary=${stats.primary}`)
  })
  // Test PBFT operations
  console.log('\n4. Testing PBFT Operations')
  const primaryNode = pbftNodes[0] // First node is primary in view 0
  const pbftStartTime = Date.now()
  // Resolves after the simulated 2s consensus window inside clientRequest.
  const pbftResult = await primaryNode.clientRequest('transfer_amount_100')
  const pbftLatency = Date.now() - pbftStartTime
  console.log(`PBFT result: ${JSON.stringify(pbftResult)}, Latency: ${pbftLatency}ms`)
  // Simulate message passing between PBFT nodes
  // (random subset of PREPARE traffic — nodes here are NOT wired to a shared
  // transport, so protocol messages must be injected manually)
  for (const sender of pbftNodes) {
    for (const receiver of pbftNodes) {
      if (sender.nodeId !== receiver.nodeId) {
        // Simulate some messages
        if (Math.random() > 0.7) {
          const message: PBFTMessage = {
            type: PBFTMessageType.PREPARE,
            view: 0,
            sequenceNumber: 1,
            senderId: sender.nodeId,
            digest: 'test_digest'
          }
          await receiver.handlePBFTMessage(message)
        }
      }
    }
  }
  // Performance comparison
  console.log('\n5. Performance Analysis')
  const analyzer = new ConsensusAnalyzer()
  // Add sample metrics (synthetic latencies: Raft 100-300ms, PBFT 200-600ms)
  for (let i = 0; i < 20; i++) {
    analyzer.recordMetric('RAFT', 'WRITE', Math.random() * 200 + 100, Math.random() > 0.1, 5)
    analyzer.recordMetric('PBFT', 'WRITE', Math.random() * 400 + 200, Math.random() > 0.05, 4)
  }
  const comparison = analyzer.compareAlgorithms([5])
  console.log('\nPerformance Comparison:')
  console.log(`Raft - Latency: ${comparison.raft.avgLatency.toFixed(2)}ms, Success Rate: ${comparison.raft.successRate.toFixed(1)}%`)
  console.log(`PBFT - Latency: ${comparison.pbft.avgLatency.toFixed(2)}ms, Success Rate: ${comparison.pbft.successRate.toFixed(1)}%`)
  console.log(`\nRecommendation: ${comparison.recommendation}`)
  // Final cluster states
  console.log('\n6. Final Cluster States')
  console.log('\nRaft cluster final states:')
  raftNodes.forEach(node => {
    const state = node.getState()
    console.log(` ${node.nodeId}: ${state.state} (log: ${state.logLength}, committed: ${state.commitIndex})`)
  })
  console.log('\nPBFT cluster final stats:')
  pbftNodes.forEach(node => {
    const stats = node.getConsensusStats()
    console.log(` ${node.nodeId}: prepared=${stats.preparedCount}, committed=${stats.committedCount}`)
  })
}
// Public surface of the consensus section: state/message enums, both
// consensus node implementations, the metrics analyzer and the demo runner.
export {
  NodeState,
  RaftMessageType,
  PBFTMessageType,
  RaftNode,
  PBFTNode,
  ConsensusAnalyzer,
  consensusAlgorithmsExample
}
💻 Sharding et Partitionnement
🔴 complex
⭐⭐⭐⭐⭐
Implémentation de stratégies de sharding et partitionnement de données pour systèmes distribués scalables
⏱️ 85 min
🏷️ sharding, partitioning, consistent-hashing, load-balancing
Prerequisites:
Distributed databases knowledge, Load balancing concepts
// Sharding and Partitioning Implementation
// Partitioning Strategies
// Data-partitioning strategies demonstrated by the managers below.
enum PartitioningStrategy {
  HASH = 'HASH',                       // hash of the key onto a virtual-node ring
  RANGE = 'RANGE',                     // contiguous key intervals per shard
  CONSISTENT_HASH = 'CONSISTENT_HASH', // consistent hashing with replicas
  DIRECTORY = 'DIRECTORY',             // explicit key -> shard lookup table
  GEOGRAPHIC = 'GEOGRAPHIC'            // placement by region/country/city
}
// Descriptor of one shard (or one virtual node) and where it lives.
// The optional fields are strategy-specific: hash sharding fills `hashRange`,
// range sharding fills `range`, directory sharding fills neither.
interface Shard {
  id: string
  // Physical node currently hosting this shard.
  nodeId: string
  // RANGE strategy: inclusive [start, end] key interval.
  range?: {
    start: any
    end: any
  }
  // HASH strategy: position on the hash ring (start === end for a point).
  hashRange?: {
    start: number
    end: number
  }
  // DIRECTORY strategy: explicit key list (unused in the demos below).
  keyRange?: string[]
  // NOTE(review): this field name is Chinese for "geographic location" while
  // every other identifier in the file is English — consider renaming to
  // `geoLocation` (a breaking change for any consumer of this interface).
  地理位置?: {
    region: string
    country?: string
    city?: string
  }
  status: 'ACTIVE' | 'MIGRATING' | 'OFFLINE'
  // Relative capacity weight (always 1 in these demos).
  weight: number
  lastHealthCheck: Date
}
// One entry of the sharding directory: where a key lives plus bookkeeping
// used for rebalancing (LRU order) and hot-key detection (access counts).
interface PartitionMetadata {
  shardId: string
  key: string
  // Value handed to the load balancer when the assignment was made.
  partitionKey: any
  nodeId: string
  createdAt: Date
  // Updated by recordAccess(); drives least-recently-used migration order.
  lastAccessed: Date
  // Updated by recordAccess(); drives hot-key statistics.
  accessCount: number
}
// Hash-based Sharding
// Hash-based Sharding
// Maps keys onto physical nodes through a sorted ring of virtual-node hashes.
// Each physical node contributes `virtualNodes` points for an even spread.
class HashShardingManager {
  private shards: Map<string, Shard> = new Map() // virtualNodeId -> shard descriptor
  private shardRing: number[] = []               // sorted virtual-node hashes
  private virtualNodes: number = 150 // Virtual nodes per physical node

  constructor(private nodes: string[]) {
    this.initializeShards()
  }

  /** Register every configured physical node on the ring. */
  private initializeShards(): void {
    for (const nodeId of this.nodes) {
      this.addNode(nodeId)
    }
  }

  /** Add a physical node: insert `virtualNodes` hash points and re-sort the ring. */
  addNode(nodeId: string): void {
    console.log(`[HASH] Adding node ${nodeId} to shard ring`)
    // Create virtual nodes for better distribution
    for (let i = 0; i < this.virtualNodes; i++) {
      const virtualNodeId = `${nodeId}_${i}`
      const hash = this.hash(virtualNodeId)
      this.shards.set(virtualNodeId, {
        id: virtualNodeId,
        nodeId,
        hashRange: { start: hash, end: hash },
        status: 'ACTIVE',
        weight: 1,
        lastHealthCheck: new Date()
      })
      this.shardRing.push(hash)
    }
    this.shardRing.sort((a, b) => a - b)
  }

  /**
   * Remove a physical node and all of its virtual nodes from the ring.
   * Fix: remove exactly ONE ring entry per virtual node (indexOf + splice).
   * The previous filter-by-value deleted every entry with an equal hash,
   * which could also drop colliding virtual nodes owned by OTHER nodes.
   */
  removeNode(nodeId: string): void {
    console.log(`[HASH] Removing node ${nodeId} from shard ring`)
    const toRemove: string[] = []
    for (const [virtualNodeId, shard] of this.shards.entries()) {
      if (shard.nodeId === nodeId) {
        toRemove.push(virtualNodeId)
      }
    }
    for (const virtualNodeId of toRemove) {
      const shard = this.shards.get(virtualNodeId)!
      const ringIndex = this.shardRing.indexOf(shard.hashRange!.start)
      if (ringIndex !== -1) {
        this.shardRing.splice(ringIndex, 1)
      }
      this.shards.delete(virtualNodeId)
    }
    console.log(`[HASH] Removed ${toRemove.length} virtual nodes for ${nodeId}`)
  }

  /**
   * Resolve the shard owning `key`: the first virtual node clockwise from the
   * key's hash, wrapping to the ring start. Uses binary search on the sorted
   * ring (O(log n)) instead of the previous linear scan; results are identical.
   */
  getShard(key: string): Shard | null {
    if (this.shardRing.length === 0) return null
    const keyHash = this.hash(key)
    // Lower bound: smallest index with ring[index] >= keyHash.
    let lo = 0
    let hi = this.shardRing.length
    while (lo < hi) {
      const mid = (lo + hi) >>> 1
      if (this.shardRing[mid] >= keyHash) {
        hi = mid
      } else {
        lo = mid + 1
      }
    }
    // Wrap around to the first point when the key hashes past the last one.
    const ringHash = this.shardRing[lo === this.shardRing.length ? 0 : lo]
    return this.findShardByHash(ringHash)
  }

  /** Reverse lookup: the shard whose virtual-node hash equals `hash`. */
  private findShardByHash(hash: number): Shard | null {
    for (const shard of this.shards.values()) {
      if (shard.hashRange && shard.hashRange.start === hash) {
        return shard
      }
    }
    return null
  }

  /** 32-bit string hash (djb2-style). Demo quality only — not cryptographic. */
  private hash(input: string): number {
    let hash = 0
    for (let i = 0; i < input.length; i++) {
      const char = input.charCodeAt(i)
      hash = ((hash << 5) - hash) + char
      hash = hash & hash // Convert to 32-bit integer
    }
    return Math.abs(hash)
  }

  /** Distribution of 10k synthetic keys across physical nodes (nodeId -> count). */
  getDistribution(): Record<string, number> {
    const distribution: Record<string, number> = {}
    // Test with sample keys to see distribution
    const testKeys = Array.from({ length: 10000 }, (_, i) => `test_key_${i}`)
    for (const key of testKeys) {
      const shard = this.getShard(key)
      if (shard) {
        distribution[shard.nodeId] = (distribution[shard.nodeId] || 0) + 1
      }
    }
    return distribution
  }
}
// Range-based Sharding
// Range-based Sharding
// Routes numeric keys to the shard owning the contiguous [start, end] interval
// that contains them; ranges are kept sorted by their start key.
class RangeShardingManager {
  private shards: Map<string, Shard> = new Map()
  private ranges: Array<{ start: any; end: any; shardId: string }> = []
  // Monotonic counter appended to generated shard ids. Fix: Date.now() alone
  // is NOT unique — splitRange creates two ranges within the same millisecond,
  // and the identical ids made the second shard overwrite the first in the map
  // while both range entries pointed at one shard.
  private nextShardSeq: number = 0

  constructor() {
    this.initializeRanges()
  }

  /** Seed the demo with five fixed user-id ranges across five nodes. */
  private initializeRanges(): void {
    // Example: User ID ranges
    const ranges = [
      { start: 0, end: 1000000, nodeId: 'node-1', shardId: 'shard-1' },
      { start: 1000001, end: 2000000, nodeId: 'node-2', shardId: 'shard-2' },
      { start: 2000001, end: 3000000, nodeId: 'node-3', shardId: 'shard-3' },
      { start: 3000001, end: 4000000, nodeId: 'node-4', shardId: 'shard-4' },
      { start: 4000001, end: Number.MAX_SAFE_INTEGER, nodeId: 'node-5', shardId: 'shard-5' }
    ]
    for (const range of ranges) {
      this.shards.set(range.shardId, {
        id: range.shardId,
        nodeId: range.nodeId,
        range: { start: range.start, end: range.end },
        status: 'ACTIVE',
        weight: 1,
        lastHealthCheck: new Date()
      })
      this.ranges.push({
        start: range.start,
        end: range.end,
        shardId: range.shardId
      })
    }
    this.ranges.sort((a, b) => a.start - b.start)
  }

  /**
   * Shard owning `key` (a number or numeric string); null when the key is not
   * numeric or falls outside every range.
   */
  getShard(key: any): Shard | null {
    const keyValue = typeof key === 'number' ? key : parseInt(key, 10)
    // Explicit guard: NaN silently matched nothing before; make the intent clear.
    if (Number.isNaN(keyValue)) return null
    for (const range of this.ranges) {
      if (keyValue >= range.start && keyValue <= range.end) {
        return this.shards.get(range.shardId) || null
      }
    }
    return null
  }

  /** Register a new range on `nodeId` under a guaranteed-unique shard id. */
  addRange(start: any, end: any, nodeId: string): void {
    const shardId = `shard_${Date.now()}_${this.nextShardSeq++}`
    this.shards.set(shardId, {
      id: shardId,
      nodeId,
      range: { start, end },
      status: 'ACTIVE',
      weight: 1,
      lastHealthCheck: new Date()
    })
    this.ranges.push({ start, end, shardId })
    this.ranges.sort((a, b) => a.start - b.start)
    console.log(`[RANGE] Added range [${start}, ${end}] to node ${nodeId}`)
  }

  /**
   * Split a shard at `splitPoint`: [start, splitPoint] stays on the original
   * node, [splitPoint + 1, end] moves to `newNodeId`.
   * @throws when the shard does not exist or carries no range.
   */
  splitRange(shardId: string, splitPoint: any, newNodeId: string): void {
    const shard = this.shards.get(shardId)
    if (!shard || !shard.range) {
      throw new Error(`Shard ${shardId} not found or has no range`)
    }
    const { start, end } = shard.range
    // Remove old range
    this.ranges = this.ranges.filter(r => r.shardId !== shardId)
    this.shards.delete(shardId)
    // Create two new ranges (distinct ids thanks to the sequence counter)
    this.addRange(start, splitPoint, shard.nodeId)
    this.addRange(splitPoint + 1, end, newNodeId)
    console.log(`[RANGE] Split shard ${shardId} at ${splitPoint}`)
  }

  /**
   * Merge two shards into one covering the union of their ranges; the merged
   * range is placed on the FIRST shard's node.
   * @throws when either shard is missing or has no range.
   */
  mergeRanges(shardId1: string, shardId2: string): void {
    const shard1 = this.shards.get(shardId1)
    const shard2 = this.shards.get(shardId2)
    if (!shard1 || !shard2 || !shard1.range || !shard2.range) {
      throw new Error('Both shards must exist and have ranges')
    }
    const newStart = Math.min(shard1.range.start, shard2.range.start)
    const newEnd = Math.max(shard1.range.end, shard2.range.end)
    // Remove old ranges
    this.ranges = this.ranges.filter(r => r.shardId !== shardId1 && r.shardId !== shardId2)
    this.shards.delete(shardId1)
    this.shards.delete(shardId2)
    // Create merged range
    this.addRange(newStart, newEnd, shard1.nodeId)
    console.log(`[RANGE] Merged shards ${shardId1} and ${shardId2}`)
  }

  /** Sorted view of all ranges with their owning shard and node. */
  getRanges(): Array<{ shardId: string; nodeId: string; start: any; end: any }> {
    return this.ranges.map(range => {
      const shard = this.shards.get(range.shardId)!
      return {
        shardId: range.shardId,
        nodeId: shard.nodeId,
        start: range.start,
        end: range.end
      }
    })
  }
}
// Consistent Hash Ring Implementation
// Consistent Hash Ring Implementation
// Each physical node is projected onto the ring as `virtualNodes` points; a
// key is owned by the first point clockwise from its hash. Adding or removing
// a node therefore only remaps the keys adjacent to that node's points.
class ConsistentHashRing {
  private ring: number[] = []                    // sorted virtual-point hashes
  private nodes: Map<number, string> = new Map() // point hash -> physical node id
  private virtualNodes: number = 100
  private replicas: number = 3                   // default replica count for getNodesForKey

  constructor(nodes: string[], virtualNodes: number = 100) {
    this.virtualNodes = virtualNodes
    this.initialize(nodes)
  }

  /** Place every initial node on the ring. */
  private initialize(nodes: string[]): void {
    for (const node of nodes) {
      this.addNode(node)
    }
  }

  /** Add a physical node as `virtualNodes` points and keep the ring sorted. */
  addNode(node: string): void {
    for (let i = 0; i < this.virtualNodes; i++) {
      const virtualNodeKey = `${node}#${i}`
      const hash = this.md5(virtualNodeKey)
      this.ring.push(hash)
      this.nodes.set(hash, node)
    }
    this.ring.sort((a, b) => a - b)
    console.log(`[CONSISTENT_HASH] Added node ${node} with ${this.virtualNodes} virtual nodes`)
  }

  /**
   * Remove a node's virtual points from the ring.
   * NOTE(review): points are matched by hash value, so a hash collision with
   * another node's point would remove that point too — tolerable for this
   * demo hash, worth confirming with a stronger hash in production.
   */
  removeNode(node: string): void {
    const toRemove: number[] = []
    for (const [hash, nodeId] of this.nodes.entries()) {
      if (nodeId === node) {
        toRemove.push(hash)
      }
    }
    for (const hash of toRemove) {
      this.ring = this.ring.filter(h => h !== hash)
      this.nodes.delete(hash)
    }
    console.log(`[CONSISTENT_HASH] Removed node ${node} and ${toRemove.length} virtual nodes`)
  }

  /** Owner of `key`: first ring point with hash >= hash(key), wrapping to the start. */
  getNode(key: string): string | null {
    if (this.ring.length === 0) return null
    const hash = this.md5(key)
    for (let i = 0; i < this.ring.length; i++) {
      if (this.ring[i] >= hash) {
        return this.nodes.get(this.ring[i]) || null
      }
    }
    // Wrap around to first node
    return this.nodes.get(this.ring[0]) || null
  }

  /**
   * Return up to `count` DISTINCT nodes for `key` (primary plus replicas),
   * walking clockwise from the key's position.
   * Fix: the walk is now bounded by one full lap of the ring. The previous
   * loop condition (`visitedNodes.size < this.nodes.size`, where `nodes.size`
   * counts VIRTUAL points) never became false once every distinct physical
   * node had been visited, so requesting more replicas than there are nodes
   * spun forever. Now such a request simply returns all distinct nodes.
   */
  getNodesForKey(key: string, count: number = this.replicas): string[] {
    if (this.ring.length === 0) return []
    const hash = this.md5(key)
    let startIndex = 0
    for (let i = 0; i < this.ring.length; i++) {
      if (this.ring[i] >= hash) {
        startIndex = i
        break
      }
    }
    // One full lap visits every virtual point once; more cannot help.
    const nodes: string[] = []
    const visitedNodes = new Set<string>()
    for (let step = 0; step < this.ring.length && nodes.length < count; step++) {
      const nodeHash = this.ring[(startIndex + step) % this.ring.length]
      const node = this.nodes.get(nodeHash)
      if (node && !visitedNodes.has(node)) {
        nodes.push(node)
        visitedNodes.add(node)
      }
    }
    return nodes
  }

  /** Demo-quality 32-bit string hash folded into [0, 1e6). Not MD5, despite the name. */
  private md5(input: string): number {
    let hash = 0
    for (let i = 0; i < input.length; i++) {
      const char = input.charCodeAt(i)
      hash = ((hash << 5) - hash) + char
      hash = hash & hash
    }
    return Math.abs(hash) % 1000000 // Modulo to keep numbers manageable
  }

  /** Distribution of 10k synthetic keys across physical nodes. */
  getDistribution(): Record<string, number> {
    const distribution: Record<string, number> = {}
    const testKeys = Array.from({ length: 10000 }, (_, i) => `test_key_${i}`)
    for (const key of testKeys) {
      const node = this.getNode(key)
      if (node) {
        distribution[node] = (distribution[node] || 0) + 1
      }
    }
    return distribution
  }

  /**
   * Ring health summary. balanceScore is 1 minus the coefficient of variation
   * of per-node key counts, clamped at 0. Fix: an empty ring now reports 0
   * instead of propagating NaN from a division by zero.
   */
  getRingInfo(): { totalNodes: number; totalVirtualNodes: number; balanceScore: number } {
    const distribution = this.getDistribution()
    const values = Object.values(distribution)
    if (values.length === 0) {
      return { totalNodes: 0, totalVirtualNodes: this.ring.length, balanceScore: 0 }
    }
    const avg = values.reduce((sum, val) => sum + val, 0) / values.length
    const variance = values.reduce((sum, val) => sum + Math.pow(val - avg, 2), 0) / values.length
    const balanceScore = 1 - (Math.sqrt(variance) / avg) // Lower variance = higher score
    return {
      totalNodes: new Set(this.nodes.values()).size,
      totalVirtualNodes: this.ring.length,
      balanceScore: Math.max(0, balanceScore)
    }
  }
}
// Directory-based Sharding
// Directory-based Sharding
// Keeps an explicit key -> shard lookup table (the "directory"), assigns new
// keys via a load balancer and supports rebalancing of overloaded shards.
class DirectoryBasedSharding {
  private directory: Map<string, PartitionMetadata> = new Map()
  private shards: Map<string, Shard> = new Map()
  private loadBalancer: ShardingLoadBalancer

  constructor(nodes: string[]) {
    this.loadBalancer = new ShardingLoadBalancer(nodes)
    this.initializeShards(nodes)
  }

  /** One shard per physical node, named `shard_<nodeId>`. */
  private initializeShards(nodes: string[]): void {
    for (const nodeId of nodes) {
      this.shards.set(`shard_${nodeId}`, {
        id: `shard_${nodeId}`,
        nodeId,
        status: 'ACTIVE',
        weight: 1,
        lastHealthCheck: new Date()
      })
    }
  }

  /**
   * Assign `key` to a shard (idempotent: an existing assignment is returned
   * unchanged). The shard choice delegates to the load balancer.
   * @throws when the selected shard is missing or not ACTIVE.
   */
  async assignPartition(key: string, partitionKey: any): Promise<string> {
    // Check if key already assigned
    if (this.directory.has(key)) {
      return this.directory.get(key)!.shardId
    }
    // Select shard based on load balancing
    const shardId = await this.loadBalancer.selectShard(partitionKey)
    const shard = this.shards.get(shardId)
    if (!shard || shard.status !== 'ACTIVE') {
      throw new Error('No active shard available')
    }
    const metadata: PartitionMetadata = {
      shardId,
      key,
      partitionKey,
      nodeId: shard.nodeId,
      createdAt: new Date(),
      lastAccessed: new Date(),
      accessCount: 0
    }
    this.directory.set(key, metadata)
    console.log(`[DIRECTORY] Assigned key ${key} to shard ${shardId} (node: ${shard.nodeId})`)
    return shardId
  }

  /** Resolve the shard currently owning `key`, or null if unassigned. */
  getShard(key: string): Shard | null {
    const metadata = this.directory.get(key)
    if (!metadata) return null
    return this.shards.get(metadata.shardId) || null
  }

  /**
   * Move partitions off overloaded shards onto underloaded ones.
   * Fix: the target load is now the even share (total partitions / shard
   * count). The previous formula, `Object.keys(currentLoad).length * 0.2`,
   * compared partition counts against a fraction of the NUMBER OF SHARDS,
   * which is unrelated to data volume and flagged nearly every shard as
   * overloaded. Underload detection now also considers shards holding zero
   * partitions, which were invisible to the old scan over `currentLoad`.
   */
  async rebalance(): Promise<{
    migratedPartitions: number
    newDistribution: Record<string, number>
  }> {
    console.log('[DIRECTORY] Starting rebalancing...')
    const currentLoad = this.calculateLoad()
    // Even share of partitions per shard — the actual balancing target.
    const targetLoad = this.shards.size > 0 ? this.directory.size / this.shards.size : 0
    const toMigrate: Array<{ key: string; fromShard: string; toShard: string }> = []
    let migratedCount = 0
    // Find shards more than 20% above the even share.
    for (const [shardId, load] of Object.entries(currentLoad)) {
      if (load > targetLoad * 1.2) {
        const underloadedShard = this.findUnderloadedShard(currentLoad, targetLoad)
        if (underloadedShard) {
          // Migrate the least recently accessed keys first.
          const shardKeys = Array.from(this.directory.entries())
            .filter(([_, metadata]) => metadata.shardId === shardId)
            .sort((a, b) => a[1].lastAccessed.getTime() - b[1].lastAccessed.getTime())
            .slice(0, Math.floor((load - targetLoad) / 2))
          for (const [key] of shardKeys) {
            toMigrate.push({
              key,
              fromShard: shardId,
              toShard: underloadedShard
            })
            if (toMigrate.length >= 10) break // Limit migrations per rebalance
          }
        }
      }
    }
    // Execute migrations (directory update only; data movement is out of scope).
    for (const migration of toMigrate) {
      const metadata = this.directory.get(migration.key)!
      metadata.shardId = migration.toShard
      metadata.nodeId = this.shards.get(migration.toShard)!.nodeId
      migratedCount++
      console.log(`[DIRECTORY] Migrated key ${migration.key} from ${migration.fromShard} to ${migration.toShard}`)
    }
    return {
      migratedPartitions: migratedCount,
      newDistribution: this.calculateLoad()
    }
  }

  /** Partition count per shard, derived from the directory. */
  private calculateLoad(): Record<string, number> {
    const load: Record<string, number> = {}
    for (const metadata of this.directory.values()) {
      load[metadata.shardId] = (load[metadata.shardId] || 0) + 1
    }
    return load
  }

  /** First shard at least 20% below the even share; empty shards count as load 0. */
  private findUnderloadedShard(currentLoad: Record<string, number>, targetLoad: number): string | null {
    for (const shardId of this.shards.keys()) {
      if ((currentLoad[shardId] || 0) < targetLoad * 0.8) {
        return shardId
      }
    }
    return null
  }

  /** Directory overview: size, per-shard distribution and the 10 hottest keys. */
  getDirectoryStats(): {
    totalPartitions: number
    distribution: Record<string, number>
    hotKeys: Array<{ key: string; accessCount: number; shardId: string }>
  } {
    const distribution = this.calculateLoad()
    // Find hot keys (high access count)
    const hotKeys = Array.from(this.directory.entries())
      .sort((a, b) => b[1].accessCount - a[1].accessCount)
      .slice(0, 10)
      .map(([key, metadata]) => ({
        key,
        accessCount: metadata.accessCount,
        shardId: metadata.shardId
      }))
    return {
      totalPartitions: this.directory.size,
      distribution,
      hotKeys
    }
  }

  /** Bump access stats for `key`; drives hot-key tracking and LRU migration order. */
  recordAccess(key: string): void {
    const metadata = this.directory.get(key)
    if (metadata) {
      metadata.lastAccessed = new Date()
      metadata.accessCount++
    }
  }
}
// Load Balancer for Sharding
// Load Balancer for Sharding
// Tracks per-node load (connections / cpu / memory) and picks the node with
// the lowest weighted score for new partition assignments.
class ShardingLoadBalancer {
  private nodeLoads: Map<string, { connections: number; cpu: number; memory: number }> = new Map()

  constructor(nodes: string[]) {
    nodes.forEach((node) => {
      this.nodeLoads.set(node, { connections: 0, cpu: 0, memory: 0 })
    })
  }

  /** Shard on the least loaded node (`shard_<nodeId>`). */
  async selectShard(partitionKey: any): Promise<string> {
    // Weighted score: connections dominate; cpu and memory split the rest.
    const weigh = (l: { connections: number; cpu: number; memory: number }): number =>
      l.connections * 0.4 + l.cpu * 0.3 + l.memory * 0.3
    let bestNode = ''
    let bestScore = Infinity
    this.nodeLoads.forEach((load, nodeId) => {
      const score = weigh(load)
      if (score < bestScore) {
        bestScore = score
        bestNode = nodeId
      }
    })
    return `shard_${bestNode}`
  }

  /** Replace the load sample for one node. */
  updateNodeLoad(nodeId: string, load: { connections: number; cpu: number; memory: number }): void {
    this.nodeLoads.set(nodeId, load)
  }

  /** Snapshot of all node loads (copies, so callers cannot mutate internal state). */
  getLoadDistribution(): Record<string, { connections: number; cpu: number; memory: number }> {
    return Object.fromEntries(
      [...this.nodeLoads.entries()].map(([nodeId, load]) => [nodeId, { ...load }])
    )
  }
}
// Cross-Shard Query Coordinator
// Cross-Shard Query Coordinator
// Fans a query out to the shards that may hold matching data (scatter),
// collects their partial results (gather) and, for aggregates, combines them.
class CrossShardQueryCoordinator {
  constructor(
    private hashSharding: HashShardingManager,
    private rangeSharding: RangeShardingManager,
    private directorySharding: DirectoryBasedSharding
  ) {}

  /**
   * Execute a query that may span several shards. Returns the merged results,
   * the de-duplicated list of shards touched, and the wall-clock time taken.
   */
  async executeCrossShardQuery(
    query: {
      type: 'SELECT' | 'JOIN' | 'AGGREGATE'
      tables: string[]
      conditions?: Record<string, any>
      joinKeys?: string[]
    }
  ): Promise<{
    results: any[]
    shardsQueried: string[]
    executionTime: number
  }> {
    const startTime = Date.now()
    const shardsQueried: string[] = []
    const results: any[] = []
    console.log(`[QUERY] Executing cross-shard query: ${query.type} on ${query.tables.join(', ')}`)
    // Determine which shards to query based on query type
    switch (query.type) {
      case 'SELECT':
        await this.executeSelectQuery(query, shardsQueried, results)
        break
      case 'JOIN':
        await this.executeJoinQuery(query, shardsQueried, results)
        break
      case 'AGGREGATE':
        await this.executeAggregateQuery(query, shardsQueried, results)
        break
    }
    const executionTime = Date.now() - startTime
    return {
      results,
      shardsQueried: [...new Set(shardsQueried)],
      executionTime
    }
  }

  /** SELECT: probe each sharding strategy for relevant shards and query them all. */
  private async executeSelectQuery(
    query: any,
    shardsQueried: string[],
    results: any[]
  ): Promise<void> {
    // For demo, simulate querying all sharding strategies
    const strategies = [
      { name: 'hash', manager: this.hashSharding },
      { name: 'range', manager: this.rangeSharding },
      { name: 'directory', manager: this.directorySharding }
    ]
    for (const strategy of strategies) {
      // Simulate finding relevant shards
      const relevantShards = this.findRelevantShards(strategy, query.conditions)
      for (const shardId of relevantShards) {
        shardsQueried.push(`${strategy.name}_${shardId}`)
        // Simulate querying the shard
        const shardResults = await this.queryShard(shardId, query)
        results.push(...shardResults)
      }
    }
  }

  /** JOIN: route each join key to its owning hash shard and gather rows. */
  private async executeJoinQuery(
    query: any,
    shardsQueried: string[],
    results: any[]
  ): Promise<void> {
    // Simplified join execution - in real implementation would be more complex
    if (!query.joinKeys || query.joinKeys.length === 0) {
      throw new Error('Join query requires join keys')
    }
    // For each join key, find which shards contain the data
    for (const joinKey of query.joinKeys) {
      const shard = this.hashSharding.getShard(joinKey)
      if (shard) {
        shardsQueried.push(`hash_${shard.id}`)
        const shardResults = await this.queryShard(joinKey, query)
        results.push(...shardResults)
      }
    }
  }

  /**
   * AGGREGATE: query every shard and combine the partial results.
   * Fix: `getDistribution()` returns a plain `Record<string, number>`, which
   * has no `.keys()` method — the previous
   * `Array.from(...getDistribution().keys())` threw a TypeError on every
   * aggregate query. `Object.keys(...)` is the correct enumeration.
   * NOTE(review): those keys are node ids rather than shard ids, so the
   * "shards" queried for the hash strategy are really nodes — confirm whether
   * per-node granularity is intended here.
   */
  private async executeAggregateQuery(
    query: any,
    shardsQueried: string[],
    results: any[]
  ): Promise<void> {
    // For aggregate queries, need to query all shards and combine results
    const allShards = [
      ...Object.keys(this.hashSharding.getDistribution()),
      ...this.rangeSharding.getRanges().map(r => r.shardId)
    ]
    for (const shardId of allShards) {
      shardsQueried.push(shardId)
      const shardResults = await this.queryShard(shardId, query)
      results.push(...shardResults)
    }
    // Combine aggregate results
    if (query.conditions?.aggregate === 'COUNT') {
      const totalCount = results.reduce((sum, result) => sum + (result.count || 0), 0)
      results.length = 0
      results.push({ aggregate: 'COUNT', value: totalCount })
    }
  }

  /** Pick the shards a strategy considers relevant for the given conditions. */
  private findRelevantShards(strategy: any, conditions?: Record<string, any>): string[] {
    // Simplified shard selection based on conditions
    if (conditions?.id) {
      const shard = strategy.manager.getShard?.(conditions.id)
      return shard ? [shard.id] : []
    }
    // If no specific conditions, return all shards for the strategy
    if (strategy.name === 'hash') {
      return Object.keys(strategy.manager.getDistribution())
    } else if (strategy.name === 'range') {
      return strategy.manager.getRanges().map(r => r.shardId)
    }
    return []
  }

  /** Simulated shard query: 10-60ms latency, 1-10 mock rows. */
  private async queryShard(shardId: string, query: any): Promise<any[]> {
    // Simulate network delay and query execution
    await new Promise(resolve => setTimeout(resolve, Math.random() * 50 + 10))
    // Generate mock results
    const resultCount = Math.floor(Math.random() * 10) + 1
    const results: any[] = []
    for (let i = 0; i < resultCount; i++) {
      results.push({
        id: `${shardId}_${i}`,
        shardId,
        data: `Mock data from ${shardId}`,
        timestamp: new Date()
      })
    }
    return results
  }
}
// Usage Example
/**
 * End-to-end demo of the four sharding strategies (hash, range,
 * consistent hashing, directory-based), plus rebalancing and
 * cross-shard querying. Illustrative only: all results go to the
 * console and no state is returned.
 */
async function shardingExample() {
  console.log('=== Sharding and Partitioning Strategies ===\n')
  // 1. Hash sharding: keys are mapped to nodes by a hash function.
  console.log('1. Hash-based Sharding')
  const hashSharding = new HashShardingManager(['node-1', 'node-2', 'node-3', 'node-4'])
  const testKeys = ['user_123', 'user_456', 'user_789', 'user_999', 'user_111']
  console.log('\nKey distribution:')
  testKeys.forEach(key => {
    const shard = hashSharding.getShard(key)
    console.log(` ${key} -> ${shard?.nodeId}`)
  })
  // Cluster membership changes force keys to be remapped.
  console.log('\nAdding node-5 to cluster...')
  hashSharding.addNode('node-5')
  console.log('\nRemoving node-2 from cluster...')
  hashSharding.removeNode('node-2')
  const hashDistribution = hashSharding.getDistribution()
  console.log('\nHash sharding distribution:')
  Object.entries(hashDistribution).forEach(([node, count]) => {
    console.log(` ${node}: ${count} keys`)
  })
  // 2. Range sharding: contiguous key ranges live on one shard.
  console.log('\n2. Range-based Sharding')
  const rangeSharding = new RangeShardingManager()
  const userIds = [500, 1500, 2500, 3500, 4500]
  console.log('\nUser ID distribution:')
  userIds.forEach(userId => {
    const shard = rangeSharding.getShard(userId)
    console.log(` User ${userId} -> ${shard?.nodeId}`)
  })
  console.log('\nSplitting shard-1 at 500000...')
  rangeSharding.splitRange('shard-1', 500000, 'node-6')
  console.log('\nCurrent ranges:')
  const ranges = rangeSharding.getRanges()
  ranges.forEach(range => {
    console.log(` ${range.shardId} (${range.nodeId}): [${range.start}, ${range.end}]`)
  })
  // 3. Consistent hashing: virtual nodes minimize key movement when
  // membership changes.
  console.log('\n3. Consistent Hash Ring')
  const consistentHash = new ConsistentHashRing(['node-1', 'node-2', 'node-3'], 50)
  const testKeys2 = ['key1', 'key2', 'key3', 'key4', 'key5']
  console.log('\nKey assignment with replicas:')
  testKeys2.forEach(key => {
    const nodes = consistentHash.getNodesForKey(key, 2)
    console.log(` ${key} -> [${nodes.join(', ')}]`)
  })
  console.log('\nAdding node-4 and checking key movement...')
  const beforeDistribution = consistentHash.getDistribution()
  consistentHash.addNode('node-4')
  const afterDistribution = consistentHash.getDistribution()
  console.log('Distribution before adding node-4:', beforeDistribution)
  console.log('Distribution after adding node-4:', afterDistribution)
  const ringInfo = consistentHash.getRingInfo()
  console.log(`\nRing info: ${ringInfo.totalNodes} nodes, ${ringInfo.totalVirtualNodes} virtual nodes, balance score: ${ringInfo.balanceScore.toFixed(3)}`)
  // 4. Directory-based sharding: an explicit lookup table maps
  // partition keys to shards.
  console.log('\n4. Directory-based Sharding')
  const directorySharding = new DirectoryBasedSharding(['node-1', 'node-2', 'node-3'])
  const partitions = [
    { key: 'product_a', partitionKey: 'electronics' },
    { key: 'product_b', partitionKey: 'books' },
    { key: 'user_x', partitionKey: 'premium' },
    { key: 'user_y', partitionKey: 'basic' },
    { key: 'order_1', partitionKey: 'electronics' },
    { key: 'order_2', partitionKey: 'books' }
  ]
  console.log('\nAssigning partitions:')
  for (const partition of partitions) {
    // assignPartition registers the mapping; its return value was bound
    // but never used in the original, so the binding is dropped.
    await directorySharding.assignPartition(partition.key, partition.partitionKey)
    const shard = directorySharding.getShard(partition.key)
    console.log(` ${partition.key} -> ${shard?.nodeId}`)
  }
  // Record repeated reads so two keys show up as "hot".
  for (let i = 0; i < 10; i++) {
    directorySharding.recordAccess('product_a') // Make this a hot key
    directorySharding.recordAccess('order_1')
  }
  const stats = directorySharding.getDirectoryStats()
  console.log('\nDirectory stats:')
  console.log(` Total partitions: ${stats.totalPartitions}`)
  console.log(' Distribution:', stats.distribution)
  console.log(' Hot keys:', stats.hotKeys)
  // 5. Rebalancing migrates partitions toward an even distribution.
  console.log('\n5. Rebalancing')
  const rebalanceResult = await directorySharding.rebalance()
  console.log(`Migrated ${rebalanceResult.migratedPartitions} partitions`)
  console.log('New distribution:', rebalanceResult.newDistribution)
  // 6. Cross-shard queries: the coordinator fans out and merges results.
  console.log('\n6. Cross-Shard Query Coordinator')
  const queryCoordinator = new CrossShardQueryCoordinator(
    hashSharding,
    rangeSharding,
    directorySharding
  )
  const queries = [
    {
      type: 'SELECT' as const,
      tables: ['users'],
      conditions: { id: 'user_123' }
    },
    {
      type: 'JOIN' as const,
      tables: ['users', 'orders'],
      joinKeys: ['user_123']
    },
    {
      type: 'AGGREGATE' as const,
      tables: ['orders'],
      conditions: { aggregate: 'COUNT' }
    }
  ]
  for (const query of queries) {
    const result = await queryCoordinator.executeCrossShardQuery(query)
    console.log(`\n${query.type} query:`)
    console.log(` Shards queried: ${result.shardsQueried.length}`)
    console.log(` Results returned: ${result.results.length}`)
    console.log(` Execution time: ${result.executionTime}ms`)
  }
  console.log('\n=== Sharding Analysis Complete ===')
}
// Public API of the sharding/partitioning module.
export {
PartitioningStrategy,
Shard,
PartitionMetadata,
HashShardingManager,
RangeShardingManager,
ConsistentHashRing,
DirectoryBasedSharding,
ShardingLoadBalancer,
CrossShardQueryCoordinator,
shardingExample
}
💻 Mécanismes de Tolérance aux Pannes
🔴 complex
⭐⭐⭐⭐
Implémentation complète de la tolérance aux pannes avec circuit breakers, nouvelles tentatives (retry) et stratégies de failover
⏱️ 75 min
🏷️ fault-tolerance, circuit-breaker, retry, failover
Prerequisites:
Microservices architecture knowledge, Resilience patterns
// Fault Tolerance Mechanisms Implementation
// Fault Tolerance Patterns
// Circuit breaker states: CLOSED = calls pass through normally,
// OPEN = calls fail fast until the reset timeout elapses,
// HALF_OPEN = a limited number of trial calls probe for recovery.
enum CircuitState {
CLOSED = 'CLOSED',
OPEN = 'OPEN',
HALF_OPEN = 'HALF_OPEN'
}
// How the delay between retry attempts grows (see
// RetryMechanism.calculateDelay): constant, base * multiplier^(n-1),
// or base + (n-1) * base/2 — always capped at maxDelayMs.
enum RetryStrategy {
FIXED_DELAY = 'FIXED_DELAY',
EXPONENTIAL_BACKOFF = 'EXPONENTIAL_BACKOFF',
LINEAR_BACKOFF = 'LINEAR_BACKOFF'
}
// Failover topologies. ACTIVE_PASSIVE: one active node plus prioritized
// standbys; ACTIVE_ACTIVE: all healthy nodes serve traffic.
// NOTE(review): GEOGRAPHIC is declared but not handled by
// FailoverManager.initializeFailover — confirm intended behavior.
enum FailoverStrategy {
ACTIVE_PASSIVE = 'ACTIVE_PASSIVE',
ACTIVE_ACTIVE = 'ACTIVE_ACTIVE',
GEOGRAPHIC = 'GEOGRAPHIC'
}
// Mutable health record for one monitored service, updated in place by
// HealthMonitor's periodic probes and by recordSuccess/recordFailure.
interface ServiceHealth {
serviceId: string
endpoint: string
// DEGRADED after 1-2 consecutive probe failures, UNHEALTHY after 3+.
status: 'HEALTHY' | 'DEGRADED' | 'UNHEALTHY'
lastCheck: Date
// Latency of the most recent successful probe, in milliseconds.
responseTime: number
// Exponentially-decayed failure rate in [0, 1] (not a percentage).
errorRate: number
consecutiveFailures: number
}
// Tuning knobs for CircuitBreaker.
interface CircuitBreakerConfig {
// Consecutive failures (while CLOSED) that trip the breaker OPEN.
failureThreshold: number
// Per-call timeout applied by CircuitBreaker.withTimeout, in ms.
timeoutMs: number
// Successful trial calls required in HALF_OPEN before closing.
halfOpenMaxCalls: number
// How long an OPEN breaker rejects calls before allowing a probe, in ms.
resetTimeoutMs: number
}
// Tuning knobs for RetryMechanism.
interface RetryConfig {
// Total attempts including the first call (not just retries).
maxAttempts: number
strategy: RetryStrategy
baseDelayMs: number
// Upper bound on any computed backoff delay, in ms.
maxDelayMs: number
// Growth factor for EXPONENTIAL_BACKOFF; defaults to 2 when omitted.
multiplier?: number
}
// Circuit Breaker Implementation
// Three-state circuit breaker guarding calls to one service.
// CLOSED: calls pass through and failures are counted. OPEN: calls are
// rejected immediately until resetTimeoutMs elapses. HALF_OPEN: trial
// calls probe the service; enough successes close the breaker, any
// failure reopens it. Outcomes are also reported to the HealthMonitor.
class CircuitBreaker {
private state: CircuitState = CircuitState.CLOSED
// Consecutive failures; reset to 0 on every success.
private failureCount: number = 0
private successCount: number = 0
// NOTE(review): written on every failure but never read anywhere in
// this class — candidate for removal or for exposure via getState().
private lastFailureTime: number = 0
// Earliest time (epoch ms) an OPEN breaker will allow a probe call.
private nextAttempt: number = 0
// Count of successful trial calls made while HALF_OPEN.
private halfOpenCalls: number = 0
constructor(
private serviceId: string,
private config: CircuitBreakerConfig,
private healthMonitor: HealthMonitor
) {}
// Run `operation` through the breaker: fail fast when OPEN and the
// reset window has not elapsed, otherwise execute under timeoutMs and
// record the outcome (a timeout counts as a failure).
async execute<T>(operation: () => Promise<T>): Promise<T> {
if (this.state === CircuitState.OPEN) {
if (Date.now() < this.nextAttempt) {
throw new Error(`Circuit breaker OPEN for ${this.serviceId}. Next attempt at ${new Date(this.nextAttempt).toISOString()}`)
}
// Reset window elapsed: allow trial traffic.
this.transitionToHalfOpen()
}
try {
const result = await this.withTimeout(operation, this.config.timeoutMs)
this.onSuccess()
return result
} catch (error) {
this.onFailure()
throw error
}
}
// Race `operation` against a timer. Note the underlying operation is
// not cancelled on timeout — its eventual result is simply ignored.
private async withTimeout<T>(operation: () => Promise<T>, timeoutMs: number): Promise<T> {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error(`Operation timed out after ${timeoutMs}ms`))
}, timeoutMs)
operation()
.then(result => {
clearTimeout(timeoutId)
resolve(result)
})
.catch(error => {
clearTimeout(timeoutId)
reject(error)
})
})
}
// Success: clear the failure streak; in HALF_OPEN, count trial
// successes and close the breaker once halfOpenMaxCalls are reached.
private onSuccess(): void {
this.failureCount = 0
this.successCount++
if (this.state === CircuitState.HALF_OPEN) {
this.halfOpenCalls++
if (this.halfOpenCalls >= this.config.halfOpenMaxCalls) {
this.transitionToClosed()
}
}
this.healthMonitor.recordSuccess(this.serviceId)
}
// Failure: any failure during HALF_OPEN reopens immediately; while
// CLOSED, the breaker opens once failureThreshold is reached.
private onFailure(): void {
this.failureCount++
this.lastFailureTime = Date.now()
if (this.state === CircuitState.HALF_OPEN) {
this.transitionToOpen()
} else if (this.failureCount >= this.config.failureThreshold) {
this.transitionToOpen()
}
this.healthMonitor.recordFailure(this.serviceId)
}
private transitionToClosed(): void {
this.state = CircuitState.CLOSED
this.failureCount = 0
this.successCount = 0
this.halfOpenCalls = 0
console.log(`[CIRCUIT_BREAKER] ${this.serviceId} transitioned to CLOSED`)
}
private transitionToOpen(): void {
this.state = CircuitState.OPEN
// Schedule the next probe window.
this.nextAttempt = Date.now() + this.config.resetTimeoutMs
this.halfOpenCalls = 0
console.log(`[CIRCUIT_BREAKER] ${this.serviceId} transitioned to OPEN until ${new Date(this.nextAttempt).toISOString()}`)
}
private transitionToHalfOpen(): void {
this.state = CircuitState.HALF_OPEN
this.halfOpenCalls = 0
console.log(`[CIRCUIT_BREAKER] ${this.serviceId} transitioned to HALF_OPEN`)
}
// Snapshot of the breaker's current state and counters.
getState(): { state: CircuitState; failureCount: number; successCount: number } {
return {
state: this.state,
failureCount: this.failureCount,
successCount: this.successCount
}
}
// Manual override: trip the breaker open (e.g. for maintenance).
forceOpen(): void {
this.transitionToOpen()
}
// Manual override: reset the breaker to normal operation.
forceClose(): void {
this.transitionToClosed()
}
}
// Retry Mechanism with Different Strategies
// Retries an async operation according to a configured backoff strategy.
class RetryMechanism {
  constructor(private config: RetryConfig) {}

  /**
   * Runs `operation`, retrying on failure up to `config.maxAttempts`
   * total attempts with a strategy-dependent delay between attempts.
   * Returns the first successful result; re-throws the last error once
   * all attempts are exhausted. Non-Error throwables are wrapped in
   * Error so callers always see an Error instance.
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    // `| undefined` replaces the original definite-assignment hack
    // (`let lastError: Error` + `throw lastError!`).
    let lastError: Error | undefined
    for (let attempt = 1; attempt <= this.config.maxAttempts; attempt++) {
      try {
        console.log(`[RETRY] Attempt ${attempt}/${this.config.maxAttempts}`)
        const result = await operation()
        if (attempt > 1) {
          console.log(`[RETRY] Operation succeeded on attempt ${attempt}`)
        }
        return result
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error))
        console.log(`[RETRY] Attempt ${attempt} failed: ${lastError.message}`)
        if (attempt === this.config.maxAttempts) {
          console.log(`[RETRY] All ${this.config.maxAttempts} attempts exhausted`)
          throw lastError
        }
        const delay = this.calculateDelay(attempt)
        console.log(`[RETRY] Waiting ${delay}ms before retry...`)
        await this.sleep(delay)
      }
    }
    // Unreachable: the loop either returns or throws on the final
    // attempt. Kept as a defensive guard (e.g. maxAttempts < 1).
    throw lastError ?? new Error('Retry loop exited without executing')
  }

  // Delay before the next attempt for the configured strategy, always
  // capped at maxDelayMs. `attempt` is the attempt that just failed.
  private calculateDelay(attempt: number): number {
    switch (this.config.strategy) {
      case RetryStrategy.FIXED_DELAY:
        return this.config.baseDelayMs
      case RetryStrategy.EXPONENTIAL_BACKOFF: {
        // base * multiplier^(attempt-1); multiplier defaults to 2.
        // (Braced case blocks: lexical declarations in an unbraced
        // `case` leak across cases and trip no-case-declarations.)
        const exponentialDelay = this.config.baseDelayMs * Math.pow(this.config.multiplier || 2, attempt - 1)
        return Math.min(exponentialDelay, this.config.maxDelayMs)
      }
      case RetryStrategy.LINEAR_BACKOFF: {
        // base + (attempt-1) * base/2.
        const linearDelay = this.config.baseDelayMs + (attempt - 1) * (this.config.baseDelayMs / 2)
        return Math.min(linearDelay, this.config.maxDelayMs)
      }
      default:
        return this.config.baseDelayMs
    }
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms))
  }
}
// Health Monitor Service
// Tracks the health of a fixed set of services by probing them on an
// interval and by accepting success/failure reports from callers such
// as CircuitBreaker.
class HealthMonitor {
  private serviceHealth: Map<string, ServiceHealth> = new Map()
  private healthCheckInterval: number = 30000 // 30 seconds
  // Handle of the periodic probe so it can be stopped. The original
  // discarded the setInterval handle, so the loop could never be
  // cleared (keeps the event loop alive; leaks in tests/shutdown).
  private healthCheckTimer: ReturnType<typeof setInterval> | null = null

  constructor(private services: Array<{ id: string; endpoint: string }>) {
    this.initializeServices()
    this.startHealthChecks()
  }

  /** Stop the periodic health checks. Idempotent. */
  stop(): void {
    if (this.healthCheckTimer !== null) {
      clearInterval(this.healthCheckTimer)
      this.healthCheckTimer = null
    }
  }

  // Register every configured service as HEALTHY with zeroed metrics.
  private initializeServices(): void {
    for (const service of this.services) {
      this.serviceHealth.set(service.id, {
        serviceId: service.id,
        endpoint: service.endpoint,
        status: 'HEALTHY',
        lastCheck: new Date(),
        responseTime: 0,
        errorRate: 0,
        consecutiveFailures: 0
      })
    }
  }

  private startHealthChecks(): void {
    this.healthCheckTimer = setInterval(() => {
      // Fire-and-forget: failures are handled per-service inside.
      void this.performHealthChecks()
    }, this.healthCheckInterval)
  }

  // Probe every service once and update its record in place.
  private async performHealthChecks(): Promise<void> {
    for (const [serviceId, health] of this.serviceHealth.entries()) {
      try {
        const startTime = Date.now()
        await this.checkServiceHealth(health.endpoint)
        const responseTime = Date.now() - startTime
        health.status = 'HEALTHY'
        health.responseTime = responseTime
        health.lastCheck = new Date()
        health.consecutiveFailures = 0
        // Update error rate (simplified moving average)
        health.errorRate = health.errorRate * 0.9 // Decay factor
        console.log(`[HEALTH] ${serviceId}: HEALTHY (${responseTime}ms)`)
      } catch (error) {
        health.consecutiveFailures++
        health.lastCheck = new Date()
        // 1-2 consecutive failures => DEGRADED, 3+ => UNHEALTHY.
        if (health.consecutiveFailures >= 3) {
          health.status = 'UNHEALTHY'
        } else if (health.consecutiveFailures >= 1) {
          health.status = 'DEGRADED'
        }
        // Update error rate
        health.errorRate = Math.min(1, health.errorRate * 0.9 + 0.1)
        console.log(`[HEALTH] ${serviceId}: ${health.status} (failure #${health.consecutiveFailures})`)
      }
    }
  }

  // Simulated probe: ~90% success, 50-550ms latency. The endpoint is
  // currently unused — a real implementation would call it.
  private async checkServiceHealth(endpoint: string): Promise<void> {
    await new Promise((resolve, reject) => {
      setTimeout(() => {
        // 90% success rate
        if (Math.random() > 0.1) {
          resolve(undefined)
        } else {
          reject(new Error('Service unavailable'))
        }
      }, Math.random() * 500 + 50) // 50-550ms response time
    })
  }

  // Caller-reported success: clears the failure streak and decays the
  // error rate.
  recordSuccess(serviceId: string): void {
    const health = this.serviceHealth.get(serviceId)
    if (health) {
      health.consecutiveFailures = 0
      health.errorRate = Math.max(0, health.errorRate * 0.9)
    }
  }

  // Caller-reported failure: bumps the streak and the error rate.
  recordFailure(serviceId: string): void {
    const health = this.serviceHealth.get(serviceId)
    if (health) {
      health.consecutiveFailures++
      health.errorRate = Math.min(1, health.errorRate * 0.9 + 0.1)
    }
  }

  getHealthyServices(): string[] {
    return Array.from(this.serviceHealth.values())
      .filter(health => health.status === 'HEALTHY')
      .map(health => health.serviceId)
  }

  getServiceHealth(serviceId: string): ServiceHealth | null {
    return this.serviceHealth.get(serviceId) || null
  }

  getAllServiceHealth(): ServiceHealth[] {
    return Array.from(this.serviceHealth.values())
  }
}
// Failover Manager
// Coordinates which node serves traffic and promotes a standby when the
// active node fails. ACTIVE_PASSIVE keeps one active node plus
// priority-ordered standbys; ACTIVE_ACTIVE spreads across all healthy
// nodes.
class FailoverManager {
// Current active node id; ACTIVE_ACTIVE uses the sentinel
// 'multi-active' since no single node is primary.
private activeNode: string | null = null
private standbyNodes: Set<string> = new Set()
// Audit log of every promotion performed by initiateFailover().
private failoverHistory: Array<{
timestamp: Date
fromNode: string
toNode: string
reason: string
}> = []
constructor(
private nodes: Array<{ id: string; endpoint: string; region: string; priority: number }>,
private strategy: FailoverStrategy,
private healthMonitor: HealthMonitor
) {
this.initializeFailover()
}
// Derive the initial active/standby split from the strategy.
// NOTE(review): GEOGRAPHIC is not handled here, leaving activeNode
// null — confirm whether that strategy is actually supported.
private initializeFailover(): void {
if (this.strategy === FailoverStrategy.ACTIVE_PASSIVE) {
// Sort by priority and select highest as active
const sortedNodes = [...this.nodes].sort((a, b) => b.priority - a.priority)
this.activeNode = sortedNodes[0].id
this.standbyNodes = new Set(sortedNodes.slice(1).map(n => n.id))
} else if (this.strategy === FailoverStrategy.ACTIVE_ACTIVE) {
// All nodes are active
this.activeNode = 'multi-active'
this.standbyNodes = new Set()
}
console.log(`[FAILOVER] Initialized with ${this.strategy} strategy`)
console.log(`[FAILOVER] Active node: ${this.activeNode}`)
console.log(`[FAILOVER] Standby nodes: ${Array.from(this.standbyNodes).join(', ')}`)
}
// Try `operation` on candidate nodes in order, skipping nodes the
// health monitor reports as non-HEALTHY. If the active node itself
// fails, a failover (standby promotion) is triggered before moving on.
// Throws the last error when every candidate fails.
async executeWithFailover<T>(
operation: (nodeId: string) => Promise<T>,
operationContext: string = 'operation'
): Promise<T> {
const nodesToTry = this.getNodesInOrder()
let lastError: Error | null = null
for (const nodeId of nodesToTry) {
try {
console.log(`[FAILOVER] Executing ${operationContext} on node ${nodeId}`)
// Check if node is healthy
const health = this.healthMonitor.getServiceHealth(nodeId)
if (health && health.status !== 'HEALTHY') {
console.log(`[FAILOVER] Skipping unhealthy node ${nodeId} (${health.status})`)
continue
}
const result = await operation(nodeId)
console.log(`[FAILOVER] ${operationContext} succeeded on node ${nodeId}`)
return result
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
console.log(`[FAILOVER] ${operationContext} failed on node ${nodeId}: ${lastError.message}`)
// If active node failed, initiate failover
if (nodeId === this.activeNode) {
await this.initiateFailover(nodeId, lastError.message)
}
}
}
throw lastError || new Error('All nodes failed')
}
// Candidate order: ACTIVE_ACTIVE returns all healthy nodes by
// priority; ACTIVE_PASSIVE returns the active node first (health
// is re-checked by the caller), then healthy standbys by priority.
private getNodesInOrder(): string[] {
if (this.strategy === FailoverStrategy.ACTIVE_ACTIVE) {
// Return healthy nodes sorted by priority
const healthyNodes = this.nodes.filter(node => {
const health = this.healthMonitor.getServiceHealth(node.id)
return health && health.status === 'HEALTHY'
})
return healthyNodes.sort((a, b) => b.priority - a.priority).map(n => n.id)
} else {
// Return active first, then healthy standbys
const nodes = [this.activeNode!, ...this.nodes
.filter(n => this.standbyNodes.has(n.id))
.filter(n => {
const health = this.healthMonitor.getServiceHealth(n.id)
return health && health.status === 'HEALTHY'
})
.sort((a, b) => b.priority - a.priority)
.map(n => n.id)]
// filter(Boolean) drops a null activeNode (e.g. uninitialized).
return nodes.filter(Boolean)
}
}
// Promote the highest-priority healthy standby to active; the failed
// node is demoted to standby. Records the promotion in the history.
private async initiateFailover(failedNodeId: string, reason: string): Promise<void> {
console.log(`[FAILOVER] Initiating failover from ${failedNodeId}`)
if (this.strategy === FailoverStrategy.ACTIVE_PASSIVE && this.activeNode === failedNodeId) {
// Find next highest priority healthy standby
const healthyStandbys = this.nodes
.filter(n => this.standbyNodes.has(n.id))
.filter(n => {
const health = this.healthMonitor.getServiceHealth(n.id)
return health && health.status === 'HEALTHY'
})
.sort((a, b) => b.priority - a.priority)
if (healthyStandbys.length > 0) {
const newActiveNode = healthyStandbys[0].id
this.failoverHistory.push({
timestamp: new Date(),
fromNode: failedNodeId,
toNode: newActiveNode,
reason
})
// Swap roles: promoted node leaves the standby set, failed node
// joins it so it can be promoted again after recovery.
this.standbyNodes.delete(newActiveNode)
this.standbyNodes.add(failedNodeId)
this.activeNode = newActiveNode
console.log(`[FAILOVER] Failover completed: ${failedNodeId} -> ${newActiveNode}`)
} else {
console.log(`[FAILOVER] No healthy standby nodes available for failover`)
}
}
}
// Copy of the promotion history (callers cannot mutate internal state).
getFailoverHistory(): Array<{
timestamp: Date
fromNode: string
toNode: string
reason: string
}> {
return [...this.failoverHistory]
}
// Snapshot of the current topology for dashboards/diagnostics.
getCurrentConfiguration(): {
strategy: FailoverStrategy
activeNode: string | null
standbyNodes: string[]
totalFailovers: number
} {
return {
strategy: this.strategy,
activeNode: this.activeNode,
standbyNodes: Array.from(this.standbyNodes),
totalFailovers: this.failoverHistory.length
}
}
}
// Bulkhead Pattern Implementation
// Bulkhead: caps concurrent executions at maxConcurrentCalls and parks
// up to maxQueueSize overflow callers; beyond that, calls are rejected.
class Bulkhead {
  // Number of slots currently held (running callers + slots reserved
  // for awakened waiters).
  private semaphore: number = 0
  private waitingQueue: Array<{
    resolve: (value?: unknown) => void
    reject: (reason?: unknown) => void
  }> = []
  private metrics = {
    totalCalls: 0,
    rejectedCalls: 0,
    successfulCalls: 0,
    failedCalls: 0
  }

  constructor(
    private name: string,
    private maxConcurrentCalls: number,
    private maxQueueSize: number
  ) {}

  /**
   * Runs `operation` inside the bulkhead. Throws immediately when both
   * the concurrency limit and the waiting queue are full.
   *
   * Fix vs. original: a parked caller's slot is now reserved (semaphore
   * incremented) inside processQueue() BEFORE its promise resolves.
   * Previously the waiter incremented the semaphore only after its
   * continuation ran, so a new call arriving in that microtask gap saw
   * a free slot and over-admitted past maxConcurrentCalls.
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    this.metrics.totalCalls++
    if (this.semaphore >= this.maxConcurrentCalls) {
      if (this.waitingQueue.length >= this.maxQueueSize) {
        this.metrics.rejectedCalls++
        throw new Error(`Bulkhead '${this.name}' rejected call - queue full`)
      }
      // Park until processQueue() hands us an already-reserved slot.
      await new Promise((resolve, reject) => {
        this.waitingQueue.push({ resolve, reject })
      })
    } else {
      this.semaphore++ // claim a free slot synchronously
    }
    try {
      const result = await operation()
      this.metrics.successfulCalls++
      return result
    } catch (error) {
      this.metrics.failedCalls++
      throw error
    } finally {
      this.semaphore--
      this.processQueue()
    }
  }

  // Wake at most one waiter, reserving its slot before resolving so no
  // concurrent caller can steal it.
  private processQueue(): void {
    if (this.waitingQueue.length > 0 && this.semaphore < this.maxConcurrentCalls) {
      const next = this.waitingQueue.shift()
      if (next) {
        this.semaphore++ // reserve on behalf of the awakened caller
        next.resolve(undefined)
      }
    }
  }

  // Counters plus live occupancy; rejectionRate is a percentage.
  getMetrics(): typeof this.metrics & {
    currentConcurrency: number
    queueSize: number
    rejectionRate: number
  } {
    const rejectionRate = this.metrics.totalCalls > 0
      ? (this.metrics.rejectedCalls / this.metrics.totalCalls) * 100
      : 0
    return {
      ...this.metrics,
      currentConcurrency: this.semaphore,
      queueSize: this.waitingQueue.length,
      rejectionRate
    }
  }
}
// Timeout Manager
/**
 * Tracks pending timeout timers by operation id so individual
 * operations — or all of them — can be cancelled.
 */
class TimeoutManager {
  private timeouts: Map<string, ReturnType<typeof setTimeout>> = new Map()

  /**
   * Races `operation` against a timer: rejects with a timeout error
   * after `timeoutMs` unless the operation settles first. The timer is
   * removed from the registry in either case.
   */
  executeWithTimeout<T>(
    operation: () => Promise<T>,
    timeoutMs: number,
    operationId: string = crypto.randomUUID()
  ): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.timeouts.delete(operationId)
        reject(new Error(`Operation ${operationId} timed out after ${timeoutMs}ms`))
      }, timeoutMs)
      this.timeouts.set(operationId, timer)
      operation().then(
        result => {
          this.clearTimeout(operationId)
          resolve(result)
        },
        error => {
          this.clearTimeout(operationId)
          reject(error)
        }
      )
    })
  }

  /** Cancel the timer for one operation, if it is still pending. */
  clearTimeout(operationId: string): void {
    const timer = this.timeouts.get(operationId)
    if (timer !== undefined) {
      clearTimeout(timer) // resolves to the global, not this method
      this.timeouts.delete(operationId)
    }
  }

  /** Cancel every pending timer and empty the registry. */
  cancelAll(): void {
    this.timeouts.forEach(timer => clearTimeout(timer))
    this.timeouts.clear()
  }

  /** Number of operations whose timers are still pending. */
  getActiveTimeoutsCount(): number {
    return this.timeouts.size
  }
}
// Fault Tolerance Orchestrator
// Composes the individual resilience primitives (bulkhead, retry,
// circuit breaker, timeout, failover) into a single call path per
// service, based on which configs each service declares.
class FaultToleranceOrchestrator {
private circuitBreakers: Map<string, CircuitBreaker> = new Map()
private bulkheads: Map<string, Bulkhead> = new Map()
private retryMechanisms: Map<string, RetryMechanism> = new Map()
private timeoutManager = new TimeoutManager()
constructor(
private services: Array<{
id: string
endpoint: string
circuitBreakerConfig?: CircuitBreakerConfig
bulkheadConfig?: { maxConcurrentCalls: number; maxQueueSize: number }
retryConfig?: RetryConfig
}>,
private healthMonitor: HealthMonitor,
private failoverManager: FailoverManager
) {
this.initializeComponents()
}
// Instantiate one primitive per service for each config it declares;
// services with no config for a primitive simply skip that layer.
private initializeComponents(): void {
for (const service of this.services) {
// Circuit Breaker
if (service.circuitBreakerConfig) {
this.circuitBreakers.set(service.id, new CircuitBreaker(
service.id,
service.circuitBreakerConfig,
this.healthMonitor
))
}
// Bulkhead
if (service.bulkheadConfig) {
this.bulkheads.set(service.id, new Bulkhead(
service.id,
service.bulkheadConfig.maxConcurrentCalls,
service.bulkheadConfig.maxQueueSize
))
}
// Retry Mechanism
if (service.retryConfig) {
this.retryMechanisms.set(service.id, new RetryMechanism(service.retryConfig))
}
}
}
// Execute `operation` through the service's configured layers. The
// wrapping ORDER is significant — innermost to outermost:
// bulkhead -> retry -> circuit breaker -> timeout -> failover.
// E.g. each retry attempt passes through the bulkhead, and the
// circuit breaker sees one outcome per whole retry sequence.
async executeServiceCall<T>(
serviceId: string,
operation: () => Promise<T>,
options: {
timeoutMs?: number
useFailover?: boolean
} = {}
): Promise<T> {
const service = this.services.find(s => s.id === serviceId)
if (!service) {
throw new Error(`Service ${serviceId} not found`)
}
const circuitBreaker = this.circuitBreakers.get(serviceId)
const bulkhead = this.bulkheads.get(serviceId)
const retryMechanism = this.retryMechanisms.get(serviceId)
let executeOperation = operation
// Apply bulkhead if configured
if (bulkhead) {
const originalOperation = executeOperation
executeOperation = () => bulkhead.execute(originalOperation)
}
// Apply retry mechanism if configured
if (retryMechanism) {
const originalOperation = executeOperation
executeOperation = () => retryMechanism.execute(originalOperation)
}
// Apply circuit breaker if configured
if (circuitBreaker) {
const originalOperation = executeOperation
executeOperation = () => circuitBreaker.execute(originalOperation)
}
// Apply timeout if specified
if (options.timeoutMs) {
const originalOperation = executeOperation
executeOperation = () => this.timeoutManager.executeWithTimeout(originalOperation, options.timeoutMs!)
}
// Apply failover if requested
// NOTE(review): the per-node callback ignores the nodeId chosen by
// the failover manager — the same operation runs regardless of which
// node was selected; confirm this is intended.
if (options.useFailover) {
return this.failoverManager.executeWithFailover(
() => executeOperation(),
`service call to ${serviceId}`
)
}
return executeOperation()
}
// Aggregate view for dashboards: per-service breaker/bulkhead state
// plus an overall rating (any UNHEALTHY service => UNHEALTHY, else
// any DEGRADED => DEGRADED, else HEALTHY).
getSystemHealth(): {
services: Array<{
id: string
circuitBreakerState?: any
bulkheadMetrics?: any
health: ServiceHealth | null
}>
failoverConfig: any
overallHealth: 'HEALTHY' | 'DEGRADED' | 'UNHEALTHY'
} {
const services = this.services.map(service => {
const circuitBreaker = this.circuitBreakers.get(service.id)
const bulkhead = this.bulkheads.get(service.id)
const health = this.healthMonitor.getServiceHealth(service.id)
return {
id: service.id,
circuitBreakerState: circuitBreaker?.getState(),
bulkheadMetrics: bulkhead?.getMetrics(),
health
}
})
const unhealthyServices = services.filter(s => s.health?.status === 'UNHEALTHY').length
const degradedServices = services.filter(s => s.health?.status === 'DEGRADED').length
let overallHealth: 'HEALTHY' | 'DEGRADED' | 'UNHEALTHY' = 'HEALTHY'
if (unhealthyServices > 0) {
overallHealth = 'UNHEALTHY'
} else if (degradedServices > 0) {
overallHealth = 'DEGRADED'
}
return {
services,
failoverConfig: this.failoverManager.getCurrentConfiguration(),
overallHealth
}
}
}
// Usage Example
/**
 * End-to-end demo of the resilience building blocks: circuit breaker,
 * retry, bulkhead, failover and the orchestrator wiring them together.
 * Illustrative only — failures are injected with Math.random() and all
 * output goes to the console.
 */
async function faultToleranceExample() {
  console.log('=== Fault Tolerance Mechanisms ===\n')
  // Per-service resilience configuration (demo-sized thresholds).
  const services = [
    {
      id: 'user-service',
      endpoint: 'http://user-service:8080',
      circuitBreakerConfig: {
        failureThreshold: 5,
        timeoutMs: 5000,
        halfOpenMaxCalls: 3,
        resetTimeoutMs: 30000
      },
      bulkheadConfig: {
        maxConcurrentCalls: 10,
        maxQueueSize: 50
      },
      retryConfig: {
        maxAttempts: 3,
        strategy: RetryStrategy.EXPONENTIAL_BACKOFF,
        baseDelayMs: 1000,
        maxDelayMs: 10000,
        multiplier: 2
      }
    },
    {
      id: 'order-service',
      endpoint: 'http://order-service:8080',
      circuitBreakerConfig: {
        failureThreshold: 3,
        timeoutMs: 8000,
        halfOpenMaxCalls: 2,
        resetTimeoutMs: 60000
      },
      retryConfig: {
        maxAttempts: 2,
        strategy: RetryStrategy.FIXED_DELAY,
        baseDelayMs: 2000,
        maxDelayMs: 5000
      }
    },
    {
      id: 'payment-service',
      endpoint: 'http://payment-service:8080',
      circuitBreakerConfig: {
        failureThreshold: 2,
        timeoutMs: 3000,
        halfOpenMaxCalls: 1,
        resetTimeoutMs: 15000
      }
    }
  ]
  // Database nodes for the failover demo, highest priority first.
  const nodes = [
    { id: 'primary-db', endpoint: 'http://primary-db:5432', region: 'us-east-1', priority: 10 },
    { id: 'secondary-db', endpoint: 'http://secondary-db:5432', region: 'us-west-1', priority: 8 },
    { id: 'backup-db', endpoint: 'http://backup-db:5432', region: 'eu-west-1', priority: 6 }
  ]
  const healthMonitor = new HealthMonitor(services)
  const failoverManager = new FailoverManager(nodes, FailoverStrategy.ACTIVE_PASSIVE, healthMonitor)
  const orchestrator = new FaultToleranceOrchestrator(services, healthMonitor, failoverManager)
  console.log('1. Initial System Health')
  // Give the health monitor time for its first probes.
  await new Promise(resolve => setTimeout(resolve, 2000))
  let systemHealth = orchestrator.getSystemHealth()
  console.log(`Overall system health: ${systemHealth.overallHealth}`)
  console.log('Service health:')
  systemHealth.services.forEach(service => {
    console.log(` ${service.id}: ${service.health?.status || 'UNKNOWN'}`)
  })
  console.log('\n2. Testing Circuit Breaker')
  console.log('Simulating user-service failures...')
  for (let i = 0; i < 7; i++) {
    try {
      await orchestrator.executeServiceCall('user-service', async () => {
        // 70% failure rate
        if (Math.random() > 0.3) {
          throw new Error('Service temporarily unavailable')
        }
        return 'User data'
      }, { timeoutMs: 1000 })
      console.log(` Attempt ${i + 1}: SUCCESS`)
    } catch (error) {
      // Catch variables are `unknown` under strict mode; narrow first
      // (the original accessed error.message without a check).
      const message = error instanceof Error ? error.message : String(error)
      console.log(` Attempt ${i + 1}: FAILED - ${message}`)
    }
  }
  // Demo-only peek at the orchestrator's private breaker map.
  const userServiceCB = orchestrator['circuitBreakers'].get('user-service')
  if (userServiceCB) {
    const cbState = userServiceCB.getState()
    console.log(`\nCircuit breaker state: ${cbState.state} (failures: ${cbState.failureCount})`)
  }
  console.log('\n3. Testing Retry Mechanism')
  const retryConfig = {
    maxAttempts: 4,
    strategy: RetryStrategy.EXPONENTIAL_BACKOFF,
    baseDelayMs: 500,
    maxDelayMs: 5000,
    multiplier: 2
  }
  const retryMechanism = new RetryMechanism(retryConfig)
  try {
    const result = await retryMechanism.execute(async () => {
      // 50% success rate
      if (Math.random() > 0.5) {
        throw new Error('Random failure')
      }
      return 'Operation succeeded'
    })
    console.log(`Retry mechanism result: ${result}`)
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error)
    console.log(`Retry mechanism failed: ${message}`)
  }
  console.log('\n4. Testing Bulkhead Pattern')
  // 8 requests against 3 slots + queue of 5: none rejected, but
  // execution is throttled to at most 3 at a time.
  const bulkhead = new Bulkhead('test-bulkhead', 3, 5)
  const promises = []
  for (let i = 0; i < 8; i++) {
    promises.push(
      bulkhead.execute(async () => {
        await new Promise(resolve => setTimeout(resolve, 1000))
        return `Request ${i} completed`
      }).catch(error => error.message)
    )
  }
  const bulkheadResults = await Promise.all(promises)
  console.log('Bulkhead results:')
  bulkheadResults.forEach((result, index) => {
    console.log(` Request ${index}: ${result}`)
  })
  const bulkheadMetrics = bulkhead.getMetrics()
  console.log(`Bulkhead metrics: ${JSON.stringify(bulkheadMetrics)}`)
  console.log('\n5. Testing Failover')
  try {
    await orchestrator.executeServiceCall('user-service', async () => {
      return await failoverManager.executeWithFailover(async (nodeId) => {
        console.log(` Executing database operation on ${nodeId}`)
        // Simulate primary failure
        if (nodeId === 'primary-db' && Math.random() > 0.3) {
          throw new Error('Primary database failed')
        }
        await new Promise(resolve => setTimeout(resolve, 200))
        return `Data from ${nodeId}`
      }, 'database query')
    }, { useFailover: true })
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error)
    console.log(`Failover test failed: ${message}`)
  }
  const failoverHistory = failoverManager.getFailoverHistory()
  if (failoverHistory.length > 0) {
    console.log('\nFailover history:')
    failoverHistory.forEach(failover => {
      console.log(` ${failover.timestamp.toISOString()}: ${failover.fromNode} -> ${failover.toNode} (${failover.reason})`)
    })
  }
  console.log('\n6. Final System Health Check')
  await new Promise(resolve => setTimeout(resolve, 3000))
  systemHealth = orchestrator.getSystemHealth()
  console.log(`Final overall health: ${systemHealth.overallHealth}`)
  console.log('\nService details:')
  systemHealth.services.forEach(service => {
    console.log(`\n${service.id}:`)
    console.log(` Health: ${service.health?.status}`)
    console.log(` Response time: ${service.health?.responseTime}ms`)
    // errorRate is a 0..1 fraction (see HealthMonitor); scale to a
    // percentage before printing (the original printed the raw fraction
    // followed by "%").
    console.log(` Error rate: ${((service.health?.errorRate || 0) * 100).toFixed(2)}%`)
    if (service.circuitBreakerState) {
      console.log(` Circuit breaker: ${service.circuitBreakerState.state}`)
    }
    if (service.bulkheadMetrics) {
      // Fixed: Bulkhead.getMetrics() has no `maxConcurrency` field (the
      // original printed "undefined"); report the fields that exist.
      console.log(` Bulkhead: ${service.bulkheadMetrics.currentConcurrency} active, ${service.bulkheadMetrics.queueSize} queued`)
    }
  })
  console.log('\n=== Fault Tolerance Demo Complete ===')
}
// Public API of the fault-tolerance module.
export {
CircuitState,
RetryStrategy,
FailoverStrategy,
ServiceHealth,
CircuitBreaker,
RetryMechanism,
HealthMonitor,
FailoverManager,
Bulkhead,
TimeoutManager,
FaultToleranceOrchestrator,
faultToleranceExample
}