🎯 Ejemplos recomendados
Colecciones de ejemplos equilibradas de varias categorías para que las explores
Muestras de Patrones de Sistemas Distribuidos
Patrones completos de sistemas distribuidos cubriendo consistencia, algoritmos de consenso, sharding y mecanismos de tolerancia a fallos
💻 Modelos de Consistencia Distribuida
🔴 complex
⭐⭐⭐⭐⭐
Implementando diferentes modelos de consistencia y patrones de coordinación
⏱️ 80 min
🏷️ distributed-consistency, strong-consistency, eventual-consistency, vector-clocks
Requisitos previos:
Fundamentos de sistemas distribuidos, comprensión del teorema CAP
// Distributed Consistency Models Implementation
// Consistency Model Types
/**
 * String identifiers for the consistency models this sample talks about.
 *
 * In the visible part of this file only the strong, eventual and causal models
 * have a matching manager class; the enum itself is only referenced by the
 * export list.
 */
enum ConsistencyModel {
STRONG = 'strong',
SEQUENTIAL = 'sequential',
CAUSAL = 'causal',
EVENTUAL = 'eventual',
WEAK = 'weak'
}
/**
 * One stored value together with the bookkeeping the managers attach to it.
 */
interface DataItem {
// Key the value is stored under.
key: string
// Application payload; the managers store and return it without inspecting it.
value: any
// Version number; the managers in this file fill it with a per-key counter,
// a per-node counter, or Date.now(), depending on the class.
version: number
// Creation time of this item (used as a last-writer-wins tie-breaker during gossip).
timestamp: Date
// Identifier of the node that produced the item.
nodeId: string
}
/**
 * Record of a single read/write/delete request, as kept in operation logs and
 * pending-operation queues.
 */
interface Operation {
// Unique id; the managers in this file generate it with crypto.randomUUID().
id: string
type: 'READ' | 'WRITE' | 'DELETE'
// Key the operation targets.
key: string
// Payload of the operation; omitted by the READ records created in this file.
value?: any
// Time the operation record was created.
timestamp: Date
// Identifier of the node that issued the operation.
nodeId: string
}
/**
 * Outcome returned by every manager's read() and write().
 * Failures are reported through `success: false` plus `errorMessage`
 * rather than by throwing.
 */
interface ConsistencyResult {
success: boolean
// Value read or written; set on success.
value?: any
// Version of the item that was read or written; set on success.
version?: number
// Human-readable reason; set on failure.
errorMessage?: string
}
// Strong Consistency Implementation (Linearizability)
/**
 * In-memory key/value store in which every read and write first takes an
 * exclusive per-key lock, so operations on the same key never interleave.
 *
 * The lock is owned by a node id: a node that already holds the lock for a key
 * is allowed straight through, and only the owner can release it. Successful
 * operations are appended to an operation log that can be inspected with
 * getOperationHistory().
 */
class StrongConsistencyManager {
  private data: Map<string, DataItem> = new Map()
  private operationLog: Operation[] = []
  private lockTable: Map<string, string> = new Map() // key -> nodeId

  /**
   * Stores `value` under `key` and bumps the key's version by one
   * (the first write of a key gets version 1).
   *
   * @param key    key to write
   * @param value  payload to store
   * @param nodeId node performing the write; also the lock owner
   * @returns success with the new version, or a failure result when the lock
   *          could not be obtained within the default timeout
   */
  async write(key: string, value: any, nodeId: string): Promise<ConsistencyResult> {
    const locked = await this.acquireLock(key, nodeId)
    if (!locked) {
      return { success: false, errorMessage: 'Could not acquire lock for write operation' }
    }

    try {
      const previous = this.data.get(key)
      const nextVersion = (previous?.version || 0) + 1

      const stored: DataItem = {
        key,
        value,
        version: nextVersion,
        timestamp: new Date(),
        nodeId
      }

      // Record the write in the history before publishing the new value.
      const logEntry: Operation = {
        id: crypto.randomUUID(),
        type: 'WRITE',
        key,
        value,
        timestamp: new Date(),
        nodeId
      }
      this.operationLog.push(logEntry)
      this.data.set(key, stored)

      console.log(`[STRONG] Write operation for key ${key} by ${nodeId} - Version: ${nextVersion}`)
      return { success: true, value, version: nextVersion }
    } finally {
      await this.releaseLock(key, nodeId)
    }
  }

  /**
   * Reads the current value of `key` while holding the key's lock
   * (waits at most 5 seconds for it).
   *
   * @param key    key to read
   * @param nodeId node performing the read; also the lock owner
   * @returns the value and version, 'Key not found' when the key is absent,
   *          or a lock failure result. Only successful reads are logged.
   */
  async read(key: string, nodeId: string): Promise<ConsistencyResult> {
    const locked = await this.acquireLock(key, nodeId, 5000) // 5 second timeout
    if (!locked) {
      return { success: false, errorMessage: 'Could not acquire lock for read operation' }
    }

    try {
      const found = this.data.get(key)
      if (!found) {
        return { success: false, errorMessage: 'Key not found' }
      }

      const logEntry: Operation = {
        id: crypto.randomUUID(),
        type: 'READ',
        key,
        timestamp: new Date(),
        nodeId
      }
      this.operationLog.push(logEntry)

      console.log(`[STRONG] Read operation for key ${key} by ${nodeId} - Version: ${found.version}`)
      const { value, version } = found
      return { success: true, value, version }
    } finally {
      await this.releaseLock(key, nodeId)
    }
  }

  /**
   * Polls the lock table every 100 ms until the key is free (or already owned
   * by `nodeId`) or `timeout` milliseconds have elapsed.
   *
   * @returns true when the lock is now held by `nodeId`, false on timeout
   */
  private async acquireLock(key: string, nodeId: string, timeout: number = 30000): Promise<boolean> {
    const deadline = Date.now() + timeout

    while (Date.now() < deadline) {
      const holder = this.lockTable.get(key)
      const available = !holder || holder === nodeId
      if (available) {
        this.lockTable.set(key, nodeId)
        return true
      }
      // Someone else owns the key: back off briefly, then look again.
      await new Promise(resolve => setTimeout(resolve, 100))
    }

    return false
  }

  /** Drops the lock on `key`, but only if `nodeId` is the current owner. */
  private async releaseLock(key: string, nodeId: string): Promise<void> {
    if (this.lockTable.get(key) === nodeId) {
      this.lockTable.delete(key)
    }
  }

  /** Returns a shallow copy of the operation log, oldest entry first. */
  getOperationHistory(): Operation[] {
    return this.operationLog.slice()
  }
}
// Eventual Consistency Implementation with Gossip Protocol
/**
 * One node of an eventually consistent key/value store.
 *
 * Writes and reads are purely local. A background timer periodically picks a
 * random peer and merges that peer's data into the local store, resolving
 * conflicts with vector clocks and falling back to last-writer-wins on
 * timestamps. The peer exchange is simulated: getSimulatedNodeData()
 * fabricates the remote data, no network call is made.
 *
 * The constructor starts the gossip timer; call stop() when the node is no
 * longer needed, otherwise the interval keeps the process alive.
 */
class EventualConsistencyManager {
  private nodeData: Map<string, Map<string, DataItem>> = new Map() // nodeId -> (key -> data)
  private vectorClocks: Map<string, Map<string, number>> = new Map() // nodeId -> (nodeId -> counter)
  private gossipInterval: number = 5000 // 5 seconds
  private gossipTimer: ReturnType<typeof setInterval> | null = null

  /**
   * @param nodeId     identifier of this node
   * @param knownNodes identifiers of the peers this node may gossip with
   */
  constructor(private nodeId: string, private knownNodes: string[]) {
    this.initializeNode()
    this.startGossipProtocol()
  }

  /** Creates the local data map and a vector clock with a zero entry per known node. */
  private initializeNode(): void {
    this.nodeData.set(this.nodeId, new Map())

    const nodeClock = new Map<string, number>()
    for (const node of this.knownNodes) {
      nodeClock.set(node, 0)
    }
    nodeClock.set(this.nodeId, 0)
    this.vectorClocks.set(this.nodeId, nodeClock)
  }

  /**
   * Stores `value` locally and advances this node's vector-clock entry.
   * The stored item carries a snapshot of the vector clock taken after the increment.
   *
   * @returns always a success result; `version` is this node's new counter value
   */
  async write(key: string, value: any): Promise<ConsistencyResult> {
    const nodeData = this.nodeData.get(this.nodeId)!
    const nodeClock = this.vectorClocks.get(this.nodeId)!

    // Increment vector clock for this node
    const nextCounter = (nodeClock.get(this.nodeId) ?? 0) + 1
    nodeClock.set(this.nodeId, nextCounter)

    // The clock snapshot is part of the literal. The previous code attached it
    // with a statement starting with "(" right after the literal's closing
    // brace, which (without a semicolon) was parsed as a call on the literal
    // and threw on every write.
    const newDataItem: DataItem & { vectorClock: Map<string, number> } = {
      key,
      value,
      version: nextCounter,
      timestamp: new Date(),
      nodeId: this.nodeId,
      vectorClock: new Map(nodeClock)
    }
    nodeData.set(key, newDataItem)

    console.log(`[EVENTUAL] Local write for key ${key} on node ${this.nodeId} - VC: ${JSON.stringify(Object.fromEntries(nodeClock))}`)
    return { success: true, value, version: nextCounter }
  }

  /**
   * Returns the locally known value for `key`; it may lag behind other nodes.
   *
   * @returns the value and version, or a 'Key not found' failure
   */
  async read(key: string): Promise<ConsistencyResult> {
    const dataItem = this.nodeData.get(this.nodeId)!.get(key)
    if (!dataItem) {
      return { success: false, errorMessage: 'Key not found' }
    }

    console.log(`[EVENTUAL] Read key ${key} on node ${this.nodeId} - Version: ${dataItem.version}`)
    return {
      success: true,
      value: dataItem.value,
      version: dataItem.version
    }
  }

  /**
   * Stops the background gossip timer. Safe to call more than once.
   * Local reads and writes keep working after the node is stopped.
   */
  stop(): void {
    if (this.gossipTimer !== null) {
      clearInterval(this.gossipTimer)
      this.gossipTimer = null
    }
  }

  /** Starts the periodic gossip round; the handle is kept so stop() can cancel it. */
  private startGossipProtocol(): void {
    this.gossipTimer = setInterval(() => {
      // performGossip handles its own errors, so the promise can be ignored here.
      void this.performGossip()
    }, this.gossipInterval)
  }

  /** Runs one gossip round with a randomly chosen peer; failures are logged, not thrown. */
  private async performGossip(): Promise<void> {
    if (this.knownNodes.length === 0) return

    // Select random node to gossip with
    const targetNode = this.knownNodes[Math.floor(Math.random() * this.knownNodes.length)]
    try {
      await this.exchangeDataWithNode(targetNode)
    } catch (error) {
      console.log(`Gossip with node ${targetNode} failed:`, error)
    }
  }

  /** Merges the (simulated) data of `targetNodeId` into the local store. */
  private async exchangeDataWithNode(targetNodeId: string): Promise<void> {
    console.log(`[GOSSIP] Exchanging data between ${this.nodeId} and ${targetNodeId}`)

    // Simulate data exchange with another node
    const targetNodeData = this.getSimulatedNodeData(targetNodeId)
    const myNodeData = this.nodeData.get(this.nodeId)!

    for (const [key, targetData] of targetNodeData.entries()) {
      const myData = myNodeData.get(key)
      if (!myData) {
        // Key doesn't exist locally, adopt it
        myNodeData.set(key, targetData)
        console.log(`[GOSSIP] Adopted key ${key} from node ${targetNodeId}`)
        continue
      }

      // Resolve conflicts using vector clocks
      const resolved = this.resolveConflict(myData, targetData)
      if (resolved !== myData) {
        myNodeData.set(key, resolved)
        console.log(`[GOSSIP] Resolved conflict for key ${key}`)
      }
    }
  }

  /** Reads the vector-clock snapshot that write() attaches to an item, if any. */
  private clockOf(item: DataItem): Map<string, number> | undefined {
    return (item as DataItem & { vectorClock?: Map<string, number> }).vectorClock
  }

  /**
   * Chooses which of two versions of the same key survives.
   *
   * @param data1 the local version (kept on ties)
   * @param data2 the remote version
   */
  private resolveConflict(data1: DataItem, data2: DataItem): DataItem {
    const newerByTimestamp = (): DataItem => (data1.timestamp >= data2.timestamp ? data1 : data2)

    const vc1 = this.clockOf(data1)
    const vc2 = this.clockOf(data2)
    if (!vc1 || !vc2) {
      // At least one side has no clock (e.g. simulated peer data): fall back to timestamps.
      return newerByTimestamp()
    }

    switch (this.compareVectorClocks(vc1, vc2)) {
      case 'greater':
      case 'equal':
        // Equal clocks describe the same causal state: keep the local copy
        // instead of reporting a conflict and replacing it.
        return data1
      case 'less':
        return data2
      case 'concurrent':
        // Conflict - choose the one with higher timestamp (last-writer-wins)
        return newerByTimestamp()
    }
  }

  /**
   * Compares two vector clocks entry by entry; a missing entry counts as 0.
   *
   * @returns 'greater' / 'less' when vc1 dominates / is dominated by vc2,
   *          'equal' when all entries match, 'concurrent' otherwise
   */
  private compareVectorClocks(vc1: Map<string, number>, vc2: Map<string, number>): 'greater' | 'less' | 'equal' | 'concurrent' {
    let greater = false
    let less = false

    const allKeys = new Set([...vc1.keys(), ...vc2.keys()])
    for (const key of allKeys) {
      const v1 = vc1.get(key) ?? 0
      const v2 = vc2.get(key) ?? 0
      if (v1 > v2) greater = true
      if (v1 < v2) less = true
      if (greater && less) return 'concurrent'
    }

    if (greater) return 'greater'
    if (less) return 'less'
    return 'equal'
  }

  /**
   * Stand-in for a remote fetch: with probability 0.5 returns a single
   * 'shared_key_1' item with a random version and a timestamp up to 10 s in
   * the past, otherwise an empty map. The item carries no vector clock.
   */
  private getSimulatedNodeData(nodeId: string): Map<string, DataItem> {
    const simulatedData = new Map<string, DataItem>()

    if (Math.random() > 0.5) {
      simulatedData.set('shared_key_1', {
        key: 'shared_key_1',
        value: `value_from_${nodeId}`,
        version: Math.floor(Math.random() * 10),
        timestamp: new Date(Date.now() - Math.random() * 10000),
        nodeId
      })
    }

    return simulatedData
  }

  /** Returns a shallow copy of this node's key -> item map. */
  getLocalData(): Map<string, DataItem> {
    return new Map(this.nodeData.get(this.nodeId))
  }
}
// Causal Consistency Implementation
/**
 * Key/value store that can hold a write back while any operation id it lists
 * as a dependency is still in the pending queue, and replays held writes once
 * their dependencies have been applied.
 *
 * A dependency counts as satisfied whenever its id is NOT currently pending;
 * ids that were never seen are therefore treated as satisfied.
 */
class CausalConsistencyManager {
  private data: Map<string, DataItem> = new Map()
  private dependencyGraph: Map<string, Set<string>> = new Map() // operation -> dependencies
  private pendingOperations: Map<string, Operation> = new Map()

  /**
   * Applies a write immediately when none of `dependencies` is pending,
   * otherwise queues it.
   *
   * @param key          key to write
   * @param value        payload to store
   * @param dependencies ids of operations that must be applied first
   * @returns success with a Date.now()-based version, or a
   *          'Dependencies not satisfied' failure when the write was queued
   */
  async write(key: string, value: any, dependencies: string[] = []): Promise<ConsistencyResult> {
    const operationId = crypto.randomUUID()
    const timestamp = new Date()
    const nodeId = 'causal-node'

    const operation: Operation = {
      id: operationId,
      type: 'WRITE',
      key,
      value,
      timestamp,
      nodeId
    }

    if (!this.checkDependencies(dependencies)) {
      // Park the write until the operations it depends on have been applied.
      this.pendingOperations.set(operationId, operation)
      this.dependencyGraph.set(operationId, new Set(dependencies))
      console.log(`[CAUSAL] Write for key ${key} pending dependencies: ${dependencies.join(', ')}`)
      return { success: false, errorMessage: 'Dependencies not satisfied' }
    }

    // Apply right away; the wall-clock time doubles as the version.
    const applied: DataItem = {
      key,
      value,
      version: Date.now(),
      timestamp,
      nodeId
    }
    this.data.set(key, applied)
    this.dependencyGraph.delete(operationId)

    // Anything that was waiting on this operation may be runnable now.
    await this.processPendingOperations(operationId)

    console.log(`[CAUSAL] Write for key ${key} executed successfully`)
    return { success: true, value, version: applied.version }
  }

  /**
   * Reads the currently stored value for `key`.
   *
   * @returns the value and version, or a 'Key not found' failure
   */
  async read(key: string): Promise<ConsistencyResult> {
    const stored = this.data.get(key)
    if (!stored) {
      return { success: false, errorMessage: 'Key not found' }
    }

    console.log(`[CAUSAL] Read key ${key} - Version: ${stored.version}`)
    const { value, version } = stored
    return { success: true, value, version }
  }

  /** True when none of the given operation ids is still in the pending queue. */
  private checkDependencies(dependencies: string[]): boolean {
    return !dependencies.some(dep => this.pendingOperations.has(dep))
  }

  /**
   * Removes `completedOperationId` from every waiting operation's dependency
   * set, executes the writes that are left with no dependencies, and repeats
   * for each operation it executed.
   */
  private async processPendingOperations(completedOperationId: string): Promise<void> {
    // Pass 1: strike the completed id and collect operations that became free.
    const unblocked: string[] = []
    this.dependencyGraph.forEach((waitingOn, waitingId) => {
      if (!waitingOn.has(completedOperationId)) return
      waitingOn.delete(completedOperationId)
      if (waitingOn.size === 0) {
        unblocked.push(waitingId)
      }
    })

    // Pass 2: run them in discovery order, cascading after each one.
    for (const readyId of unblocked) {
      const queued = this.pendingOperations.get(readyId)
      if (queued === undefined) continue

      this.pendingOperations.delete(readyId)
      this.dependencyGraph.delete(readyId)

      const isApplicableWrite = queued.type === 'WRITE' && queued.value !== undefined
      if (isApplicableWrite) {
        const replayed: DataItem = {
          key: queued.key,
          value: queued.value,
          version: Date.now(),
          timestamp: queued.timestamp,
          nodeId: queued.nodeId
        }
        this.data.set(queued.key, replayed)
        console.log(`[CAUSAL] Executed pending write for key ${queued.key}`)
      }

      await this.processPendingOperations(readyId)
    }
  }

  /** Number of operations currently held back. */
  getPendingOperationsCount(): number {
    return this.pendingOperations.size
  }

  /** Shallow copy of the operation -> outstanding-dependencies map (the Sets are shared). */
  getDependencyGraph(): Map<string, Set<string>> {
    return new Map(this.dependencyGraph)
  }
}
// Consistency Performance Analyzer
/**
 * Collects latency/success samples per consistency model (keeping the most
 * recent 1000 per model) and turns them into a summary plus a list of
 * textual recommendations.
 */
class ConsistencyPerformanceAnalyzer {
  private metrics: Map<string, Array<{
    operation: string
    latency: number
    timestamp: Date
    success: boolean
  }>> = new Map()

  /**
   * Adds one sample for `consistencyModel`.
   *
   * @param consistencyModel name the sample is grouped under
   * @param operation        operation label; 'READ' and 'WRITE' feed the per-kind averages
   * @param latency          measured latency in milliseconds
   * @param success          whether the operation succeeded
   */
  recordMetric(consistencyModel: string, operation: string, latency: number, success: boolean): void {
    let samples = this.metrics.get(consistencyModel)
    if (samples === undefined) {
      samples = []
      this.metrics.set(consistencyModel, samples)
    }

    samples.push({ operation, latency, timestamp: new Date(), success })

    // Sliding window: drop the oldest sample once more than 1000 are stored.
    if (samples.length > 1000) {
      samples.shift()
    }
  }

  /**
   * Summarises all recorded samples.
   *
   * @returns per-model averages (success rate in percent; read/write latency
   *          is 0 when no sample of that kind exists) and recommendations
   *          derived from them
   */
  getPerformanceReport(): {
    models: Array<{
      name: string
      avgLatency: number
      successRate: number
      totalOperations: number
      readLatency: number
      writeLatency: number
    }>
    recommendations: string[]
  } {
    // Mean latency of a sample list; an empty list yields 0.
    const meanLatency = (rows: Array<{ latency: number }>): number =>
      rows.length > 0 ? rows.reduce((sum, row) => sum + row.latency, 0) / rows.length : 0

    const models: Array<{
      name: string
      avgLatency: number
      successRate: number
      totalOperations: number
      readLatency: number
      writeLatency: number
    }> = []

    this.metrics.forEach((samples, name) => {
      const totalOperations = samples.length
      if (totalOperations === 0) return

      const succeeded = samples.filter(s => s.success).length
      models.push({
        name,
        avgLatency: meanLatency(samples),
        successRate: (succeeded / totalOperations) * 100,
        totalOperations,
        readLatency: meanLatency(samples.filter(s => s.operation === 'READ')),
        writeLatency: meanLatency(samples.filter(s => s.operation === 'WRITE'))
      })
    })

    return { models, recommendations: this.generateRecommendations(models) }
  }

  /**
   * Derives human-readable advice from the per-model summaries. Always returns
   * at least one entry.
   */
  private generateRecommendations(
    models: ReturnType<ConsistencyPerformanceAnalyzer['getPerformanceReport']>['models']
  ): string[] {
    const advice: string[] = []

    // Worst average latency across models (-Infinity when there are none).
    const worstLatency = models.reduce((worst, m) => Math.max(worst, m.avgLatency), -Infinity)
    if (worstLatency > 1000) { // 1 second
      advice.push('High latency detected. Consider optimizing network communication or using caching.')
    }

    if (models.some(m => m.successRate < 95)) {
      advice.push('Low success rates detected. Check for network failures or implement retry mechanisms.')
    }

    if (models.some(m => m.writeLatency > m.readLatency * 2)) {
      advice.push('Write operations significantly slower than reads. Consider write-optimizations or eventual consistency for writes.')
    }

    // Strong vs eventual comparison, when both were measured.
    const strong = models.find(m => m.name.includes('STRONG'))
    const eventual = models.find(m => m.name.includes('EVENTUAL'))
    if (strong && eventual && strong.avgLatency > eventual.avgLatency * 3) {
      advice.push('Strong consistency shows high latency penalty. Evaluate if eventual consistency could be sufficient for your use case.')
    }

    if (advice.length === 0) {
      advice.push('Performance metrics look good. Continue monitoring for any degradation.')
    }
    return advice
  }
}
// Usage Example
/**
 * Walks through the strong, eventual and causal managers, runs a batch of
 * concurrent strong writes, and prints a performance report.
 *
 * Each demo operation is timed individually and recorded with its real
 * success flag, so the report is not skewed by the gossip wait or by
 * operations that failed.
 *
 * NOTE: EventualConsistencyManager starts a gossip interval in its
 * constructor and this function does not stop it, so the timers outlive the
 * returned promise.
 */
async function consistencyModelsExample(): Promise<void> {
  console.log('=== Distributed Consistency Models ===\n')

  // Initialize different consistency managers
  const strongManager = new StrongConsistencyManager()
  const eventualManager1 = new EventualConsistencyManager('node-1', ['node-2', 'node-3'])
  const eventualManager2 = new EventualConsistencyManager('node-2', ['node-1', 'node-3'])
  const causalManager = new CausalConsistencyManager()
  const analyzer = new ConsistencyPerformanceAnalyzer()

  // Runs one operation, records its latency and outcome, and hands the result back.
  const timed = async (
    model: string,
    operation: string,
    run: () => Promise<ConsistencyResult>
  ): Promise<ConsistencyResult> => {
    const startedAt = Date.now()
    const result = await run()
    analyzer.recordMetric(model, operation, Date.now() - startedAt, result.success)
    return result
  }

  console.log('1. Testing Strong Consistency')
  await timed('STRONG', 'WRITE', () => strongManager.write('key1', 'value1', 'node-1'))
  await timed('STRONG', 'READ', () => strongManager.read('key1', 'node-2'))
  await timed('STRONG', 'WRITE', () => strongManager.write('key1', 'value2', 'node-2'))
  const strongResult = await timed('STRONG', 'READ', () => strongManager.read('key1', 'node-1'))
  console.log(`Strong consistency result: ${JSON.stringify(strongResult)}`)

  console.log('\n2. Testing Eventual Consistency')
  await timed('EVENTUAL', 'WRITE', () => eventualManager1.write('key1', 'value1'))
  await timed('EVENTUAL', 'WRITE', () => eventualManager2.write('key2', 'value2'))

  // Wait for gossip to propagate (deliberately outside the timed sections:
  // it is a demo pause, not operation latency).
  await new Promise(resolve => setTimeout(resolve, 6000))

  const read1 = await timed('EVENTUAL', 'READ', () => eventualManager1.read('key1'))
  const read2 = await timed('EVENTUAL', 'READ', () => eventualManager2.read('key2'))
  console.log(`Eventual consistency results: node1=${JSON.stringify(read1)}, node2=${JSON.stringify(read2)}`)

  // Show data convergence
  console.log('\nNode 1 data:', Array.from(eventualManager1.getLocalData().keys()))
  console.log('Node 2 data:', Array.from(eventualManager2.getLocalData().keys()))

  console.log('\n3. Testing Causal Consistency')
  await timed('CAUSAL', 'WRITE', () => causalManager.write('key1', 'value1')) // First operation
  // 'op-1' is a placeholder id: write() does not expose the id of the first
  // operation, and an id that is not pending is treated as already satisfied.
  await timed('CAUSAL', 'WRITE', () => causalManager.write('key2', 'value2', ['op-1']))
  const causalResult = await timed('CAUSAL', 'READ', () => causalManager.read('key2'))
  console.log(`Causal consistency result: ${JSON.stringify(causalResult)}`)
  console.log(`Pending operations: ${causalManager.getPendingOperationsCount()}`)

  // Test concurrent operations with potential conflicts
  console.log('\n4. Testing Concurrent Operations')
  // Start the clock BEFORE launching the writes; they begin running as soon as
  // write() is called, not when Promise.all is awaited.
  const concurrentStartTime = Date.now()
  const concurrentPromises: Array<Promise<ConsistencyResult>> = []
  for (let i = 0; i < 5; i++) {
    concurrentPromises.push(strongManager.write(`concurrent_key_${i}`, `value_${i}`, `node-${i}`))
  }
  await Promise.all(concurrentPromises)
  const concurrentLatency = Date.now() - concurrentStartTime
  console.log(`Concurrent strong operations completed in ${concurrentLatency}ms`)

  // Performance analysis
  console.log('\n5. Performance Analysis Report')

  // Add some more sample metrics (synthetic, to give the report something to average)
  for (let i = 0; i < 10; i++) {
    analyzer.recordMetric('STRONG', 'READ', Math.random() * 200 + 50, true)
    analyzer.recordMetric('STRONG', 'WRITE', Math.random() * 500 + 200, true)
    analyzer.recordMetric('EVENTUAL', 'READ', Math.random() * 50 + 10, true)
    analyzer.recordMetric('EVENTUAL', 'WRITE', Math.random() * 100 + 20, true)
    analyzer.recordMetric('CAUSAL', 'READ', Math.random() * 150 + 30, true)
    analyzer.recordMetric('CAUSAL', 'WRITE', Math.random() * 300 + 100, true)
  }

  const report = analyzer.getPerformanceReport()

  console.log('\nPerformance by Model:')
  for (const model of report.models) {
    console.log(` ${model.name}:`)
    console.log(` Average Latency: ${model.avgLatency.toFixed(2)}ms`)
    console.log(` Success Rate: ${model.successRate.toFixed(1)}%`)
    console.log(` Read Latency: ${model.readLatency.toFixed(2)}ms`)
    console.log(` Write Latency: ${model.writeLatency.toFixed(2)}ms`)
    console.log(` Total Operations: ${model.totalOperations}`)
  }

  console.log('\nRecommendations:')
  for (const rec of report.recommendations) {
    console.log(` • ${rec}`)
  }
}
// Public surface of the consistency-models sample: the model enum, the three
// shared interfaces (DataItem, Operation, ConsistencyResult — types only),
// the manager classes, the analyzer, and the runnable example.
export {
ConsistencyModel,
DataItem,
Operation,
ConsistencyResult,
StrongConsistencyManager,
EventualConsistencyManager,
CausalConsistencyManager,
ConsistencyPerformanceAnalyzer,
consistencyModelsExample
}
💻 Algoritmos de Consenso (Raft, PBFT)
🔴 complex
⭐⭐⭐⭐⭐
Implementando algoritmos de consenso Raft y PBFT para acuerdo distribuido
⏱️ 90 min
🏷️ consensus-algorithms, raft, pbft, byzantine-fault-tolerance
Requisitos previos:
Conocimientos avanzados de sistemas distribuidos, comprensión de los protocolos de consenso
// Consensus Algorithms Implementation
// Node Types and States
/**
 * Role a Raft node is currently playing. Nodes start as FOLLOWER, switch to
 * CANDIDATE when they start an election, and to LEADER via becomeLeader().
 */
enum NodeState {
FOLLOWER = 'FOLLOWER',
CANDIDATE = 'CANDIDATE',
LEADER = 'LEADER'
}
/**
 * Kinds of messages exchanged between Raft nodes. HEARTBEAT is modelled as its
 * own message type here and is dispatched to a separate handler from
 * APPEND_ENTRIES.
 */
enum RaftMessageType {
APPEND_ENTRIES = 'APPEND_ENTRIES',
APPEND_ENTRIES_RESPONSE = 'APPEND_ENTRIES_RESPONSE',
REQUEST_VOTE = 'REQUEST_VOTE',
REQUEST_VOTE_RESPONSE = 'REQUEST_VOTE_RESPONSE',
HEARTBEAT = 'HEARTBEAT'
}
/**
 * Kinds of messages exchanged between PBFT replicas.
 * NOTE(review): in the visible part of PBFTNode.handlePBFTMessage there is no
 * case for NEW_VIEW — confirm whether that is intentional.
 */
enum PBFTMessageType {
PRE_PREPARE = 'PRE_PREPARE',
PREPARE = 'PREPARE',
COMMIT = 'COMMIT',
VIEW_CHANGE = 'VIEW_CHANGE',
NEW_VIEW = 'NEW_VIEW'
}
/**
 * Envelope for every Raft message.
 */
interface RaftMessage {
type: RaftMessageType
// Sender's current term at the time the message was created.
term: number
senderId: string
receiverId: string
// Type-specific payload, as built by RaftNode:
//   APPEND_ENTRIES          { entries, prevLogIndex, prevLogTerm, leaderCommit }
//   APPEND_ENTRIES_RESPONSE { success }
//   REQUEST_VOTE            { candidateId, lastLogIndex, lastLogTerm }
//   REQUEST_VOTE_RESPONSE   { voteGranted }
//   HEARTBEAT               { leaderId }
data?: any
}
/**
 * Envelope for every PBFT message.
 */
interface PBFTMessage {
type: PBFTMessageType
// View number the message belongs to.
view: number
// Sequence number assigned to the request (the primary increments it per PRE_PREPARE).
sequenceNumber: number
senderId: string
// Digest of the client operation, as produced by PBFTNode.calculateDigest.
digest: string
// Type-specific payload; PRE_PREPARE carries { operation, requestId }.
data?: any
// NOTE(review): not populated anywhere in the visible code — presumably
// reserved for message authentication; confirm.
signatures?: string[]
}
/**
 * One entry of a Raft node's replicated log.
 */
interface LogEntry {
// Term of the leader that created the entry.
term: number
// 1-based position in the log (the leader assigns log.length + 1).
index: number
// Opaque client command; the log stores it without interpreting it.
command: any
// Time the leader accepted the client request.
timestamp: Date
}
// Raft Consensus Algorithm Implementation
/**
 * A single participant in a Raft cluster.
 *
 * Outgoing messages are handed to the `messageHandler` callback; incoming
 * messages are fed in through handleMessage(). `clusterNodes` is expected to
 * list every member including this node (majorities are computed over it).
 *
 * Log replication started by clientRequest() is simulated
 * (simulateMessageExchange fabricates the follower's answer); elections,
 * heartbeats and the message handlers use the real callback.
 *
 * The constructor arms the election timer. Call stop() to cancel all timers
 * when the node is discarded.
 */
class RaftNode {
  private state: NodeState = NodeState.FOLLOWER
  private currentTerm: number = 0
  private votedFor: string | null = null
  private log: LogEntry[] = []
  private commitIndex: number = 0
  private lastApplied: number = 0

  // Leader state
  private leaderId: string | null = null
  private nextIndex: Map<string, number> = new Map()
  private matchIndex: Map<string, number> = new Map()

  // Candidate state: nodes (including self) that granted a vote in the current term
  private votesReceived: Set<string> = new Set()

  // Timing
  private electionTimeout: number
  private heartbeatInterval: number
  private lastHeartbeat: number = Date.now()
  private electionTimer: ReturnType<typeof setTimeout> | null = null
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null
  private stopped: boolean = false

  /**
   * @param nodeId         identifier of this node
   * @param clusterNodes   identifiers of all cluster members (including this node)
   * @param messageHandler transport callback invoked for every outgoing message
   */
  constructor(
    public nodeId: string,
    public clusterNodes: string[],
    private messageHandler: (message: RaftMessage) => void
  ) {
    this.electionTimeout = Math.random() * 3000 + 5000 // 5-8 seconds
    this.heartbeatInterval = 1000 // 1 second
    this.initializeNode()
  }

  /** Number of nodes that constitutes a majority of the cluster. */
  private get majority(): number {
    return Math.floor(this.clusterNodes.length / 2) + 1
  }

  /** Resets the per-follower replication indexes and arms the election timer. */
  private initializeNode(): void {
    for (const nodeId of this.clusterNodes) {
      if (nodeId !== this.nodeId) {
        this.nextIndex.set(nodeId, 1)
        this.matchIndex.set(nodeId, 0)
      }
    }
    this.startElectionTimer()
  }

  /**
   * Cancels the election and heartbeat timers. After stop() the node no longer
   * starts elections or sends periodic heartbeats. Safe to call repeatedly.
   */
  stop(): void {
    this.stopped = true
    if (this.electionTimer !== null) {
      clearTimeout(this.electionTimer)
      this.electionTimer = null
    }
    this.stopHeartbeatTimer()
  }

  /**
   * Client request to append a command. Only the leader accepts it.
   *
   * @returns success once a majority (per the simulated exchange) has the
   *          entry; otherwise a failure with a hint about the current leader
   */
  async clientRequest(command: any): Promise<{
    success: boolean
    message?: string
    term?: number
  }> {
    if (this.state !== NodeState.LEADER) {
      return {
        success: false,
        message: `Not a leader. Current leader: ${this.leaderId || 'unknown'}`,
        term: this.currentTerm
      }
    }

    const logEntry: LogEntry = {
      term: this.currentTerm,
      index: this.log.length + 1,
      command,
      timestamp: new Date()
    }
    this.log.push(logEntry)
    console.log(`[RAFT-${this.nodeId}] Client request appended to log at index ${logEntry.index}`)

    const success = await this.replicateLog()
    if (success) {
      return { success: true, term: this.currentTerm }
    }
    return { success: false, message: 'Failed to replicate to majority', term: this.currentTerm }
  }

  /**
   * Sends each follower the entries it is missing (or a heartbeat when it is
   * up to date) and reports whether a majority acknowledged.
   * Heartbeats are fire-and-forget and are counted as acknowledged.
   */
  private async replicateLog(): Promise<boolean> {
    let successCount = 1 // Count self

    for (const followerId of this.clusterNodes) {
      if (followerId === this.nodeId) continue

      const nextIdx = this.nextIndex.get(followerId) || 1
      const entries = this.log.slice(nextIdx - 1)

      if (entries.length === 0) {
        this.sendHeartbeat(followerId)
        successCount++
      } else if (await this.sendAppendEntries(followerId, entries)) {
        successCount++
      }
    }

    return successCount >= this.majority
  }

  /** Sends a single heartbeat message to `followerId`. */
  private sendHeartbeat(followerId: string): void {
    this.sendMessage({
      type: RaftMessageType.HEARTBEAT,
      term: this.currentTerm,
      senderId: this.nodeId,
      receiverId: followerId,
      data: { leaderId: this.nodeId }
    })
  }

  /**
   * Builds an APPEND_ENTRIES message for `followerId`, runs it through the
   * simulated exchange and updates the follower's indexes from the outcome.
   *
   * @returns true when the (simulated) follower accepted the entries
   */
  private async sendAppendEntries(followerId: string, entries: LogEntry[]): Promise<boolean> {
    const followerNext = this.nextIndex.get(followerId) || 1
    const prevLogIndex = followerNext - 1
    const prevLogTerm = prevLogIndex > 0 ? this.log[prevLogIndex - 1].term : 0

    const message: RaftMessage = {
      type: RaftMessageType.APPEND_ENTRIES,
      term: this.currentTerm,
      senderId: this.nodeId,
      receiverId: followerId,
      data: {
        entries,
        prevLogIndex,
        prevLogTerm,
        leaderCommit: this.commitIndex
      }
    }

    // Simulate sending message and getting response
    const response = await this.simulateMessageExchange(message)

    if (response && response.data?.success) {
      const advanced = followerNext + entries.length
      this.nextIndex.set(followerId, advanced)
      this.matchIndex.set(followerId, advanced - 1)
      this.updateCommitIndex()
      return true
    }

    // No acknowledgement: step the follower's nextIndex back so the next
    // replication attempt starts one entry earlier. Nothing is retried here.
    this.nextIndex.set(followerId, Math.max(1, followerNext - 1))
    return false
  }

  /**
   * Stand-in for a network round trip: waits 50-150 ms and answers with a
   * successful APPEND_ENTRIES_RESPONSE about 90% of the time, otherwise null.
   * The message is not delivered to any real node.
   */
  private async simulateMessageExchange(message: RaftMessage): Promise<RaftMessage | null> {
    await new Promise(resolve => setTimeout(resolve, Math.random() * 100 + 50))

    if (Math.random() > 0.1) {
      return {
        type: RaftMessageType.APPEND_ENTRIES_RESPONSE,
        term: message.term,
        senderId: message.receiverId,
        receiverId: message.senderId,
        data: { success: true }
      }
    }
    return null
  }

  /**
   * Entry point for incoming messages. A message from a newer term first
   * demotes this node to follower in that term, then the message is
   * dispatched by type.
   */
  handleMessage(message: RaftMessage): void {
    if (message.term > this.currentTerm) {
      this.stepDown(message.term)
    }

    switch (message.type) {
      case RaftMessageType.APPEND_ENTRIES:
        this.handleAppendEntries(message)
        break
      case RaftMessageType.APPEND_ENTRIES_RESPONSE:
        this.handleAppendEntriesResponse(message)
        break
      case RaftMessageType.REQUEST_VOTE:
        this.handleRequestVote(message)
        break
      case RaftMessageType.REQUEST_VOTE_RESPONSE:
        this.handleRequestVoteResponse(message)
        break
      case RaftMessageType.HEARTBEAT:
        this.handleHeartbeat(message)
        break
    }
  }

  /** Adopts a newer term and reverts to follower with no vote cast. */
  private stepDown(term: number): void {
    this.currentTerm = term
    this.votedFor = null
    this.state = NodeState.FOLLOWER
    this.leaderId = null
    this.votesReceived.clear()
    this.stopHeartbeatTimer()
    // A leader runs without an election timer; a demoted node needs one again.
    if (this.electionTimer === null) {
      this.startElectionTimer()
    }
  }

  /** Records contact from the legitimate leader of the current term. */
  private acknowledgeLeader(leaderId: string): void {
    this.lastHeartbeat = Date.now()
    this.leaderId = leaderId
    this.state = NodeState.FOLLOWER
    this.votesReceived.clear()
    this.stopHeartbeatTimer()
    // Hearing from the leader postpones the next election.
    this.startElectionTimer()
  }

  /**
   * Follower side of log replication: rejects stale leaders and logs that do
   * not contain the entry at prevLogIndex/prevLogTerm; otherwise replaces
   * everything after prevLogIndex with the received entries.
   */
  private handleAppendEntries(message: RaftMessage): void {
    const data = message.data ?? {}
    // Tolerate a missing payload instead of crashing on push(...undefined).
    const entries: LogEntry[] = Array.isArray(data.entries) ? data.entries : []
    const prevLogIndex: number = data.prevLogIndex ?? 0
    const prevLogTerm: number = data.prevLogTerm ?? 0
    const leaderCommit: number = data.leaderCommit ?? 0

    const reply = (success: boolean): void => {
      this.sendMessage({
        type: RaftMessageType.APPEND_ENTRIES_RESPONSE,
        term: this.currentTerm,
        senderId: this.nodeId,
        receiverId: message.senderId,
        data: { success }
      })
    }

    // A leader from an older term must not overwrite our log.
    if (message.term < this.currentTerm) {
      reply(false)
      return
    }

    this.acknowledgeLeader(message.senderId)

    if (prevLogIndex > 0) {
      const previous = this.log[prevLogIndex - 1]
      if (previous === undefined || previous.term !== prevLogTerm) {
        reply(false)
        return
      }
    }

    // Accept and append entries
    this.log = this.log.slice(0, prevLogIndex)
    this.log.push(...entries)

    if (leaderCommit > this.commitIndex) {
      this.commitIndex = Math.min(leaderCommit, this.log.length)
    }

    reply(true)
    console.log(`[RAFT-${this.nodeId}] Appended ${entries.length} entries from leader ${message.senderId}`)
  }

  /** Accepts a heartbeat from the current leader; stale-term heartbeats are ignored. */
  private handleHeartbeat(message: RaftMessage): void {
    if (message.term < this.currentTerm) return
    this.acknowledgeLeader(message.senderId)
  }

  /**
   * Decides whether to vote for the requesting candidate. A vote is granted
   * only when the request is for the current term, this node has not voted
   * for someone else, and the candidate's LOG (lastLogTerm / lastLogIndex) is
   * at least as up to date as ours.
   */
  private handleRequestVote(message: RaftMessage): void {
    const data = message.data ?? {}
    const candidateId: string = data.candidateId ?? message.senderId
    const lastLogIndex: number = data.lastLogIndex ?? 0
    const lastLogTerm: number = data.lastLogTerm ?? 0

    let voteGranted = false
    const termIsCurrent = message.term >= this.currentTerm
    const canVoteForCandidate = this.votedFor === null || this.votedFor === candidateId

    if (termIsCurrent && canVoteForCandidate) {
      const ourLastLog = this.log[this.log.length - 1]
      const ourLastTerm = ourLastLog ? ourLastLog.term : 0
      const ourLastIndex = this.log.length

      // Compare log positions, not the RPC term: a candidate with a high term
      // but an outdated log must not win.
      const candidateLogIsCurrent =
        lastLogTerm > ourLastTerm ||
        (lastLogTerm === ourLastTerm && lastLogIndex >= ourLastIndex)

      if (candidateLogIsCurrent) {
        voteGranted = true
        this.votedFor = candidateId
        // Granting a vote postpones our own candidacy.
        this.startElectionTimer()
      }
    }

    this.sendMessage({
      type: RaftMessageType.REQUEST_VOTE_RESPONSE,
      term: this.currentTerm,
      senderId: this.nodeId,
      receiverId: message.senderId,
      data: { voteGranted }
    })
    console.log(`[RAFT-${this.nodeId}] Voted for ${candidateId}: ${voteGranted}`)
  }

  /**
   * Leader side of a real APPEND_ENTRIES_RESPONSE. On success the follower is
   * known to match up to nextIndex - 1; on failure nextIndex is stepped back.
   */
  private handleAppendEntriesResponse(message: RaftMessage): void {
    if (this.state !== NodeState.LEADER) return
    if (message.term < this.currentTerm) return

    const { success } = message.data || {}
    const followerNext = this.nextIndex.get(message.senderId) || 1
    if (success) {
      this.matchIndex.set(message.senderId, followerNext - 1)
    } else {
      this.nextIndex.set(message.senderId, Math.max(1, followerNext - 1))
    }
  }

  /** Tallies a vote for the current election and takes leadership on a majority. */
  private handleRequestVoteResponse(message: RaftMessage): void {
    if (this.state !== NodeState.CANDIDATE) return
    // Ignore answers that belong to an earlier election.
    if (message.term !== this.currentTerm) return

    const { voteGranted } = message.data || {}
    if (!voteGranted) return

    // A Set makes duplicate responses from the same voter harmless.
    this.votesReceived.add(message.senderId)
    console.log(`[RAFT-${this.nodeId}] Received vote from ${message.senderId}`)

    if (this.votesReceived.size >= this.majority) {
      this.becomeLeader()
    }
  }

  /**
   * Advances commitIndex to the highest index stored on a majority.
   * Only entries from the current term are committed by counting replicas.
   */
  private updateCommitIndex(): void {
    for (let n = this.log.length; n > this.commitIndex; n--) {
      // Terms never decrease along the log, so everything below is older too.
      if (this.log[n - 1].term !== this.currentTerm) break

      let count = 0
      for (const nodeId of this.clusterNodes) {
        if (nodeId === this.nodeId) {
          count++
        } else if ((this.matchIndex.get(nodeId) || 0) >= n) {
          count++
        }
      }

      if (count >= this.majority) {
        this.commitIndex = n
        console.log(`[RAFT-${this.nodeId}] Committed log entry ${n}`)
        break
      }
    }
  }

  /** (Re)arms the election timer; does nothing once the node has been stopped. */
  private startElectionTimer(): void {
    if (this.electionTimer !== null) {
      clearTimeout(this.electionTimer)
      this.electionTimer = null
    }
    if (this.stopped) return

    this.electionTimer = setTimeout(() => {
      this.electionTimer = null
      this.startElection()
    }, this.electionTimeout)
  }

  /** Becomes candidate for the next term and asks every other node for its vote. */
  private startElection(): void {
    if (this.stopped || this.state === NodeState.LEADER) return

    console.log(`[RAFT-${this.nodeId}] Starting election for term ${this.currentTerm + 1}`)
    this.currentTerm++
    this.state = NodeState.CANDIDATE
    this.votedFor = this.nodeId
    this.leaderId = null
    this.votesReceived = new Set([this.nodeId])

    const lastLog = this.log[this.log.length - 1]
    const lastLogIndex = this.log.length
    const lastLogTerm = lastLog ? lastLog.term : 0

    for (const nodeId of this.clusterNodes) {
      if (nodeId === this.nodeId) continue
      this.sendMessage({
        type: RaftMessageType.REQUEST_VOTE,
        term: this.currentTerm,
        senderId: this.nodeId,
        receiverId: nodeId,
        data: {
          candidateId: this.nodeId,
          lastLogIndex,
          lastLogTerm
        }
      })
    }

    // A synchronous transport may already have delivered the answers and
    // settled the election while the loop above was running.
    if (this.state !== NodeState.CANDIDATE) return

    if (this.votesReceived.size >= this.majority) {
      // Single-node cluster: our own vote is already a majority.
      this.becomeLeader()
    } else {
      // Try again if this election does not conclude in time.
      this.startElectionTimer()
    }
  }

  /** Switches to leader, resets follower indexes and starts the heartbeat loop. */
  private becomeLeader(): void {
    console.log(`[RAFT-${this.nodeId}] Became leader for term ${this.currentTerm}`)
    this.state = NodeState.LEADER
    this.leaderId = this.nodeId

    // A leader must not time out and start an election against itself.
    if (this.electionTimer !== null) {
      clearTimeout(this.electionTimer)
      this.electionTimer = null
    }

    for (const nodeId of this.clusterNodes) {
      if (nodeId !== this.nodeId) {
        this.nextIndex.set(nodeId, this.log.length + 1)
        this.matchIndex.set(nodeId, 0)
      }
    }

    this.stopHeartbeatTimer()
    if (this.stopped) return

    // Announce leadership right away, then keep followers quiet periodically.
    this.sendHeartbeats()
    this.heartbeatTimer = setInterval(() => {
      if (this.state === NodeState.LEADER) {
        this.sendHeartbeats()
      }
    }, this.heartbeatInterval)
  }

  /** Cancels the periodic heartbeat, if one is running. */
  private stopHeartbeatTimer(): void {
    if (this.heartbeatTimer !== null) {
      clearInterval(this.heartbeatTimer)
      this.heartbeatTimer = null
    }
  }

  /** Sends one heartbeat to every other cluster member. */
  private sendHeartbeats(): void {
    for (const nodeId of this.clusterNodes) {
      if (nodeId !== this.nodeId) {
        this.sendHeartbeat(nodeId)
      }
    }
  }

  /** Hands a message to the transport callback. */
  private sendMessage(message: RaftMessage): void {
    this.messageHandler(message)
  }

  /** Snapshot of the node's externally interesting state. */
  getState(): {
    state: NodeState
    term: number
    leaderId: string | null
    logLength: number
    commitIndex: number
  } {
    return {
      state: this.state,
      term: this.currentTerm,
      leaderId: this.leaderId,
      logLength: this.log.length,
      commitIndex: this.commitIndex
    }
  }
}
// PBFT (Practical Byzantine Fault Tolerance) Implementation
/**
 * Simplified PBFT (Practical Byzantine Fault Tolerance) replica for demos.
 *
 * The node collects PREPARE and COMMIT votes per request digest and walks a
 * request through the pre-prepare -> prepare -> commit phases. Network
 * delivery is only simulated: `sendPBFTMessage` logs after a random delay, so
 * incoming traffic has to be fed in through `handlePBFTMessage` by the caller.
 */
class PBFTNode {
  private view: number = 0
  private sequenceNumber: number = 0
  private primary: string
  private replicas: string[]
  private log: Map<string, PBFTMessage[]> = new Map()
  private prepared: Map<string, Set<string>> = new Map()
  private committed: Map<string, Set<string>> = new Map()
  private pendingRequests: Map<string, any> = new Map()
  // `${view}_${sequenceNumber}_${digest}` keys for which COMMIT was already broadcast.
  private commitBroadcast: Set<string> = new Set()
  // `${view}_${sequenceNumber}_${digest}` keys whose operation was already executed.
  private executed: Set<string> = new Set()

  /**
   * @param nodeId - Identifier of this replica.
   * @param clusterNodes - Identifiers of all replicas; the first one is the primary of view 0.
   * @param f - Number of Byzantine faults tolerated (quorums are 2f prepares and 2f + 1 commits).
   */
  constructor(
    public nodeId: string,
    public clusterNodes: string[],
    private f: number = 1 // Number of Byzantine faults tolerated
  ) {
    this.replicas = [...clusterNodes]
    this.primary = this.replicas[0]
  }

  /**
   * Submits a client operation. The primary starts the pre-prepare phase; any
   * other replica only logs that the request would be forwarded.
   *
   * @param operation - JSON-serializable operation payload.
   * @returns After a fixed 2 second wait, whether 2f + 1 COMMIT votes were
   *   recorded for the operation's digest.
   */
  async clientRequest(operation: any): Promise<{
    success: boolean
    result?: any
    message?: string
  }> {
    const digest = this.calculateDigest(operation)
    const requestId = `req_${Date.now()}_${Math.random()}`
    this.pendingRequests.set(requestId, operation)
    if (this.nodeId === this.primary) {
      // Primary starts the PBFT protocol
      await this.startPrePrepare(digest, operation, requestId)
    } else {
      // Forward to primary
      console.log(`[PBFT-${this.nodeId}] Forwarding request to primary ${this.primary}`)
    }
    // Wait for completion (simplified)
    return new Promise((resolve) => {
      setTimeout(() => {
        // The request is settled either way; do not let the map grow forever.
        this.pendingRequests.delete(requestId)
        const commits = this.committed.get(digest)
        if (commits !== undefined && commits.size >= 2 * this.f + 1) {
          resolve({
            success: true,
            result: `Operation ${this.describeOperation(operation)} executed successfully`,
            message: 'PBFT consensus reached'
          })
        } else {
          resolve({
            success: false,
            message: 'PBFT consensus not reached'
          })
        }
      }, 2000)
    })
  }

  /** Assigns the next sequence number and broadcasts PRE-PREPARE (primary only). */
  private async startPrePrepare(digest: string, operation: any, requestId: string): Promise<void> {
    const message: PBFTMessage = {
      type: PBFTMessageType.PRE_PREPARE,
      view: this.view,
      sequenceNumber: ++this.sequenceNumber,
      senderId: this.nodeId,
      digest,
      data: { operation, requestId }
    }
    console.log(`[PBFT-${this.nodeId}] Sending PRE-PREPARE for sequence ${this.sequenceNumber}`)
    // Store first: handleCommit looks the operation up in the log, and commits
    // may arrive while the (slow) broadcast is still in flight.
    this.addToLog(digest, message)
    await this.broadcast(message)
  }

  /**
   * Dispatches an incoming protocol message to the matching phase handler.
   *
   * @param message - Message received from another replica.
   */
  async handlePBFTMessage(message: PBFTMessage): Promise<void> {
    const logKey = `${message.view}_${message.sequenceNumber}_${message.digest}`
    switch (message.type) {
      case PBFTMessageType.PRE_PREPARE:
        await this.handlePrePrepare(message, logKey)
        break
      case PBFTMessageType.PREPARE:
        await this.handlePrepare(message, logKey)
        break
      case PBFTMessageType.COMMIT:
        await this.handleCommit(message, logKey)
        break
      case PBFTMessageType.VIEW_CHANGE:
        await this.handleViewChange(message)
        break
    }
  }

  /** Validates a PRE-PREPARE from the primary, logs it and broadcasts PREPARE. */
  private async handlePrePrepare(message: PBFTMessage, logKey: string): Promise<void> {
    if (this.nodeId === this.primary) {
      console.log(`[PBFT-${this.nodeId}] Ignoring own PRE-PREPARE`)
      return
    }
    console.log(`[PBFT-${this.nodeId}] Received PRE-PREPARE from primary ${message.senderId}`)
    // Validate pre-prepare message
    if (!this.validatePrePrepare(message)) {
      console.log(`[PBFT-${this.nodeId}] Invalid PRE-PREPARE message`)
      return
    }
    // Store and broadcast prepare
    this.addToLog(message.digest, message)
    const prepareMessage: PBFTMessage = {
      type: PBFTMessageType.PREPARE,
      view: message.view,
      sequenceNumber: message.sequenceNumber,
      senderId: this.nodeId,
      digest: message.digest
    }
    await this.broadcast(prepareMessage)
  }

  /** Records a PREPARE vote and broadcasts COMMIT once, when 2f prepares are in. */
  private async handlePrepare(message: PBFTMessage, logKey: string): Promise<void> {
    console.log(`[PBFT-${this.nodeId}] Received PREPARE from ${message.senderId}`)
    const prepares = this.votesFor(this.prepared, message.digest)
    prepares.add(message.senderId)
    // Late PREPAREs arriving after the quorum must not trigger another COMMIT broadcast.
    if (prepares.size < 2 * this.f || this.commitBroadcast.has(logKey)) {
      return
    }
    this.commitBroadcast.add(logKey)
    console.log(`[PBFT-${this.nodeId}] Quorum reached for PREPARE phase`)
    // Move to commit phase
    const commitMessage: PBFTMessage = {
      type: PBFTMessageType.COMMIT,
      view: message.view,
      sequenceNumber: message.sequenceNumber,
      senderId: this.nodeId,
      digest: message.digest
    }
    await this.broadcast(commitMessage)
  }

  /** Records a COMMIT vote and executes the operation exactly once at 2f + 1 commits. */
  private async handleCommit(message: PBFTMessage, logKey: string): Promise<void> {
    console.log(`[PBFT-${this.nodeId}] Received COMMIT from ${message.senderId}`)
    const commits = this.votesFor(this.committed, message.digest)
    commits.add(message.senderId)
    // Late COMMITs arriving after execution must not run the operation again.
    if (commits.size < 2 * this.f + 1 || this.executed.has(logKey)) {
      return
    }
    console.log(`[PBFT-${this.nodeId}] Quorum reached for COMMIT phase - EXECUTING`)
    const logMessages = this.log.get(message.digest) || []
    const prePrepare = logMessages.find(m => m.type === PBFTMessageType.PRE_PREPARE)
    if (prePrepare && prePrepare.data?.operation) {
      // Mark only on actual execution so a later COMMIT can retry if the
      // PRE-PREPARE has not been logged yet.
      this.executed.add(logKey)
      this.executeOperation(prePrepare.data.operation)
    }
  }

  /** Adopts a higher view announced by another replica and re-derives the primary. */
  private async handleViewChange(message: PBFTMessage): Promise<void> {
    console.log(`[PBFT-${this.nodeId}] Received VIEW_CHANGE from ${message.senderId}`)
    // Simplified view change handling
    // In real implementation, would collect view-change messages and select new primary
    if (this.view < message.view) {
      this.view = message.view
      this.primary = this.primaryForView(this.view)
      console.log(`[PBFT-${this.nodeId}] View changed to ${this.view}, new primary: ${this.primary}`)
    }
  }

  /** Simplified validation: sender must be the current primary and the view must match. */
  private validatePrePrepare(message: PBFTMessage): boolean {
    return message.senderId === this.primary && message.view === this.view
  }

  /** Appends a message to the per-digest log. */
  private addToLog(digest: string, message: PBFTMessage): void {
    const entries = this.log.get(digest)
    if (entries) {
      entries.push(message)
    } else {
      this.log.set(digest, [message])
    }
  }

  /** Returns the vote set for a digest, creating it on first use. */
  private votesFor(table: Map<string, Set<string>>, digest: string): Set<string> {
    let votes = table.get(digest)
    if (!votes) {
      votes = new Set()
      table.set(digest, votes)
    }
    return votes
  }

  /** Primary rotation rule: replicas take turns in list order. */
  private primaryForView(view: number): string {
    return this.replicas[view % this.replicas.length]
  }

  /** Renders an operation for log/result text without producing "[object Object]". */
  private describeOperation(operation: any): string {
    if (typeof operation === 'string') return operation
    return JSON.stringify(operation) ?? String(operation)
  }

  private executeOperation(operation: any): void {
    console.log(`[PBFT-${this.nodeId}] Executing operation: ${this.describeOperation(operation)}`)
    // In real implementation, would execute the actual operation
  }

  /**
   * Derives a 16-hex-character digest from the operation's full JSON form.
   *
   * Every character of the serialized payload feeds both 32-bit lanes, so
   * payloads that share a long prefix no longer collapse to the same digest.
   * This is a demo hash, NOT cryptographic; a real deployment needs a
   * collision-resistant hash such as SHA-256.
   */
  private calculateDigest(data: any): string {
    const serialized = JSON.stringify(data) ?? 'undefined'
    let laneA = 0x811c9dc5
    let laneB = 0x9e3779b9 ^ serialized.length
    for (let i = 0; i < serialized.length; i++) {
      const code = serialized.charCodeAt(i)
      laneA = Math.imul(laneA ^ code, 0x01000193)
      laneB = Math.imul(laneB ^ code, 0x85ebca6b)
    }
    const toHex = (lane: number): string => (lane >>> 0).toString(16).padStart(8, '0')
    return toHex(laneA) + toHex(laneB)
  }

  /** Sends a message to every other replica, one after another. */
  private async broadcast(message: PBFTMessage): Promise<void> {
    for (const replicaId of this.replicas) {
      if (replicaId !== this.nodeId) {
        await this.sendPBFTMessage(replicaId, message)
      }
    }
  }

  private async sendPBFTMessage(targetId: string, message: PBFTMessage): Promise<void> {
    // Simulate network delay
    await new Promise(resolve => setTimeout(resolve, Math.random() * 100 + 50))
    console.log(`[PBFT-${this.nodeId}] Sent ${message.type} to ${targetId}`)
    // In real implementation, would send over network
  }

  /**
   * Initiates a view change: advances the local view, switches to that view's
   * primary and announces the new view to all other replicas (fire-and-forget).
   */
  getViewChange(): void {
    console.log(`[PBFT-${this.nodeId}] Initiating view change`)
    this.view++
    // Keep the local primary in step with what peers derive in handleViewChange;
    // otherwise this node would reject the new primary's PRE-PREPAREs.
    this.primary = this.primaryForView(this.view)
    const viewChangeMessage: PBFTMessage = {
      type: PBFTMessageType.VIEW_CHANGE,
      view: this.view,
      sequenceNumber: 0,
      senderId: this.nodeId,
      digest: ''
    }
    // Broadcast view change without waiting for delivery
    for (const replicaId of this.replicas) {
      if (replicaId !== this.nodeId) {
        void this.sendPBFTMessage(replicaId, viewChangeMessage)
      }
    }
  }

  /**
   * @returns Current view, last assigned sequence number, current primary and
   *   the number of digests with at least one PREPARE / COMMIT vote.
   */
  getConsensusStats(): {
    view: number
    sequenceNumber: number
    primary: string
    preparedCount: number
    committedCount: number
  } {
    return {
      view: this.view,
      sequenceNumber: this.sequenceNumber,
      primary: this.primary,
      preparedCount: this.prepared.size,
      committedCount: this.committed.size
    }
  }
}
// Consensus Performance Analyzer
/**
 * Collects per-operation latency/success samples for consensus algorithms and
 * compares Raft against PBFT on their recorded WRITE samples.
 */
class ConsensusAnalyzer {
  // Samples grouped under `${algorithm}_${operation}`.
  private metrics: Map<string, Array<{
    algorithm: string
    operation: string
    latency: number
    success: boolean
    timestamp: Date
    nodes: number
  }>> = new Map()

  /**
   * Stores one sample.
   *
   * @param algorithm - Algorithm label, e.g. 'RAFT' or 'PBFT'.
   * @param operation - Operation label, e.g. 'WRITE'.
   * @param latency - Observed latency; the throughput figure assumes milliseconds.
   * @param success - Whether the operation succeeded.
   * @param nodeCount - Cluster size the sample was taken on.
   */
  recordMetric(
    algorithm: string,
    operation: string,
    latency: number,
    success: boolean,
    nodeCount: number
  ): void {
    const key = `${algorithm}_${operation}`
    let bucket = this.metrics.get(key)
    if (!bucket) {
      bucket = []
      this.metrics.set(key, bucket)
    }
    bucket.push({
      algorithm,
      operation,
      latency,
      success,
      timestamp: new Date(),
      nodes: nodeCount
    })
  }

  /**
   * Compares the WRITE samples of 'RAFT' and 'PBFT' for the given cluster sizes.
   *
   * @param nodeCounts - Cluster sizes whose samples are included.
   * @returns Per-algorithm aggregates plus a textual recommendation. When either
   *   algorithm has no matching samples the latency verdict is replaced by an
   *   "insufficient data" notice instead of comparing against a latency of 0.
   */
  compareAlgorithms(nodeCounts: number[]): {
    raft: { avgLatency: number; throughput: number; successRate: number }
    pbft: { avgLatency: number; throughput: number; successRate: number }
    recommendation: string
  } {
    const raftMetrics = this.getAlgorithmMetrics('RAFT', nodeCounts)
    const pbftMetrics = this.getAlgorithmMetrics('PBFT', nodeCounts)
    const hasRaftData = this.collectSamples('RAFT', nodeCounts).length > 0
    const hasPbftData = this.collectSamples('PBFT', nodeCounts).length > 0

    let recommendation = ''
    if (!hasRaftData || !hasPbftData) {
      recommendation = 'Insufficient data to compare latency for this configuration'
    } else if (raftMetrics.avgLatency < pbftMetrics.avgLatency) {
      recommendation = 'Raft shows better performance for this configuration'
    } else {
      recommendation = 'PBFT provides better fault tolerance despite higher latency'
    }
    if (nodeCounts.some(n => n > 10)) {
      recommendation += '. Consider Raft for larger clusters due to lower overhead'
    } else {
      recommendation += '. PBFT is suitable for smaller clusters requiring Byzantine fault tolerance'
    }
    return {
      raft: raftMetrics,
      pbft: pbftMetrics,
      recommendation
    }
  }

  /** Returns the WRITE samples of an algorithm taken on any of the given cluster sizes. */
  private collectSamples(algorithm: string, nodeCounts: number[]): Array<{
    latency: number
    success: boolean
    nodes: number
  }> {
    // A Set keeps a repeated cluster size from being counted twice.
    const wantedSizes = new Set(nodeCounts)
    const samples = this.metrics.get(`${algorithm}_WRITE`) || []
    return samples.filter(sample => wantedSizes.has(sample.nodes))
  }

  /**
   * Aggregates the matching samples.
   *
   * @returns `avgLatency` (mean latency), `successRate` (percent of successful
   *   operations) and `throughput` (successful operations per second of
   *   cumulative latency, i.e. assuming the operations ran back to back and
   *   latency is in milliseconds). All three are 0 when there are no samples.
   */
  private getAlgorithmMetrics(algorithm: string, nodeCounts: number[]): {
    avgLatency: number
    throughput: number
    successRate: number
  } {
    const samples = this.collectSamples(algorithm, nodeCounts)
    let totalLatency = 0
    let successfulOperations = 0
    for (const sample of samples) {
      totalLatency += sample.latency
      if (sample.success) successfulOperations++
    }
    const totalOperations = samples.length
    return {
      avgLatency: totalOperations > 0 ? totalLatency / totalOperations : 0,
      throughput: totalLatency > 0 ? (successfulOperations * 1000) / totalLatency : 0,
      successRate: totalOperations > 0 ? (successfulOperations / totalOperations) * 100 : 0
    }
  }
}
// Usage Example
/**
 * End-to-end demo: builds a 5-node Raft cluster and a 4-node PBFT cluster,
 * submits one request to each, feeds a few simulated PREPARE messages to the
 * PBFT nodes, and prints a latency/success comparison from synthetic metrics.
 * Output goes to the console only; nothing is returned.
 */
async function consensusAlgorithmsExample() {
  console.log('=== Consensus Algorithms (Raft & PBFT) ===\n')

  // Setup Raft cluster
  console.log('1. Setting up Raft Cluster')
  const raftNodes: RaftNode[] = []
  // In-process "network": route each message to the node named as receiver.
  const raftMessageHandler = (message: RaftMessage) => {
    const targetNode = raftNodes.find(n => n.nodeId === message.receiverId)
    if (targetNode) {
      targetNode.handleMessage(message)
    }
  }
  for (let i = 1; i <= 5; i++) {
    const node = new RaftNode(`raft-node-${i}`,
      ['raft-node-1', 'raft-node-2', 'raft-node-3', 'raft-node-4', 'raft-node-5'],
      raftMessageHandler
    )
    raftNodes.push(node)
  }
  // Wait for election to complete
  await new Promise(resolve => setTimeout(resolve, 10000))
  console.log('Raft cluster states:')
  raftNodes.forEach(node => {
    const state = node.getState()
    console.log(` ${node.nodeId}: ${state.state} (term: ${state.term}, leader: ${state.leaderId})`)
  })

  // Test Raft operations
  console.log('\n2. Testing Raft Operations')
  const leaderNode = raftNodes.find(n => n.getState().state === NodeState.LEADER)
  if (leaderNode) {
    const raftStartTime = Date.now()
    const raftResult = await leaderNode.clientRequest({ action: 'set', key: 'test', value: 'raft_value' })
    const raftLatency = Date.now() - raftStartTime
    console.log(`Raft result: ${JSON.stringify(raftResult)}, Latency: ${raftLatency}ms`)
  } else {
    console.log('No Raft leader elected yet')
  }

  // Setup PBFT cluster
  console.log('\n3. Setting up PBFT Cluster')
  const pbftNodes: PBFTNode[] = []
  const pbftNodesList = ['pbft-node-1', 'pbft-node-2', 'pbft-node-3', 'pbft-node-4']
  for (const nodeId of pbftNodesList) {
    const node = new PBFTNode(nodeId, pbftNodesList, 1) // f=1, can tolerate 1 Byzantine fault
    pbftNodes.push(node)
  }
  console.log('PBFT cluster stats:')
  pbftNodes.forEach(node => {
    const stats = node.getConsensusStats()
    console.log(` ${node.nodeId}: view=${stats.view}, primary=${stats.primary}`)
  })

  // Test PBFT operations
  console.log('\n4. Testing PBFT Operations')
  const primaryNode = pbftNodes[0] // First node is primary in view 0
  const pbftStartTime = Date.now()
  const pbftResult = await primaryNode.clientRequest('transfer_amount_100')
  const pbftLatency = Date.now() - pbftStartTime
  console.log(`PBFT result: ${JSON.stringify(pbftResult)}, Latency: ${pbftLatency}ms`)
  // Simulate message passing between PBFT nodes
  for (const sender of pbftNodes) {
    for (const receiver of pbftNodes) {
      if (sender.nodeId !== receiver.nodeId) {
        // Simulate some messages (roughly 30% of the sender/receiver pairs)
        if (Math.random() > 0.7) {
          const message: PBFTMessage = {
            type: PBFTMessageType.PREPARE,
            view: 0,
            sequenceNumber: 1,
            senderId: sender.nodeId,
            digest: 'test_digest'
          }
          await receiver.handlePBFTMessage(message)
        }
      }
    }
  }

  // Performance comparison
  console.log('\n5. Performance Analysis')
  const analyzer = new ConsensusAnalyzer()
  // Add sample metrics
  for (let i = 0; i < 20; i++) {
    analyzer.recordMetric('RAFT', 'WRITE', Math.random() * 200 + 100, Math.random() > 0.1, 5)
    analyzer.recordMetric('PBFT', 'WRITE', Math.random() * 400 + 200, Math.random() > 0.05, 4)
  }
  // Include both recorded cluster sizes: Raft samples use 5 nodes, PBFT samples
  // use 4. Filtering on [5] alone would drop every PBFT sample and report 0.
  const comparison = analyzer.compareAlgorithms([4, 5])
  console.log('\nPerformance Comparison:')
  console.log(`Raft - Latency: ${comparison.raft.avgLatency.toFixed(2)}ms, Success Rate: ${comparison.raft.successRate.toFixed(1)}%`)
  console.log(`PBFT - Latency: ${comparison.pbft.avgLatency.toFixed(2)}ms, Success Rate: ${comparison.pbft.successRate.toFixed(1)}%`)
  console.log(`\nRecommendation: ${comparison.recommendation}`)

  // Final cluster states
  console.log('\n6. Final Cluster States')
  console.log('\nRaft cluster final states:')
  raftNodes.forEach(node => {
    const state = node.getState()
    console.log(` ${node.nodeId}: ${state.state} (log: ${state.logLength}, committed: ${state.commitIndex})`)
  })
  console.log('\nPBFT cluster final stats:')
  pbftNodes.forEach(node => {
    const stats = node.getConsensusStats()
    console.log(` ${node.nodeId}: prepared=${stats.preparedCount}, committed=${stats.committedCount}`)
  })
}
export {
NodeState,
RaftMessageType,
PBFTMessageType,
RaftNode,
PBFTNode,
ConsensusAnalyzer,
consensusAlgorithmsExample
}
💻 Sharding y Particionamiento
🔴 complex
⭐⭐⭐⭐⭐
Implementando estrategias de sharding y particionamiento de datos para sistemas distribuidos escalables
⏱️ 85 min
🏷️ sharding, partitioning, consistent-hashing, load-balancing
Prerequisites:
Distributed databases knowledge, Load balancing concepts
// Sharding and Partitioning Implementation
// Partitioning Strategies
/**
 * Names of the partitioning strategies covered by this sample.
 * Each member's runtime value is the same string as its name.
 */
enum PartitioningStrategy {
HASH = 'HASH',
RANGE = 'RANGE',
CONSISTENT_HASH = 'CONSISTENT_HASH',
DIRECTORY = 'DIRECTORY',
GEOGRAPHIC = 'GEOGRAPHIC'
}
/**
 * Describes one shard and the node that hosts it. The optional fields are
 * strategy-specific; each sharding manager fills in only the ones it uses.
 */
interface Shard {
// Shard identifier (HashShardingManager uses one entry per virtual node, `${nodeId}_${i}`).
id: string
// Identifier of the node hosting this shard.
nodeId: string
// Key range for range-based sharding; RangeShardingManager.getShard treats both bounds as inclusive.
range?: {
start: any
end: any
}
// Ring position for hash-based sharding; HashShardingManager stores a single point (start === end).
hashRange?: {
start: number
end: number
}
// Not populated by the sharding managers visible in this section — presumably an explicit key list; TODO confirm.
keyRange?: string[]
// NOTE(review): this property name is Chinese for "geographic location" and looks like a
// translation artifact in an otherwise English interface. Confirm the intended identifier
// and check every usage before renaming.
地理位置?: {
region: string
country?: string
city?: string
}
status: 'ACTIVE' | 'MIGRATING' | 'OFFLINE'
// The sharding managers visible in this section always set this to 1.
weight: number
// Set to the creation time by the managers in this section.
lastHealthCheck: Date
}
/**
 * Directory entry recording which shard and node a key is assigned to,
 * plus access bookkeeping (see DirectoryBasedSharding).
 */
interface PartitionMetadata {
// Shard currently holding the key; rewritten when the key is migrated.
shardId: string
key: string
// Value handed to the load balancer when the key was first assigned.
partitionKey: any
// Node hosting `shardId`; kept in step with `shardId` on migration.
nodeId: string
createdAt: Date
// Updated by DirectoryBasedSharding.recordAccess; rebalance migrates the least recently accessed keys first.
lastAccessed: Date
// Incremented by DirectoryBasedSharding.recordAccess; used to rank hot keys.
accessCount: number
}
// Hash-based Sharding
/**
 * Hash-ring sharding with virtual nodes.
 *
 * Every physical node is placed on the ring `virtualNodes` times. A key is
 * served by the first virtual node whose ring position is >= the key's hash,
 * wrapping around to the lowest position.
 */
class HashShardingManager {
  // Virtual shards keyed by virtual node id (`${nodeId}_${i}`).
  private shards: Map<string, Shard> = new Map()
  // Sorted, duplicate-free ring positions.
  private shardRing: number[] = []
  private virtualNodes: number = 150 // Virtual nodes per physical node
  // Ring position -> owning virtual shard, for O(1) resolution after the ring search.
  private ringIndex: Map<number, Shard> = new Map()

  /** @param nodes - Physical node ids placed on the ring at construction. */
  constructor(private nodes: string[]) {
    this.initializeShards()
  }

  private initializeShards(): void {
    for (const nodeId of this.nodes) {
      this.addNode(nodeId)
    }
  }

  /**
   * Places a physical node on the ring. Adding a node that is already present
   * is a no-op for its existing virtual nodes (no duplicate ring entries).
   *
   * @param nodeId - Physical node id.
   */
  addNode(nodeId: string): void {
    console.log(`[HASH] Adding node ${nodeId} to shard ring`)
    // Create virtual nodes for better distribution
    for (let i = 0; i < this.virtualNodes; i++) {
      const virtualNodeId = `${nodeId}_${i}`
      if (this.shards.has(virtualNodeId)) continue
      const hash = this.findFreePosition(virtualNodeId)
      const shard: Shard = {
        id: virtualNodeId,
        nodeId,
        hashRange: { start: hash, end: hash },
        status: 'ACTIVE',
        weight: 1,
        lastHealthCheck: new Date()
      }
      this.shards.set(virtualNodeId, shard)
      this.ringIndex.set(hash, shard)
      this.shardRing.push(hash)
    }
    this.shardRing.sort((a, b) => a - b)
  }

  /**
   * Removes every virtual node of a physical node from the ring.
   *
   * @param nodeId - Physical node id.
   */
  removeNode(nodeId: string): void {
    console.log(`[HASH] Removing node ${nodeId} from shard ring`)
    const removedPositions = new Set<number>()
    let removedCount = 0
    for (const [virtualNodeId, shard] of Array.from(this.shards.entries())) {
      if (shard.nodeId !== nodeId) continue
      if (shard.hashRange) {
        removedPositions.add(shard.hashRange.start)
        this.ringIndex.delete(shard.hashRange.start)
      }
      this.shards.delete(virtualNodeId)
      removedCount++
    }
    // One pass over the ring instead of one filter per virtual node.
    this.shardRing = this.shardRing.filter(hash => !removedPositions.has(hash))
    console.log(`[HASH] Removed ${removedCount} virtual nodes for ${nodeId}`)
  }

  /**
   * Resolves the virtual shard responsible for a key.
   *
   * @param key - Key to look up.
   * @returns The owning shard, or null when the ring is empty.
   */
  getShard(key: string): Shard | null {
    if (this.shardRing.length === 0) return null
    const keyHash = this.hash(key)
    // Binary search for the first ring position >= keyHash.
    let low = 0
    let high = this.shardRing.length
    while (low < high) {
      const mid = (low + high) >>> 1
      if (this.shardRing[mid] < keyHash) {
        low = mid + 1
      } else {
        high = mid
      }
    }
    // Past the last position: wrap around to the first shard.
    const position = this.shardRing[low === this.shardRing.length ? 0 : low]
    return this.findShardByHash(position)
  }

  private findShardByHash(hash: number): Shard | null {
    return this.ringIndex.get(hash) ?? null
  }

  /**
   * Picks the ring position for a virtual node. If another virtual node
   * already occupies the position, the id is re-hashed with a numeric suffix
   * so that two virtual nodes never share (and later delete) one ring slot.
   */
  private findFreePosition(virtualNodeId: string): number {
    let position = this.hash(virtualNodeId)
    for (let attempt = 1; this.ringIndex.has(position); attempt++) {
      position = this.hash(`${virtualNodeId}#${attempt}`)
    }
    return position
  }

  /** Simple non-cryptographic 32-bit string hash (demo only); result is non-negative. */
  private hash(input: string): number {
    let hash = 0
    for (let i = 0; i < input.length; i++) {
      // `| 0` truncates to a 32-bit integer after every step.
      hash = ((hash << 5) - hash + input.charCodeAt(i)) | 0
    }
    return Math.abs(hash)
  }

  /**
   * Estimates the load split by routing 10,000 synthetic keys
   * (`test_key_0` .. `test_key_9999`).
   *
   * @returns Number of synthetic keys landing on each physical node.
   */
  getDistribution(): Record<string, number> {
    const distribution: Record<string, number> = {}
    for (let i = 0; i < 10000; i++) {
      const shard = this.getShard(`test_key_${i}`)
      if (shard) {
        distribution[shard.nodeId] = (distribution[shard.nodeId] || 0) + 1
      }
    }
    return distribution
  }
}
// Range-based Sharding
/**
 * Range-based sharding over numeric keys. Each shard owns an inclusive
 * `[start, end]` interval; the constructor seeds five example user-id ranges.
 */
class RangeShardingManager {
  private shards: Map<string, Shard> = new Map()
  // Kept sorted by `start`.
  private ranges: Array<{ start: any; end: any; shardId: string }> = []
  // Monotonic counter that keeps generated shard ids unique within one millisecond.
  private nextShardSeq: number = 0

  constructor() {
    this.initializeRanges()
  }

  private initializeRanges(): void {
    // Example: User ID ranges
    const seedRanges = [
      { start: 0, end: 1000000, nodeId: 'node-1', shardId: 'shard-1' },
      { start: 1000001, end: 2000000, nodeId: 'node-2', shardId: 'shard-2' },
      { start: 2000001, end: 3000000, nodeId: 'node-3', shardId: 'shard-3' },
      { start: 3000001, end: 4000000, nodeId: 'node-4', shardId: 'shard-4' },
      { start: 4000001, end: Number.MAX_SAFE_INTEGER, nodeId: 'node-5', shardId: 'shard-5' }
    ]
    for (const seed of seedRanges) {
      this.registerShard(seed.shardId, seed.start, seed.end, seed.nodeId)
    }
    this.ranges.sort((a, b) => a.start - b.start)
  }

  /**
   * Finds the shard whose range contains the key.
   *
   * @param key - A number, or a value `parseInt` can read as one.
   * @returns The owning shard, or null when the key is not numeric or no range covers it.
   */
  getShard(key: any): Shard | null {
    const keyValue = typeof key === 'number' ? key : parseInt(key)
    if (Number.isNaN(keyValue)) return null
    for (const range of this.ranges) {
      if (keyValue >= range.start && keyValue <= range.end) {
        return this.shards.get(range.shardId) || null
      }
    }
    return null
  }

  /**
   * Registers a new shard for `[start, end]` on the given node under a
   * generated, unique shard id.
   */
  addRange(start: any, end: any, nodeId: string): void {
    // Date.now() alone repeats for calls within the same millisecond (as in
    // splitRange), which made the second shard overwrite the first.
    const shardId = `shard_${Date.now()}_${this.nextShardSeq++}`
    this.registerShard(shardId, start, end, nodeId)
    this.ranges.sort((a, b) => a.start - b.start)
    console.log(`[RANGE] Added range [${start}, ${end}] to node ${nodeId}`)
  }

  /**
   * Splits a shard into `[start, splitPoint]` (stays on the original node) and
   * `[splitPoint + 1, end]` (placed on `newNodeId`).
   *
   * @throws Error when the shard is unknown or has no range, or when
   *   `splitPoint` is outside `[start, end)` (either half would be empty).
   */
  splitRange(shardId: string, splitPoint: any, newNodeId: string): void {
    const shard = this.shards.get(shardId)
    if (!shard || !shard.range) {
      throw new Error(`Shard ${shardId} not found or has no range`)
    }
    const { start, end } = shard.range
    if (!(splitPoint >= start && splitPoint < end)) {
      throw new Error(`Split point ${splitPoint} is outside the range of shard ${shardId}`)
    }
    // Remove old range
    this.ranges = this.ranges.filter(r => r.shardId !== shardId)
    this.shards.delete(shardId)
    // Create two new ranges
    this.addRange(start, splitPoint, shard.nodeId)
    this.addRange(splitPoint + 1, end, newNodeId)
    console.log(`[RANGE] Split shard ${shardId} at ${splitPoint}`)
  }

  /**
   * Replaces two shards with one covering both ranges, hosted on the first
   * shard's node.
   *
   * @throws Error when either shard is missing or has no range, when both ids
   *   are the same, or when the merged span would overlap a third shard's range.
   */
  mergeRanges(shardId1: string, shardId2: string): void {
    const shard1 = this.shards.get(shardId1)
    const shard2 = this.shards.get(shardId2)
    if (!shard1 || !shard2 || !shard1.range || !shard2.range) {
      throw new Error('Both shards must exist and have ranges')
    }
    if (shardId1 === shardId2) {
      throw new Error('Cannot merge a shard with itself')
    }
    const newStart = Math.min(shard1.range.start, shard2.range.start)
    const newEnd = Math.max(shard1.range.end, shard2.range.end)
    // Merging across another shard's range would leave keys with two owners.
    const overlapsOther = this.ranges.some(r =>
      r.shardId !== shardId1 && r.shardId !== shardId2 && r.start <= newEnd && r.end >= newStart
    )
    if (overlapsOther) {
      throw new Error('Merged range would overlap another shard')
    }
    // Remove old ranges
    this.ranges = this.ranges.filter(r => r.shardId !== shardId1 && r.shardId !== shardId2)
    this.shards.delete(shardId1)
    this.shards.delete(shardId2)
    // Create merged range
    this.addRange(newStart, newEnd, shard1.nodeId)
    console.log(`[RANGE] Merged shards ${shardId1} and ${shardId2}`)
  }

  /** @returns All ranges in ascending `start` order with their shard and node ids. */
  getRanges(): Array<{ shardId: string; nodeId: string; start: any; end: any }> {
    return this.ranges.map(range => {
      const shard = this.shards.get(range.shardId)!
      return {
        shardId: range.shardId,
        nodeId: shard.nodeId,
        start: range.start,
        end: range.end
      }
    })
  }

  /** Stores the shard record and its range entry (does not re-sort `ranges`). */
  private registerShard(shardId: string, start: any, end: any, nodeId: string): void {
    this.shards.set(shardId, {
      id: shardId,
      nodeId,
      range: { start, end },
      status: 'ACTIVE',
      weight: 1,
      lastHealthCheck: new Date()
    })
    this.ranges.push({ start, end, shardId })
  }
}
// Consistent Hash Ring Implementation
/**
 * Consistent hash ring with virtual nodes and replica lookup.
 *
 * A key belongs to the first virtual node at or after the key's hash
 * (wrapping around); replicas are the next distinct physical nodes clockwise.
 */
class ConsistentHashRing {
  // Sorted, duplicate-free virtual-node positions.
  private ring: number[] = []
  // Ring position -> physical node.
  private nodes: Map<number, string> = new Map()
  private virtualNodes: number = 100
  private replicas: number = 3
  // Physical nodes currently on the ring.
  private members: Set<string> = new Set()

  /**
   * @param nodes - Physical nodes to place on the ring.
   * @param virtualNodes - Virtual nodes created per physical node.
   */
  constructor(nodes: string[], virtualNodes: number = 100) {
    this.virtualNodes = virtualNodes
    this.initialize(nodes)
  }

  private initialize(nodes: string[]): void {
    for (const node of nodes) {
      this.addNode(node)
    }
  }

  /**
   * Adds a physical node with its virtual nodes. Adding a node that is
   * already on the ring does nothing.
   */
  addNode(node: string): void {
    if (this.members.has(node)) return
    this.members.add(node)
    for (let i = 0; i < this.virtualNodes; i++) {
      const virtualNodeKey = `${node}#${i}`
      // The hash space is small (1,000,000 slots), so collisions are realistic.
      // Re-hash with a suffix instead of overwriting another node's slot.
      let hash = this.md5(virtualNodeKey)
      for (let attempt = 1; this.nodes.has(hash); attempt++) {
        hash = this.md5(`${virtualNodeKey}~${attempt}`)
      }
      this.ring.push(hash)
      this.nodes.set(hash, node)
    }
    this.ring.sort((a, b) => a - b)
    console.log(`[CONSISTENT_HASH] Added node ${node} with ${this.virtualNodes} virtual nodes`)
  }

  /** Removes a physical node and all of its virtual nodes. */
  removeNode(node: string): void {
    const removed = new Set<number>()
    for (const [hash, nodeId] of Array.from(this.nodes.entries())) {
      if (nodeId === node) {
        removed.add(hash)
        this.nodes.delete(hash)
      }
    }
    this.ring = this.ring.filter(h => !removed.has(h))
    this.members.delete(node)
    console.log(`[CONSISTENT_HASH] Removed node ${node} and ${removed.size} virtual nodes`)
  }

  /**
   * @returns The physical node owning the key, or null when the ring is empty.
   */
  getNode(key: string): string | null {
    if (this.ring.length === 0) return null
    const position = this.ring[this.ownerIndex(this.md5(key))]
    return this.nodes.get(position) || null
  }

  /**
   * Returns the owner of a key followed by the next distinct physical nodes
   * clockwise on the ring.
   *
   * @param key - Key to place.
   * @param count - Number of distinct nodes wanted (defaults to the replica count).
   * @returns At most `count` nodes; fewer when the ring has fewer physical nodes.
   */
  getNodesForKey(key: string, count: number = this.replicas): string[] {
    const selected: string[] = []
    if (this.ring.length === 0) return selected
    // Never ask for more distinct nodes than exist; otherwise the walk could not end.
    const wanted = Math.min(count, this.members.size)
    const seen = new Set<string>()
    const startIndex = this.ownerIndex(this.md5(key))
    for (let step = 0; step < this.ring.length && selected.length < wanted; step++) {
      const node = this.nodes.get(this.ring[(startIndex + step) % this.ring.length])
      if (node && !seen.has(node)) {
        seen.add(node)
        selected.push(node)
      }
    }
    return selected
  }

  /** Index of the first ring position >= hash, wrapping to 0 past the end. Ring must be non-empty. */
  private ownerIndex(hash: number): number {
    let low = 0
    let high = this.ring.length
    while (low < high) {
      const mid = (low + high) >>> 1
      if (this.ring[mid] < hash) {
        low = mid + 1
      } else {
        high = mid
      }
    }
    return low === this.ring.length ? 0 : low
  }

  /**
   * Despite the name this is NOT MD5: it is a simple 32-bit string hash
   * reduced modulo 1,000,000 (demo only; use a real hash in production).
   */
  private md5(input: string): number {
    let hash = 0
    for (let i = 0; i < input.length; i++) {
      hash = ((hash << 5) - hash + input.charCodeAt(i)) | 0
    }
    return Math.abs(hash) % 1000000 // Modulo to keep numbers manageable
  }

  /** @returns How many of 10,000 synthetic keys (`test_key_i`) land on each node. */
  getDistribution(): Record<string, number> {
    const distribution: Record<string, number> = {}
    for (let i = 0; i < 10000; i++) {
      const node = this.getNode(`test_key_${i}`)
      if (node) {
        distribution[node] = (distribution[node] || 0) + 1
      }
    }
    return distribution
  }

  /**
   * @returns Physical node count, virtual node count and a balance score in
   *   [0, 1] (1 - coefficient of variation of the synthetic-key distribution;
   *   0 when the ring is empty).
   */
  getRingInfo(): { totalNodes: number; totalVirtualNodes: number; balanceScore: number } {
    const counts = Object.values(this.getDistribution())
    let balanceScore = 0
    if (counts.length > 0) {
      const avg = counts.reduce((sum, val) => sum + val, 0) / counts.length
      const variance = counts.reduce((sum, val) => sum + Math.pow(val - avg, 2), 0) / counts.length
      // Lower variance = higher score
      balanceScore = avg > 0 ? Math.max(0, 1 - Math.sqrt(variance) / avg) : 0
    }
    return {
      totalNodes: this.members.size,
      totalVirtualNodes: this.ring.length,
      balanceScore
    }
  }
}
// Directory-based Sharding
/**
 * Directory-based sharding: an explicit key -> shard lookup table. New keys
 * are placed on the shard chosen by the load balancer; `rebalance` moves keys
 * from overloaded shards to underloaded ones.
 */
class DirectoryBasedSharding {
  private directory: Map<string, PartitionMetadata> = new Map()
  private shards: Map<string, Shard> = new Map()
  private loadBalancer: ShardingLoadBalancer

  /** @param nodes - Node ids; one shard `shard_${nodeId}` is created per node. */
  constructor(nodes: string[]) {
    this.loadBalancer = new ShardingLoadBalancer(nodes)
    this.initializeShards(nodes)
  }

  private initializeShards(nodes: string[]): void {
    for (const nodeId of nodes) {
      this.shards.set(`shard_${nodeId}`, {
        id: `shard_${nodeId}`,
        nodeId,
        status: 'ACTIVE',
        weight: 1,
        lastHealthCheck: new Date()
      })
    }
  }

  /**
   * Assigns a key to a shard, or returns the existing assignment.
   *
   * @param key - Directory key.
   * @param partitionKey - Value handed to the load balancer for shard selection.
   * @returns Id of the shard holding the key.
   * @throws Error when the selected shard is unknown or not ACTIVE.
   */
  async assignPartition(key: string, partitionKey: any): Promise<string> {
    // Check if key already assigned
    const existing = this.directory.get(key)
    if (existing) {
      return existing.shardId
    }
    // Select shard based on load balancing
    const shardId = await this.loadBalancer.selectShard(partitionKey)
    const shard = this.shards.get(shardId)
    if (!shard || shard.status !== 'ACTIVE') {
      throw new Error('No active shard available')
    }
    const metadata: PartitionMetadata = {
      shardId,
      key,
      partitionKey,
      nodeId: shard.nodeId,
      createdAt: new Date(),
      lastAccessed: new Date(),
      accessCount: 0
    }
    this.directory.set(key, metadata)
    console.log(`[DIRECTORY] Assigned key ${key} to shard ${shardId} (node: ${shard.nodeId})`)
    return shardId
  }

  /** @returns The shard a key is assigned to, or null for an unknown key. */
  getShard(key: string): Shard | null {
    const metadata = this.directory.get(key)
    if (!metadata) return null
    return this.shards.get(metadata.shardId) || null
  }

  /**
   * Moves keys from overloaded shards to underloaded ACTIVE shards.
   *
   * The target load is the average number of keys per ACTIVE shard. A shard is
   * overloaded above 120% of the target; keys are moved (least recently
   * accessed first) to the least-loaded ACTIVE shard that is still below the
   * target, until the source is back at the target. At most 10 keys move per call.
   *
   * @returns Number of migrated keys and the resulting key count per shard
   *   (shards without keys are omitted).
   */
  async rebalance(): Promise<{
    migratedPartitions: number
    newDistribution: Record<string, number>
  }> {
    console.log('[DIRECTORY] Starting rebalancing...')
    const maxMigrations = 10 // Limit migrations per rebalance
    const activeShardIds: string[] = []
    for (const shard of this.shards.values()) {
      if (shard.status === 'ACTIVE') activeShardIds.push(shard.id)
    }
    // Start every ACTIVE shard at 0 so that empty shards can receive keys.
    const projectedLoad = new Map<string, number>()
    for (const shardId of activeShardIds) projectedLoad.set(shardId, 0)
    for (const [shardId, load] of Object.entries(this.calculateLoad())) {
      projectedLoad.set(shardId, load)
    }

    const toMigrate: Array<{ key: string; fromShard: string; toShard: string }> = []
    if (activeShardIds.length > 0) {
      const targetLoad = this.directory.size / activeShardIds.length
      // Most overloaded shards first.
      const overloaded = Array.from(projectedLoad.entries())
        .filter(([, load]) => load > targetLoad * 1.2) // 20% threshold
        .sort((a, b) => b[1] - a[1])
        .map(([shardId]) => shardId)

      for (const sourceId of overloaded) {
        if (toMigrate.length >= maxMigrations) break
        // Candidate keys, least recently accessed first.
        const candidates = Array.from(this.directory.entries())
          .filter(([, metadata]) => metadata.shardId === sourceId)
          .sort((a, b) => a[1].lastAccessed.getTime() - b[1].lastAccessed.getTime())
        for (const [key] of candidates) {
          if (toMigrate.length >= maxMigrations) break
          if ((projectedLoad.get(sourceId) ?? 0) <= targetLoad) break
          const destinationId = this.findUnderloadedShard(projectedLoad, activeShardIds, targetLoad, sourceId)
          if (!destinationId) break
          toMigrate.push({ key, fromShard: sourceId, toShard: destinationId })
          // Track the planned moves so one destination is not overfilled.
          projectedLoad.set(sourceId, (projectedLoad.get(sourceId) ?? 0) - 1)
          projectedLoad.set(destinationId, (projectedLoad.get(destinationId) ?? 0) + 1)
        }
      }
    }

    // Execute migrations
    for (const migration of toMigrate) {
      const metadata = this.directory.get(migration.key)!
      metadata.shardId = migration.toShard
      metadata.nodeId = this.shards.get(migration.toShard)!.nodeId
      console.log(`[DIRECTORY] Migrated key ${migration.key} from ${migration.fromShard} to ${migration.toShard}`)
    }
    return {
      migratedPartitions: toMigrate.length,
      newDistribution: this.calculateLoad()
    }
  }

  /** Counts keys per shard id; shards without keys do not appear in the result. */
  private calculateLoad(): Record<string, number> {
    const load: Record<string, number> = {}
    for (const metadata of this.directory.values()) {
      load[metadata.shardId] = (load[metadata.shardId] || 0) + 1
    }
    return load
  }

  /**
   * Picks the least-loaded candidate shard (other than `excludeId`) whose
   * projected load is still below the target, or null if there is none.
   */
  private findUnderloadedShard(
    projectedLoad: Map<string, number>,
    candidateIds: string[],
    targetLoad: number,
    excludeId: string
  ): string | null {
    let best: string | null = null
    let bestLoad = Infinity
    for (const shardId of candidateIds) {
      if (shardId === excludeId) continue
      const load = projectedLoad.get(shardId) ?? 0
      if (load < targetLoad && load < bestLoad) {
        best = shardId
        bestLoad = load
      }
    }
    return best
  }

  /**
   * @returns Total key count, key count per shard, and the ten keys with the
   *   highest access count.
   */
  getDirectoryStats(): {
    totalPartitions: number
    distribution: Record<string, number>
    hotKeys: Array<{ key: string; accessCount: number; shardId: string }>
  } {
    const distribution = this.calculateLoad()
    const totalPartitions = this.directory.size
    // Find hot keys (high access count)
    const hotKeys = Array.from(this.directory.entries())
      .sort((a, b) => b[1].accessCount - a[1].accessCount)
      .slice(0, 10)
      .map(([key, metadata]) => ({
        key,
        accessCount: metadata.accessCount,
        shardId: metadata.shardId
      }))
    return {
      totalPartitions,
      distribution,
      hotKeys
    }
  }

  /** Marks a key as accessed now and bumps its access counter; unknown keys are ignored. */
  recordAccess(key: string): void {
    const metadata = this.directory.get(key)
    if (metadata) {
      metadata.lastAccessed = new Date()
      metadata.accessCount++
    }
  }
}
// Load Balancer for Sharding
/**
 * Tracks a load triple (connections, cpu, memory) per node and picks the
 * node with the lowest weighted score when a shard has to be chosen.
 */
class ShardingLoadBalancer {
  private nodeLoads: Map<string, { connections: number; cpu: number; memory: number }> = new Map()

  /** @param nodes - Node ids; each starts with an all-zero load. */
  constructor(nodes: string[]) {
    nodes.forEach(node => {
      this.nodeLoads.set(node, { connections: 0, cpu: 0, memory: 0 })
    })
  }

  /**
   * Chooses the shard of the least-loaded node. The score is
   * 0.4 * connections + 0.3 * cpu + 0.3 * memory; on a tie the node
   * registered first wins.
   *
   * @param partitionKey - Accepted for interface symmetry; not used in the selection.
   * @returns `shard_${nodeId}` of the winning node (`shard_` when no node is registered).
   */
  async selectShard(partitionKey: any): Promise<string> {
    let winner = ''
    let lowestScore = Infinity
    this.nodeLoads.forEach((load, nodeId) => {
      const score = load.connections * 0.4 + load.cpu * 0.3 + load.memory * 0.3
      if (score < lowestScore) {
        lowestScore = score
        winner = nodeId
      }
    })
    return `shard_${winner}`
  }

  /** Replaces the stored load of a node (registers the node if it is new). */
  updateNodeLoad(nodeId: string, load: { connections: number; cpu: number; memory: number }): void {
    this.nodeLoads.set(nodeId, load)
  }

  /** @returns A copy of every node's load; editing the copy does not affect the balancer. */
  getLoadDistribution(): Record<string, { connections: number; cpu: number; memory: number }> {
    const snapshot: Record<string, { connections: number; cpu: number; memory: number }> = {}
    this.nodeLoads.forEach((load, nodeId) => {
      snapshot[nodeId] = { ...load }
    })
    return snapshot
  }
}
// Cross-Shard Query Coordinator
/**
 * Fans a query out over the shards of the hash, range and directory sharding
 * managers and merges the (mock) per-shard results.
 */
class CrossShardQueryCoordinator {
  constructor(
    private hashSharding: HashShardingManager,
    private rangeSharding: RangeShardingManager,
    private directorySharding: DirectoryBasedSharding
  ) {}

  /**
   * Executes a SELECT, JOIN or AGGREGATE query across shards.
   *
   * @param query - Query description; JOIN requires non-empty `joinKeys`.
   * @returns Merged result rows, the de-duplicated list of queried shard
   *   labels and the wall-clock execution time in milliseconds.
   * @throws Error when a JOIN query has no join keys.
   */
  async executeCrossShardQuery(
    query: {
      type: 'SELECT' | 'JOIN' | 'AGGREGATE'
      tables: string[]
      conditions?: Record<string, any>
      joinKeys?: string[]
    }
  ): Promise<{
    results: any[]
    shardsQueried: string[]
    executionTime: number
  }> {
    const startTime = Date.now()
    const shardsQueried: string[] = []
    const results: any[] = []
    console.log(`[QUERY] Executing cross-shard query: ${query.type} on ${query.tables.join(', ')}`)
    // Determine which shards to query based on query type
    switch (query.type) {
      case 'SELECT':
        await this.executeSelectQuery(query, shardsQueried, results)
        break
      case 'JOIN':
        await this.executeJoinQuery(query, shardsQueried, results)
        break
      case 'AGGREGATE':
        await this.executeAggregateQuery(query, shardsQueried, results)
        break
    }
    const executionTime = Date.now() - startTime
    return {
      results,
      shardsQueried: [...new Set(shardsQueried)],
      executionTime
    }
  }

  /** Queries the relevant shards of every strategy; labels are `${strategy}_${shardId}`. */
  private async executeSelectQuery(
    query: any,
    shardsQueried: string[],
    results: any[]
  ): Promise<void> {
    // For demo, simulate querying all sharding strategies
    const strategies = [
      { name: 'hash', manager: this.hashSharding },
      { name: 'range', manager: this.rangeSharding },
      { name: 'directory', manager: this.directorySharding }
    ]
    for (const strategy of strategies) {
      // Simulate finding relevant shards
      const relevantShards = this.findRelevantShards(strategy, query.conditions)
      for (const shardId of relevantShards) {
        shardsQueried.push(`${strategy.name}_${shardId}`)
        // Simulate querying the shard
        const shardResults = await this.queryShard(shardId, query)
        results.push(...shardResults)
      }
    }
  }

  /** Resolves each join key to its hash shard and queries that shard. */
  private async executeJoinQuery(
    query: any,
    shardsQueried: string[],
    results: any[]
  ): Promise<void> {
    // Simplified join execution - in real implementation would be more complex
    if (!query.joinKeys || query.joinKeys.length === 0) {
      throw new Error('Join query requires join keys')
    }
    // For each join key, find which shards contain the data
    for (const joinKey of query.joinKeys) {
      const shard = this.hashSharding.getShard(joinKey)
      if (shard) {
        shardsQueried.push(`hash_${shard.id}`)
        // Query the resolved shard, not the join key itself.
        const shardResults = await this.queryShard(shard.id, query)
        results.push(...shardResults)
      }
    }
  }

  /**
   * Queries every hash node and every range shard; for `aggregate: 'COUNT'`
   * the rows are replaced by a single total built from each row's `count`.
   */
  private async executeAggregateQuery(
    query: any,
    shardsQueried: string[],
    results: any[]
  ): Promise<void> {
    // For aggregate queries, need to query all shards and combine results
    const allShards = [
      // getDistribution() returns a plain object, so its keys come from Object.keys.
      ...Object.keys(this.hashSharding.getDistribution()),
      ...this.rangeSharding.getRanges().map(r => r.shardId)
    ]
    for (const shardId of allShards) {
      shardsQueried.push(shardId)
      const shardResults = await this.queryShard(shardId, query)
      results.push(...shardResults)
    }
    // Combine aggregate results
    if (query.conditions?.aggregate === 'COUNT') {
      const totalCount = results.reduce((sum, result) => sum + (result.count || 0), 0)
      results.length = 0
      results.push({ aggregate: 'COUNT', value: totalCount })
    }
  }

  /**
   * Simplified shard selection: with an `id` condition only the owning shard
   * is returned; otherwise all shards of the hash or range strategy. The
   * directory strategy yields no shards without an `id` condition.
   */
  private findRelevantShards(strategy: any, conditions?: Record<string, any>): string[] {
    if (conditions?.id) {
      const shard = strategy.manager.getShard?.(conditions.id)
      return shard ? [shard.id] : []
    }
    // If no specific conditions, return all shards for the strategy
    if (strategy.name === 'hash') {
      return Object.keys(strategy.manager.getDistribution())
    } else if (strategy.name === 'range') {
      return strategy.manager.getRanges().map((r: { shardId: string }) => r.shardId)
    }
    return []
  }

  /** Simulates a shard query: waits 10-60 ms and returns 1-10 mock rows. */
  private async queryShard(shardId: string, query: any): Promise<any[]> {
    // Simulate network delay and query execution
    await new Promise(resolve => setTimeout(resolve, Math.random() * 50 + 10))
    // Generate mock results
    const resultCount = Math.floor(Math.random() * 10) + 1
    const results: any[] = []
    for (let i = 0; i < resultCount; i++) {
      results.push({
        id: `${shardId}_${i}`,
        shardId,
        data: `Mock data from ${shardId}`,
        timestamp: new Date()
      })
    }
    return results
  }
}
// Usage Example
/**
 * Console walkthrough of the sharding building blocks defined in this file:
 * hash sharding, range sharding, the consistent hash ring, directory-based
 * sharding with rebalancing, and the cross-shard query coordinator.
 *
 * Produces log output only; resolves once every step has run.
 */
async function shardingExample() {
  console.log('=== Sharding and Partitioning Strategies ===\n')

  // --- 1. Hash sharding: key placement, then membership changes ---
  console.log('1. Hash-based Sharding')
  const hashSharding = new HashShardingManager(['node-1', 'node-2', 'node-3', 'node-4'])
  const testKeys = ['user_123', 'user_456', 'user_789', 'user_999', 'user_111']
  console.log('\nKey distribution:')
  for (const key of testKeys) {
    const owner = hashSharding.getShard(key)
    console.log(` ${key} -> ${owner?.nodeId}`)
  }

  console.log('\nAdding node-5 to cluster...')
  hashSharding.addNode('node-5')
  console.log('\nRemoving node-2 from cluster...')
  hashSharding.removeNode('node-2')

  const hashDistribution = hashSharding.getDistribution()
  console.log('\nHash sharding distribution:')
  for (const [node, count] of Object.entries(hashDistribution)) {
    console.log(` ${node}: ${count} keys`)
  }

  // --- 2. Range sharding: lookups, then a split ---
  console.log('\n2. Range-based Sharding')
  const rangeSharding = new RangeShardingManager()
  const userIds = [500, 1500, 2500, 3500, 4500]
  console.log('\nUser ID distribution:')
  for (const userId of userIds) {
    const owner = rangeSharding.getShard(userId)
    console.log(` User ${userId} -> ${owner?.nodeId}`)
  }

  console.log('\nSplitting shard-1 at 500000...')
  rangeSharding.splitRange('shard-1', 500000, 'node-6')
  console.log('\nCurrent ranges:')
  const ranges = rangeSharding.getRanges()
  for (const range of ranges) {
    console.log(` ${range.shardId} (${range.nodeId}): [${range.start}, ${range.end}]`)
  }

  // --- 3. Consistent hashing: replica placement and the effect of adding a node ---
  console.log('\n3. Consistent Hash Ring')
  const consistentHash = new ConsistentHashRing(['node-1', 'node-2', 'node-3'], 50)
  const testKeys2 = ['key1', 'key2', 'key3', 'key4', 'key5']
  console.log('\nKey assignment with replicas:')
  for (const key of testKeys2) {
    const replicas = consistentHash.getNodesForKey(key, 2)
    console.log(` ${key} -> [${replicas.join(', ')}]`)
  }

  console.log('\nAdding node-4 and checking key movement...')
  const beforeDistribution = consistentHash.getDistribution()
  consistentHash.addNode('node-4')
  const afterDistribution = consistentHash.getDistribution()
  console.log('Distribution before adding node-4:', beforeDistribution)
  console.log('Distribution after adding node-4:', afterDistribution)
  const ringInfo = consistentHash.getRingInfo()
  console.log(`\nRing info: ${ringInfo.totalNodes} nodes, ${ringInfo.totalVirtualNodes} virtual nodes, balance score: ${ringInfo.balanceScore.toFixed(3)}`)

  // --- 4. Directory-based sharding: explicit assignment plus access tracking ---
  console.log('\n4. Directory-based Sharding')
  const directorySharding = new DirectoryBasedSharding(['node-1', 'node-2', 'node-3'])
  const partitions = [
    { key: 'product_a', partitionKey: 'electronics' },
    { key: 'product_b', partitionKey: 'books' },
    { key: 'user_x', partitionKey: 'premium' },
    { key: 'user_y', partitionKey: 'basic' },
    { key: 'order_1', partitionKey: 'electronics' },
    { key: 'order_2', partitionKey: 'books' }
  ]
  console.log('\nAssigning partitions:')
  for (const { key, partitionKey } of partitions) {
    // The returned shard id is not needed; the placement is read back via getShard
    await directorySharding.assignPartition(key, partitionKey)
    const owner = directorySharding.getShard(key)
    console.log(` ${key} -> ${owner?.nodeId}`)
  }

  // Repeated accesses so that two keys stand out as hot
  let remainingAccesses = 10
  while (remainingAccesses-- > 0) {
    directorySharding.recordAccess('product_a') // Make this a hot key
    directorySharding.recordAccess('order_1')
  }

  const stats = directorySharding.getDirectoryStats()
  console.log('\nDirectory stats:')
  console.log(` Total partitions: ${stats.totalPartitions}`)
  console.log(' Distribution:', stats.distribution)
  console.log(' Hot keys:', stats.hotKeys)

  // --- 5. Rebalancing the directory ---
  console.log('\n5. Rebalancing')
  const rebalanceResult = await directorySharding.rebalance()
  console.log(`Migrated ${rebalanceResult.migratedPartitions} partitions`)
  console.log('New distribution:', rebalanceResult.newDistribution)

  // --- 6. Cross-shard queries: point select, join, aggregate ---
  console.log('\n6. Cross-Shard Query Coordinator')
  const queryCoordinator = new CrossShardQueryCoordinator(
    hashSharding,
    rangeSharding,
    directorySharding
  )
  const queries = [
    {
      type: 'SELECT' as const,
      tables: ['users'],
      conditions: { id: 'user_123' }
    },
    {
      type: 'JOIN' as const,
      tables: ['users', 'orders'],
      joinKeys: ['user_123']
    },
    {
      type: 'AGGREGATE' as const,
      tables: ['orders'],
      conditions: { aggregate: 'COUNT' }
    }
  ]
  for (const query of queries) {
    const outcome = await queryCoordinator.executeCrossShardQuery(query)
    console.log(`\n${query.type} query:`)
    console.log(` Shards queried: ${outcome.shardsQueried.length}`)
    console.log(` Results returned: ${outcome.results.length}`)
    console.log(` Execution time: ${outcome.executionTime}ms`)
  }

  console.log('\n=== Sharding Analysis Complete ===')
}
// Export list of the sharding sample: the sharding manager classes, the
// cross-shard query coordinator and the runnable `shardingExample` demo, plus
// PartitioningStrategy / Shard / PartitionMetadata (declared earlier in the file,
// outside this section — presumably the shared type declarations; verify).
export {
PartitioningStrategy,
Shard,
PartitionMetadata,
HashShardingManager,
RangeShardingManager,
ConsistentHashRing,
DirectoryBasedSharding,
ShardingLoadBalancer,
CrossShardQueryCoordinator,
shardingExample
}
💻 Mecanismos de Tolerancia a Fallos
🔴 complex
⭐⭐⭐⭐
Implementando tolerancia a fallos integral con circuit breakers, reintentos y estrategias de failover
⏱️ 75 min
🏷️ fault-tolerance, circuit-breaker, retry, failover
Prerequisites:
Microservices architecture knowledge, Resilience patterns
// Fault Tolerance Mechanisms Implementation
// Fault Tolerance Patterns
/**
 * States of the circuit breaker state machine used by `CircuitBreaker`.
 */
enum CircuitState {
// Calls pass through; failures are counted against the configured threshold.
CLOSED = 'CLOSED',
// Calls are rejected immediately until the reset timeout has elapsed.
OPEN = 'OPEN',
// Trial state entered on the first call after the reset timeout: a failure
// re-opens the circuit, enough successful calls close it.
HALF_OPEN = 'HALF_OPEN'
}
/**
 * Delay policies understood by `RetryMechanism` when waiting between attempts.
 */
enum RetryStrategy {
// Always wait `baseDelayMs`.
FIXED_DELAY = 'FIXED_DELAY',
// Wait `baseDelayMs * multiplier^(attempt - 1)`, capped at `maxDelayMs`.
EXPONENTIAL_BACKOFF = 'EXPONENTIAL_BACKOFF',
// Wait `baseDelayMs + (attempt - 1) * baseDelayMs / 2`, capped at `maxDelayMs`.
LINEAR_BACKOFF = 'LINEAR_BACKOFF'
}
/**
 * Failover topologies selectable on `FailoverManager`.
 */
enum FailoverStrategy {
// The highest-priority node is active; all other nodes are standbys.
ACTIVE_PASSIVE = 'ACTIVE_PASSIVE',
// Every node may serve requests; there are no standbys.
ACTIVE_ACTIVE = 'ACTIVE_ACTIVE',
// Declared, but FailoverManager.initializeFailover has no branch for it, so no
// active node is selected when this strategy is used.
GEOGRAPHIC = 'GEOGRAPHIC'
}
/**
 * Health record kept per service by `HealthMonitor`.
 */
interface ServiceHealth {
// Identifier the record is stored under.
serviceId: string
// Endpoint handed to the (simulated) health check.
endpoint: string
// HEALTHY after a successful check; DEGRADED after 1-2 consecutive failed
// checks; UNHEALTHY from the third consecutive failure on.
status: 'HEALTHY' | 'DEGRADED' | 'UNHEALTHY'
// Time of the most recent health check (or of record creation).
lastCheck: Date
// Duration in milliseconds of the last successful health check; 0 initially.
responseTime: number
// Decaying failure ratio kept within [0, 1]: each success multiplies it by 0.9,
// each failure moves it to `rate * 0.9 + 0.1`.
errorRate: number
// Failures since the last success (health checks and reported call failures).
consecutiveFailures: number
}
/**
 * Tuning knobs for a `CircuitBreaker`.
 */
interface CircuitBreakerConfig {
// Number of failures without an intervening success that opens the circuit.
failureThreshold: number
// Per-call time limit in milliseconds; exceeding it counts as a failure.
timeoutMs: number
// Number of successful calls in HALF_OPEN required before the circuit closes.
// NOTE(review): despite the name, the breaker does not cap how many trial calls
// are admitted while half-open.
halfOpenMaxCalls: number
// How long, in milliseconds, the circuit stays OPEN before a trial call is allowed.
resetTimeoutMs: number
}
/**
 * Configuration for `RetryMechanism`.
 */
interface RetryConfig {
// Total number of attempts, including the first call.
maxAttempts: number
// How the wait between attempts is computed.
strategy: RetryStrategy
// Base wait in milliseconds; used as-is by FIXED_DELAY.
baseDelayMs: number
// Upper bound in milliseconds applied to exponential and linear backoff
// (FIXED_DELAY is not capped).
maxDelayMs: number
// Growth factor for EXPONENTIAL_BACKOFF; a missing or zero value falls back to 2.
multiplier?: number
}
// Circuit Breaker Implementation
/**
 * Circuit breaker guarding calls to a single service.
 *
 * - CLOSED: calls run normally; `failureThreshold` failures without a success
 *   in between open the circuit.
 * - OPEN: calls are rejected immediately until `resetTimeoutMs` has elapsed.
 * - HALF_OPEN: entered by the first call after the reset timeout; any failure
 *   re-opens the circuit, `halfOpenMaxCalls` successes close it.
 *
 * Every outcome is also reported to the supplied health monitor.
 */
class CircuitBreaker {
  private state: CircuitState = CircuitState.CLOSED
  // Failures since the last success (or since the circuit was last closed)
  private failureCount: number = 0
  private successCount: number = 0
  // Timestamp (ms) of the most recent failure; recorded but not read by this class
  private lastFailureTime: number = 0
  // Earliest time (ms since epoch) at which an OPEN circuit admits a trial call
  private nextAttempt: number = 0
  // Successful calls observed while HALF_OPEN
  private halfOpenCalls: number = 0

  constructor(
    private serviceId: string,
    private config: CircuitBreakerConfig,
    private healthMonitor: HealthMonitor
  ) {}

  /**
   * Runs `operation` under circuit-breaker protection and the configured timeout.
   *
   * @param operation - The call to protect.
   * @returns The operation's result.
   * @throws Error if the circuit is OPEN and the reset timeout has not elapsed,
   *   if the operation exceeds `timeoutMs`, or whatever the operation itself throws.
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === CircuitState.OPEN) {
      if (Date.now() < this.nextAttempt) {
        throw new Error(`Circuit breaker OPEN for ${this.serviceId}. Next attempt at ${new Date(this.nextAttempt).toISOString()}`)
      }
      // Reset timeout elapsed: let this call through as a trial
      this.transitionToHalfOpen()
    }
    try {
      const result = await this.withTimeout(operation, this.config.timeoutMs)
      this.onSuccess()
      return result
    } catch (error) {
      this.onFailure()
      throw error
    }
  }

  /**
   * Races `operation` against a timer. The timer is always cleared once the
   * race settles — including when `operation` throws synchronously before
   * returning a promise — so no stray timer outlives the call.
   */
  private async withTimeout<T>(operation: () => Promise<T>, timeoutMs: number): Promise<T> {
    let timeoutId: ReturnType<typeof setTimeout> | undefined
    const timedOut = new Promise<never>((_resolve, reject) => {
      timeoutId = setTimeout(() => {
        reject(new Error(`Operation timed out after ${timeoutMs}ms`))
      }, timeoutMs)
    })
    try {
      // Promise.race subscribes to both promises, so a late rejection of the
      // losing side never surfaces as an unhandled rejection.
      return await Promise.race([operation(), timedOut])
    } finally {
      clearTimeout(timeoutId)
    }
  }

  /** Records a successful call and closes the circuit after enough half-open successes. */
  private onSuccess(): void {
    this.failureCount = 0
    this.successCount++
    if (this.state === CircuitState.HALF_OPEN) {
      this.halfOpenCalls++
      if (this.halfOpenCalls >= this.config.halfOpenMaxCalls) {
        this.transitionToClosed()
      }
    }
    this.healthMonitor.recordSuccess(this.serviceId)
  }

  /** Records a failed call; opens the circuit on any half-open failure or at the threshold. */
  private onFailure(): void {
    this.failureCount++
    this.lastFailureTime = Date.now()
    if (this.state === CircuitState.HALF_OPEN || this.failureCount >= this.config.failureThreshold) {
      this.transitionToOpen()
    }
    this.healthMonitor.recordFailure(this.serviceId)
  }

  private transitionToClosed(): void {
    this.state = CircuitState.CLOSED
    this.failureCount = 0
    this.successCount = 0
    this.halfOpenCalls = 0
    console.log(`[CIRCUIT_BREAKER] ${this.serviceId} transitioned to CLOSED`)
  }

  private transitionToOpen(): void {
    this.state = CircuitState.OPEN
    this.nextAttempt = Date.now() + this.config.resetTimeoutMs
    this.halfOpenCalls = 0
    console.log(`[CIRCUIT_BREAKER] ${this.serviceId} transitioned to OPEN until ${new Date(this.nextAttempt).toISOString()}`)
  }

  private transitionToHalfOpen(): void {
    this.state = CircuitState.HALF_OPEN
    this.halfOpenCalls = 0
    console.log(`[CIRCUIT_BREAKER] ${this.serviceId} transitioned to HALF_OPEN`)
  }

  /** Snapshot of the current state and counters. */
  getState(): { state: CircuitState; failureCount: number; successCount: number } {
    return {
      state: this.state,
      failureCount: this.failureCount,
      successCount: this.successCount
    }
  }

  /** Manually opens the circuit for `resetTimeoutMs`. */
  forceOpen(): void {
    this.transitionToOpen()
  }

  /** Manually closes the circuit and resets all counters. */
  forceClose(): void {
    this.transitionToClosed()
  }
}
// Retry Mechanism with Different Strategies
/**
 * Re-runs a failing asynchronous operation according to a `RetryConfig`.
 *
 * Each failed attempt (except the last) is followed by a wait whose length is
 * derived from the configured strategy. Values thrown by the operation that
 * are not `Error` instances are wrapped in an `Error` before being rethrown.
 */
class RetryMechanism {
  constructor(private config: RetryConfig) {}

  /**
   * Executes `operation`, retrying on rejection up to `maxAttempts` times in total.
   *
   * @param operation - The call to attempt.
   * @returns The result of the first successful attempt.
   * @throws The error of the final attempt once all attempts are exhausted, or an
   *   `Error` describing the misconfiguration when `maxAttempts` allows no attempt.
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    const { maxAttempts } = this.config
    let lastError: Error | undefined
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        console.log(`[RETRY] Attempt ${attempt}/${maxAttempts}`)
        const result = await operation()
        if (attempt > 1) {
          console.log(`[RETRY] Operation succeeded on attempt ${attempt}`)
        }
        return result
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error))
        console.log(`[RETRY] Attempt ${attempt} failed: ${lastError.message}`)
        if (attempt === maxAttempts) {
          console.log(`[RETRY] All ${maxAttempts} attempts exhausted`)
          throw lastError
        }
        const delay = this.calculateDelay(attempt)
        console.log(`[RETRY] Waiting ${delay}ms before retry...`)
        await this.sleep(delay)
      }
    }
    // Reached without a recorded error only when the loop body never ran
    // (maxAttempts < 1 or NaN): reject with a real Error instead of `undefined`.
    throw lastError ?? new Error(`RetryMechanism made no attempts: maxAttempts must be at least 1 (got ${maxAttempts})`)
  }

  /**
   * Computes the wait in milliseconds after the given (1-based) failed attempt.
   */
  private calculateDelay(attempt: number): number {
    const { strategy, baseDelayMs, maxDelayMs, multiplier } = this.config
    switch (strategy) {
      case RetryStrategy.FIXED_DELAY:
        return baseDelayMs
      case RetryStrategy.EXPONENTIAL_BACKOFF: {
        // A missing or zero multiplier falls back to doubling
        const exponentialDelay = baseDelayMs * Math.pow(multiplier || 2, attempt - 1)
        return Math.min(exponentialDelay, maxDelayMs)
      }
      case RetryStrategy.LINEAR_BACKOFF: {
        // Grows by half the base delay per attempt
        const linearDelay = baseDelayMs + (attempt - 1) * (baseDelayMs / 2)
        return Math.min(linearDelay, maxDelayMs)
      }
      default:
        return baseDelayMs
    }
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms))
  }
}
// Health Monitor Service
/**
 * Tracks a health record per service and refreshes it with a periodic
 * (simulated) health check every 30 seconds.
 *
 * Construction starts the periodic checks immediately; call `stop()` to cancel
 * them, otherwise the interval keeps running (and keeps a Node.js process
 * alive) indefinitely.
 */
class HealthMonitor {
  private serviceHealth: Map<string, ServiceHealth> = new Map()
  private healthCheckInterval: number = 30000 // 30 seconds
  // Handle of the running interval, or null when checks are stopped
  private healthCheckTimer: ReturnType<typeof setInterval> | null = null

  /**
   * @param services - Services to monitor; each starts out HEALTHY.
   */
  constructor(private services: Array<{ id: string; endpoint: string }>) {
    this.initializeServices()
    this.startHealthChecks()
  }

  /** Creates the initial HEALTHY record for every configured service. */
  private initializeServices(): void {
    for (const service of this.services) {
      this.serviceHealth.set(service.id, {
        serviceId: service.id,
        endpoint: service.endpoint,
        status: 'HEALTHY',
        lastCheck: new Date(),
        responseTime: 0,
        errorRate: 0,
        consecutiveFailures: 0
      })
    }
  }

  /** Schedules the periodic checks, keeping the handle so they can be cancelled. */
  private startHealthChecks(): void {
    this.stop()
    this.healthCheckTimer = setInterval(() => {
      // performHealthChecks handles per-service errors itself
      void this.performHealthChecks()
    }, this.healthCheckInterval)
  }

  /**
   * Cancels the periodic health checks. Safe to call more than once; the
   * records collected so far remain readable.
   */
  stop(): void {
    if (this.healthCheckTimer !== null) {
      clearInterval(this.healthCheckTimer)
      this.healthCheckTimer = null
    }
  }

  /** Checks every service in turn and updates its record from the outcome. */
  private async performHealthChecks(): Promise<void> {
    for (const [serviceId, health] of this.serviceHealth.entries()) {
      try {
        const startTime = Date.now()
        await this.checkServiceHealth(health.endpoint)
        const responseTime = Date.now() - startTime
        health.status = 'HEALTHY'
        health.responseTime = responseTime
        health.lastCheck = new Date()
        health.consecutiveFailures = 0
        // Update error rate (simplified moving average)
        health.errorRate = health.errorRate * 0.9 // Decay factor
        console.log(`[HEALTH] ${serviceId}: HEALTHY (${responseTime}ms)`)
      } catch {
        health.consecutiveFailures++
        health.lastCheck = new Date()
        // Three or more failures in a row mark the service UNHEALTHY, fewer DEGRADED
        if (health.consecutiveFailures >= 3) {
          health.status = 'UNHEALTHY'
        } else if (health.consecutiveFailures >= 1) {
          health.status = 'DEGRADED'
        }
        // Update error rate
        health.errorRate = Math.min(1, health.errorRate * 0.9 + 0.1)
        console.log(`[HEALTH] ${serviceId}: ${health.status} (failure #${health.consecutiveFailures})`)
      }
    }
  }

  /**
   * Simulated health probe: resolves after 50-550ms with a 90% success rate.
   * The endpoint is not actually contacted.
   */
  private async checkServiceHealth(endpoint: string): Promise<void> {
    // Simulate health check
    await new Promise((resolve, reject) => {
      setTimeout(() => {
        // 90% success rate
        if (Math.random() > 0.1) {
          resolve(undefined)
        } else {
          reject(new Error('Service unavailable'))
        }
      }, Math.random() * 500 + 50) // 50-550ms response time
    })
  }

  /**
   * Reports a successful call to a service: resets its failure streak and
   * decays its error rate. The `status` field is left untouched; it only
   * changes on periodic health checks. Unknown ids are ignored.
   */
  recordSuccess(serviceId: string): void {
    const health = this.serviceHealth.get(serviceId)
    if (health) {
      health.consecutiveFailures = 0
      health.errorRate = Math.max(0, health.errorRate * 0.9)
    }
  }

  /**
   * Reports a failed call to a service: extends its failure streak and raises
   * its error rate (capped at 1). The `status` field is left untouched.
   * Unknown ids are ignored.
   */
  recordFailure(serviceId: string): void {
    const health = this.serviceHealth.get(serviceId)
    if (health) {
      health.consecutiveFailures++
      health.errorRate = Math.min(1, health.errorRate * 0.9 + 0.1)
    }
  }

  /** Ids of all services whose status is currently HEALTHY. */
  getHealthyServices(): string[] {
    return Array.from(this.serviceHealth.values())
      .filter(health => health.status === 'HEALTHY')
      .map(health => health.serviceId)
  }

  /** The live health record for `serviceId`, or null when the id is not monitored. */
  getServiceHealth(serviceId: string): ServiceHealth | null {
    return this.serviceHealth.get(serviceId) || null
  }

  /** The live health records of all monitored services. */
  getAllServiceHealth(): ServiceHealth[] {
    return Array.from(this.serviceHealth.values())
  }
}
// Failover Manager
/**
 * Chooses which node an operation runs on and switches the active node when
 * it fails.
 *
 * - ACTIVE_PASSIVE: the highest-priority node is active, the rest are standbys;
 *   when the active node fails, the best eligible standby is promoted.
 * - ACTIVE_ACTIVE: every eligible node may serve, tried in priority order.
 * - Any other strategy: no active node is selected, so calls fail with
 *   'All nodes failed'.
 *
 * A node is eligible when the health monitor reports it HEALTHY or has no
 * record for it at all; only nodes known to be DEGRADED/UNHEALTHY are excluded.
 */
class FailoverManager {
  private activeNode: string | null = null
  private standbyNodes: Set<string> = new Set()
  private failoverHistory: Array<{
    timestamp: Date
    fromNode: string
    toNode: string
    reason: string
  }> = []

  /**
   * @param nodes - Candidate nodes; a higher `priority` is preferred.
   * @param strategy - Failover topology to apply.
   * @param healthMonitor - Source of per-node health records.
   */
  constructor(
    private nodes: Array<{ id: string; endpoint: string; region: string; priority: number }>,
    private strategy: FailoverStrategy,
    private healthMonitor: HealthMonitor
  ) {
    this.initializeFailover()
  }

  /** Selects the initial active/standby split for the configured strategy. */
  private initializeFailover(): void {
    if (this.strategy === FailoverStrategy.ACTIVE_PASSIVE) {
      // Sort by priority and select highest as active
      const sortedNodes = [...this.nodes].sort((a, b) => b.priority - a.priority)
      // An empty node list leaves the manager without an active node instead of throwing
      this.activeNode = sortedNodes[0]?.id ?? null
      this.standbyNodes = new Set(sortedNodes.slice(1).map(n => n.id))
    } else if (this.strategy === FailoverStrategy.ACTIVE_ACTIVE) {
      // All nodes are active
      this.activeNode = 'multi-active'
      this.standbyNodes = new Set()
    }
    console.log(`[FAILOVER] Initialized with ${this.strategy} strategy`)
    console.log(`[FAILOVER] Active node: ${this.activeNode}`)
    console.log(`[FAILOVER] Standby nodes: ${Array.from(this.standbyNodes).join(', ')}`)
  }

  /**
   * Single eligibility rule shared by node ordering, standby promotion and the
   * per-call check: a node is skipped only when the monitor has a record for
   * it and that record is not HEALTHY. Nodes without a record are usable.
   */
  private isNodeEligible(nodeId: string): boolean {
    const health = this.healthMonitor.getServiceHealth(nodeId)
    return !health || health.status === 'HEALTHY'
  }

  /** Eligible standby nodes, highest priority first. */
  private eligibleStandbys(): Array<{ id: string; endpoint: string; region: string; priority: number }> {
    return this.nodes
      .filter(n => this.standbyNodes.has(n.id) && this.isNodeEligible(n.id))
      .sort((a, b) => b.priority - a.priority)
  }

  /**
   * Runs `operation` on the preferred node, moving on to the next candidate
   * whenever a node is known to be unhealthy or the operation rejects.
   *
   * @param operation - Receives the id of the node to run against.
   * @param operationContext - Label used in log output.
   * @returns The result from the first node on which the operation succeeds.
   * @throws The last error seen, or Error('All nodes failed') if no node was tried.
   */
  async executeWithFailover<T>(
    operation: (nodeId: string) => Promise<T>,
    operationContext: string = 'operation'
  ): Promise<T> {
    const nodesToTry = this.getNodesInOrder()
    let lastError: Error | null = null
    for (const nodeId of nodesToTry) {
      try {
        console.log(`[FAILOVER] Executing ${operationContext} on node ${nodeId}`)
        // Check if node is healthy
        const health = this.healthMonitor.getServiceHealth(nodeId)
        if (health && health.status !== 'HEALTHY') {
          console.log(`[FAILOVER] Skipping unhealthy node ${nodeId} (${health.status})`)
          continue
        }
        const result = await operation(nodeId)
        console.log(`[FAILOVER] ${operationContext} succeeded on node ${nodeId}`)
        return result
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error))
        console.log(`[FAILOVER] ${operationContext} failed on node ${nodeId}: ${lastError.message}`)
        // If active node failed, initiate failover
        if (nodeId === this.activeNode) {
          await this.initiateFailover(nodeId, lastError.message)
        }
      }
    }
    throw lastError || new Error('All nodes failed')
  }

  /** Candidate node ids in the order they should be tried. */
  private getNodesInOrder(): string[] {
    if (this.strategy === FailoverStrategy.ACTIVE_ACTIVE) {
      // Every eligible node, highest priority first
      return this.nodes
        .filter(node => this.isNodeEligible(node.id))
        .sort((a, b) => b.priority - a.priority)
        .map(n => n.id)
    }
    // Active node first, then eligible standbys; drop the slot when no active node is set
    const ordered: Array<string | null> = [this.activeNode, ...this.eligibleStandbys().map(n => n.id)]
    return ordered.filter((id): id is string => Boolean(id))
  }

  /**
   * Promotes the best eligible standby after the active node failed
   * (ACTIVE_PASSIVE only) and records the switch in the history.
   */
  private async initiateFailover(failedNodeId: string, reason: string): Promise<void> {
    console.log(`[FAILOVER] Initiating failover from ${failedNodeId}`)
    if (this.strategy === FailoverStrategy.ACTIVE_PASSIVE && this.activeNode === failedNodeId) {
      const candidates = this.eligibleStandbys()
      if (candidates.length > 0) {
        const newActiveNode = candidates[0].id
        this.failoverHistory.push({
          timestamp: new Date(),
          fromNode: failedNodeId,
          toNode: newActiveNode,
          reason
        })
        // Swap roles: the failed node becomes a standby
        this.standbyNodes.delete(newActiveNode)
        this.standbyNodes.add(failedNodeId)
        this.activeNode = newActiveNode
        console.log(`[FAILOVER] Failover completed: ${failedNodeId} -> ${newActiveNode}`)
      } else {
        console.log(`[FAILOVER] No healthy standby nodes available for failover`)
      }
    }
  }

  /** Copy of the recorded failover events, oldest first. */
  getFailoverHistory(): Array<{
    timestamp: Date
    fromNode: string
    toNode: string
    reason: string
  }> {
    return [...this.failoverHistory]
  }

  /** Snapshot of the strategy, current roles and number of failovers so far. */
  getCurrentConfiguration(): {
    strategy: FailoverStrategy
    activeNode: string | null
    standbyNodes: string[]
    totalFailovers: number
  } {
    return {
      strategy: this.strategy,
      activeNode: this.activeNode,
      standbyNodes: Array.from(this.standbyNodes),
      totalFailovers: this.failoverHistory.length
    }
  }
}
// Bulkhead Pattern Implementation
/**
 * Bulkhead: caps how many operations run concurrently and how many callers
 * may wait for a slot; callers beyond both limits are rejected immediately.
 *
 * Waiters are served in FIFO order. When an operation finishes while callers
 * are waiting, its slot is handed directly to the oldest waiter, so a caller
 * arriving in between can never push concurrency above the limit.
 */
class Bulkhead {
  // Slots in use: running operations plus slots already handed to a woken waiter
  private semaphore: number = 0
  // Resolvers of callers waiting for a slot, oldest first
  private waitingQueue: Array<() => void> = []
  private metrics = {
    totalCalls: 0,
    rejectedCalls: 0,
    successfulCalls: 0,
    failedCalls: 0
  }

  /**
   * @param name - Label used in rejection messages.
   * @param maxConcurrentCalls - Maximum number of operations running at once.
   * @param maxQueueSize - Maximum number of callers allowed to wait for a slot.
   */
  constructor(
    private name: string,
    private maxConcurrentCalls: number,
    private maxQueueSize: number
  ) {}

  /**
   * Runs `operation` once a slot is available.
   *
   * @param operation - The call to run inside the bulkhead.
   * @returns The operation's result.
   * @throws Error when all slots are busy and the waiting queue is full, or
   *   whatever the operation itself throws.
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    this.metrics.totalCalls++
    if (this.semaphore >= this.maxConcurrentCalls) {
      if (this.waitingQueue.length >= this.maxQueueSize) {
        this.metrics.rejectedCalls++
        throw new Error(`Bulkhead '${this.name}' rejected call - queue full`)
      }
      // Wait for a slot. The releasing call transfers its slot to us, so the
      // counter already accounts for this call when we wake up.
      await new Promise<void>(resolve => {
        this.waitingQueue.push(resolve)
      })
    } else {
      this.semaphore++
    }
    try {
      const result = await operation()
      this.metrics.successfulCalls++
      return result
    } catch (error) {
      this.metrics.failedCalls++
      throw error
    } finally {
      this.releaseSlot()
    }
  }

  /**
   * Gives up the caller's slot: passes it to the oldest waiter if there is one
   * (counter unchanged), otherwise frees it.
   */
  private releaseSlot(): void {
    const next = this.waitingQueue.shift()
    if (next) {
      next()
    } else {
      this.semaphore--
    }
  }

  /**
   * Call counters plus a snapshot of current load.
   *
   * @returns `rejectionRate` is a percentage (0-100) of all calls so far;
   *   `maxConcurrency` is the configured concurrency limit.
   */
  getMetrics(): {
    totalCalls: number
    rejectedCalls: number
    successfulCalls: number
    failedCalls: number
    currentConcurrency: number
    maxConcurrency: number
    queueSize: number
    rejectionRate: number
  } {
    const rejectionRate = this.metrics.totalCalls > 0
      ? (this.metrics.rejectedCalls / this.metrics.totalCalls) * 100
      : 0
    return {
      ...this.metrics,
      currentConcurrency: this.semaphore,
      maxConcurrency: this.maxConcurrentCalls,
      queueSize: this.waitingQueue.length,
      rejectionRate
    }
  }
}
// Timeout Manager
/**
 * Runs operations under a deadline and keeps a registry of the timers that
 * are currently armed, keyed by operation id.
 *
 * A timed-out operation is not cancelled; its eventual result is discarded.
 */
class TimeoutManager {
  // Armed timers by operation id; ReturnType<typeof setTimeout> keeps this portable
  private timeouts: Map<string, ReturnType<typeof setTimeout>> = new Map()

  /**
   * Runs `operation`, rejecting if it has not settled within `timeoutMs`.
   *
   * @param operation - The call to run.
   * @param timeoutMs - Deadline in milliseconds.
   * @param operationId - Registry key and label for the timeout error; random by default.
   * @returns The operation's result.
   * @throws Error(`Operation <id> timed out after <ms>ms`) on timeout, or the
   *   operation's own error (including one thrown synchronously).
   */
  executeWithTimeout<T>(
    operation: () => Promise<T>,
    timeoutMs: number,
    operationId: string = crypto.randomUUID()
  ): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timeoutId = setTimeout(() => {
        unregister()
        reject(new Error(`Operation ${operationId} timed out after ${timeoutMs}ms`))
      }, timeoutMs)

      // Remove the registry entry only if it still refers to this call's timer,
      // so a concurrent call reusing the id keeps its own entry.
      const unregister = (): void => {
        if (this.timeouts.get(operationId) === timeoutId) {
          this.timeouts.delete(operationId)
        }
      }
      // Always disarm this call's own timer, never one looked up by id
      const disarm = (): void => {
        clearTimeout(timeoutId)
        unregister()
      }

      this.timeouts.set(operationId, timeoutId)

      let pending: Promise<T>
      try {
        pending = operation()
      } catch (error) {
        // Synchronous throw: make sure the timer does not stay armed/registered
        disarm()
        reject(error)
        return
      }
      pending.then(
        result => {
          disarm()
          resolve(result)
        },
        error => {
          disarm()
          reject(error)
        }
      )
    })
  }

  /**
   * Disarms and unregisters the timer stored under `operationId`, if any.
   * The operation itself keeps running, now without a deadline.
   */
  clearTimeout(operationId: string): void {
    const timeoutId = this.timeouts.get(operationId)
    if (timeoutId) {
      clearTimeout(timeoutId)
      this.timeouts.delete(operationId)
    }
  }

  /** Disarms every registered timer and empties the registry. */
  cancelAll(): void {
    for (const timeoutId of this.timeouts.values()) {
      clearTimeout(timeoutId)
    }
    this.timeouts.clear()
  }

  /** Number of timers currently registered. */
  getActiveTimeoutsCount(): number {
    return this.timeouts.size
  }
}
// Fault Tolerance Orchestrator
/**
 * Combines the resilience components per service: for each configured service
 * it owns an optional circuit breaker, bulkhead and retry mechanism, and it
 * shares one timeout manager plus the injected health monitor and failover
 * manager.
 */
class FaultToleranceOrchestrator {
  private circuitBreakers: Map<string, CircuitBreaker> = new Map()
  private bulkheads: Map<string, Bulkhead> = new Map()
  private retryMechanisms: Map<string, RetryMechanism> = new Map()
  private timeoutManager = new TimeoutManager()

  constructor(
    private services: Array<{
      id: string
      endpoint: string
      circuitBreakerConfig?: CircuitBreakerConfig
      bulkheadConfig?: { maxConcurrentCalls: number; maxQueueSize: number }
      retryConfig?: RetryConfig
    }>,
    private healthMonitor: HealthMonitor,
    private failoverManager: FailoverManager
  ) {
    this.initializeComponents()
  }

  /** Instantiates, per service, only the components it has a config for. */
  private initializeComponents(): void {
    this.services.forEach(({ id, circuitBreakerConfig, bulkheadConfig, retryConfig }) => {
      if (circuitBreakerConfig) {
        const breaker = new CircuitBreaker(id, circuitBreakerConfig, this.healthMonitor)
        this.circuitBreakers.set(id, breaker)
      }
      if (bulkheadConfig) {
        const { maxConcurrentCalls, maxQueueSize } = bulkheadConfig
        this.bulkheads.set(id, new Bulkhead(id, maxConcurrentCalls, maxQueueSize))
      }
      if (retryConfig) {
        this.retryMechanisms.set(id, new RetryMechanism(retryConfig))
      }
    })
  }

  /**
   * Runs `operation` for a service through every protection layer configured
   * for it. From innermost to outermost the layers are: bulkhead, retry,
   * circuit breaker, timeout (if `timeoutMs` is set) and failover (if
   * `useFailover` is set).
   *
   * @param serviceId - Id of a service passed to the constructor.
   * @param operation - The call to protect.
   * @param options - Optional overall timeout and failover switch.
   * @returns The operation's result.
   * @throws Error(`Service <id> not found`) for an unknown service, or whatever
   *   the layers / the operation reject with.
   */
  async executeServiceCall<T>(
    serviceId: string,
    operation: () => Promise<T>,
    options: {
      timeoutMs?: number
      useFailover?: boolean
    } = {}
  ): Promise<T> {
    const isKnownService = this.services.some(s => s.id === serviceId)
    if (!isKnownService) {
      throw new Error(`Service ${serviceId} not found`)
    }

    // Each layer wraps the call built so far; later entries end up further out.
    type Call = () => Promise<T>
    const layers: Array<(inner: Call) => Call> = []

    const bulkhead = this.bulkheads.get(serviceId)
    if (bulkhead) {
      layers.push(inner => () => bulkhead.execute(inner))
    }
    const retryMechanism = this.retryMechanisms.get(serviceId)
    if (retryMechanism) {
      layers.push(inner => () => retryMechanism.execute(inner))
    }
    const circuitBreaker = this.circuitBreakers.get(serviceId)
    if (circuitBreaker) {
      layers.push(inner => () => circuitBreaker.execute(inner))
    }
    if (options.timeoutMs) {
      layers.push(inner => () => this.timeoutManager.executeWithTimeout(inner, options.timeoutMs!))
    }

    const guardedCall = layers.reduce<Call>((inner, wrap) => wrap(inner), operation)

    if (!options.useFailover) {
      return guardedCall()
    }
    // The failover manager decides when and how often to invoke the guarded call
    return this.failoverManager.executeWithFailover(
      () => guardedCall(),
      `service call to ${serviceId}`
    )
  }

  /**
   * Aggregated view of every service's breaker state, bulkhead metrics and
   * health record, plus the failover configuration. Overall health is
   * UNHEALTHY if any service is UNHEALTHY, otherwise DEGRADED if any is
   * DEGRADED, otherwise HEALTHY.
   */
  getSystemHealth(): {
    services: Array<{
      id: string
      circuitBreakerState?: any
      bulkheadMetrics?: any
      health: ServiceHealth | null
    }>
    failoverConfig: any
    overallHealth: 'HEALTHY' | 'DEGRADED' | 'UNHEALTHY'
  } {
    const services = this.services.map(({ id }) => ({
      id,
      circuitBreakerState: this.circuitBreakers.get(id)?.getState(),
      bulkheadMetrics: this.bulkheads.get(id)?.getMetrics(),
      health: this.healthMonitor.getServiceHealth(id)
    }))

    const hasStatus = (status: 'UNHEALTHY' | 'DEGRADED'): boolean =>
      services.some(s => s.health?.status === status)

    let overallHealth: 'HEALTHY' | 'DEGRADED' | 'UNHEALTHY'
    if (hasStatus('UNHEALTHY')) {
      overallHealth = 'UNHEALTHY'
    } else if (hasStatus('DEGRADED')) {
      overallHealth = 'DEGRADED'
    } else {
      overallHealth = 'HEALTHY'
    }

    return {
      services,
      failoverConfig: this.failoverManager.getCurrentConfiguration(),
      overallHealth
    }
  }
}
// Usage Example
/**
 * Console walkthrough of the fault-tolerance components in this file: system
 * health, circuit breaker, retry, bulkhead and failover.
 *
 * Uses randomized failures and real waits (several seconds in total), so the
 * output differs between runs.
 *
 * NOTE: the HealthMonitor created here schedules a repeating interval that this
 * demo never cancels, so a Node.js process running it stays alive afterwards.
 */
async function faultToleranceExample() {
  // Catch-clause values are `unknown` under strict settings: narrow before reading `.message`
  const describeError = (error: unknown): string =>
    error instanceof Error ? error.message : String(error)

  console.log('=== Fault Tolerance Mechanisms ===\n')

  // Initialize services
  const services = [
    {
      id: 'user-service',
      endpoint: 'http://user-service:8080',
      circuitBreakerConfig: {
        failureThreshold: 5,
        timeoutMs: 5000,
        halfOpenMaxCalls: 3,
        resetTimeoutMs: 30000
      },
      bulkheadConfig: {
        maxConcurrentCalls: 10,
        maxQueueSize: 50
      },
      retryConfig: {
        maxAttempts: 3,
        strategy: RetryStrategy.EXPONENTIAL_BACKOFF,
        baseDelayMs: 1000,
        maxDelayMs: 10000,
        multiplier: 2
      }
    },
    {
      id: 'order-service',
      endpoint: 'http://order-service:8080',
      circuitBreakerConfig: {
        failureThreshold: 3,
        timeoutMs: 8000,
        halfOpenMaxCalls: 2,
        resetTimeoutMs: 60000
      },
      retryConfig: {
        maxAttempts: 2,
        strategy: RetryStrategy.FIXED_DELAY,
        baseDelayMs: 2000,
        maxDelayMs: 5000
      }
    },
    {
      id: 'payment-service',
      endpoint: 'http://payment-service:8080',
      circuitBreakerConfig: {
        failureThreshold: 2,
        timeoutMs: 3000,
        halfOpenMaxCalls: 1,
        resetTimeoutMs: 15000
      }
    }
  ]

  // Initialize nodes for failover
  const nodes = [
    { id: 'primary-db', endpoint: 'http://primary-db:5432', region: 'us-east-1', priority: 10 },
    { id: 'secondary-db', endpoint: 'http://secondary-db:5432', region: 'us-west-1', priority: 8 },
    { id: 'backup-db', endpoint: 'http://backup-db:5432', region: 'eu-west-1', priority: 6 }
  ]

  // Initialize components. The failover nodes are registered with the health
  // monitor as well: the failover manager consults it for standby health, and
  // nodes without a record would never be picked as a failover target.
  const healthMonitor = new HealthMonitor([...services, ...nodes])
  const failoverManager = new FailoverManager(nodes, FailoverStrategy.ACTIVE_PASSIVE, healthMonitor)
  const orchestrator = new FaultToleranceOrchestrator(services, healthMonitor, failoverManager)

  console.log('1. Initial System Health')
  // Short pause before the first report. Periodic health checks only run every
  // 30 seconds, so the statuses shown here are still the initial ones.
  await new Promise(resolve => setTimeout(resolve, 2000))
  let systemHealth = orchestrator.getSystemHealth()
  console.log(`Overall system health: ${systemHealth.overallHealth}`)
  console.log('Service health:')
  systemHealth.services.forEach(service => {
    console.log(` ${service.id}: ${service.health?.status || 'UNKNOWN'}`)
  })

  console.log('\n2. Testing Circuit Breaker')
  // Simulate service failures to trigger circuit breaker
  console.log('Simulating user-service failures...')
  for (let i = 0; i < 7; i++) {
    try {
      await orchestrator.executeServiceCall('user-service', async () => {
        // 70% failure rate
        if (Math.random() > 0.3) {
          throw new Error('Service temporarily unavailable')
        }
        return 'User data'
      }, { timeoutMs: 1000 })
      console.log(` Attempt ${i + 1}: SUCCESS`)
    } catch (error) {
      console.log(` Attempt ${i + 1}: FAILED - ${describeError(error)}`)
    }
  }

  // Check circuit breaker state through the public health report instead of
  // reaching into the orchestrator's private map
  const cbState = orchestrator
    .getSystemHealth()
    .services.find(service => service.id === 'user-service')?.circuitBreakerState
  if (cbState) {
    console.log(`\nCircuit breaker state: ${cbState.state} (failures: ${cbState.failureCount})`)
  }

  console.log('\n3. Testing Retry Mechanism')
  // Test different retry strategies
  const retryConfig = {
    maxAttempts: 4,
    strategy: RetryStrategy.EXPONENTIAL_BACKOFF,
    baseDelayMs: 500,
    maxDelayMs: 5000,
    multiplier: 2
  }
  const retryMechanism = new RetryMechanism(retryConfig)
  try {
    const result = await retryMechanism.execute(async () => {
      // 50% success rate
      if (Math.random() > 0.5) {
        throw new Error('Random failure')
      }
      return 'Operation succeeded'
    })
    console.log(`Retry mechanism result: ${result}`)
  } catch (error) {
    console.log(`Retry mechanism failed: ${describeError(error)}`)
  }

  console.log('\n4. Testing Bulkhead Pattern')
  const bulkhead = new Bulkhead('test-bulkhead', 3, 5)
  // Submit concurrent requests; rejections are turned into their message text
  const promises: Promise<string>[] = []
  for (let i = 0; i < 8; i++) {
    promises.push(
      bulkhead.execute(async () => {
        await new Promise(resolve => setTimeout(resolve, 1000))
        return `Request ${i} completed`
      }).catch((error: unknown) => describeError(error))
    )
  }
  const bulkheadResults = await Promise.all(promises)
  console.log('Bulkhead results:')
  bulkheadResults.forEach((result, index) => {
    console.log(` Request ${index}: ${result}`)
  })
  const bulkheadMetrics = bulkhead.getMetrics()
  console.log(`Bulkhead metrics: ${JSON.stringify(bulkheadMetrics)}`)

  console.log('\n5. Testing Failover')
  // Simulate database failover
  try {
    await orchestrator.executeServiceCall('user-service', async () => {
      return await failoverManager.executeWithFailover(async (nodeId) => {
        console.log(` Executing database operation on ${nodeId}`)
        // Simulate primary failure
        if (nodeId === 'primary-db' && Math.random() > 0.3) {
          throw new Error('Primary database failed')
        }
        await new Promise(resolve => setTimeout(resolve, 200))
        return `Data from ${nodeId}`
      }, 'database query')
    }, { useFailover: true })
  } catch (error) {
    console.log(`Failover test failed: ${describeError(error)}`)
  }
  const failoverHistory = failoverManager.getFailoverHistory()
  if (failoverHistory.length > 0) {
    console.log('\nFailover history:')
    failoverHistory.forEach(failover => {
      console.log(` ${failover.timestamp.toISOString()}: ${failover.fromNode} -> ${failover.toNode} (${failover.reason})`)
    })
  }

  console.log('\n6. Final System Health Check')
  // Wait for some time to see health changes
  await new Promise(resolve => setTimeout(resolve, 3000))
  systemHealth = orchestrator.getSystemHealth()
  console.log(`Final overall health: ${systemHealth.overallHealth}`)
  console.log('\nService details:')
  systemHealth.services.forEach(service => {
    console.log(`\n${service.id}:`)
    console.log(` Health: ${service.health?.status}`)
    console.log(` Response time: ${service.health?.responseTime}ms`)
    // errorRate is a 0..1 fraction; scale it before printing it as a percentage
    console.log(` Error rate: ${((service.health?.errorRate || 0) * 100).toFixed(2)}%`)
    if (service.circuitBreakerState) {
      console.log(` Circuit breaker: ${service.circuitBreakerState.state}`)
    }
    if (service.bulkheadMetrics) {
      // The concurrency limit comes from the service's own bulkhead configuration
      const maxConcurrency = services.find(s => s.id === service.id)?.bulkheadConfig?.maxConcurrentCalls
      console.log(` Bulkhead: ${service.bulkheadMetrics.currentConcurrency}/${maxConcurrency}`)
    }
  })
  console.log('\n=== Fault Tolerance Demo Complete ===')
}
// Export list of the fault-tolerance sample: the three strategy/state enums,
// the ServiceHealth record type, every resilience component and the runnable
// `faultToleranceExample` demo. CircuitBreakerConfig and RetryConfig are not
// part of this list.
export {
CircuitState,
RetryStrategy,
FailoverStrategy,
ServiceHealth,
CircuitBreaker,
RetryMechanism,
HealthMonitor,
FailoverManager,
Bulkhead,
TimeoutManager,
FaultToleranceOrchestrator,
faultToleranceExample
}