🎯 empfohlene Sammlungen
Balanced sample collections from various categories for you to explore
QUIC-Protokoll-Beispiele
Umfassende QUIC-Protokoll-Beispiele, einschließlich HTTP/3-Implementierung, zuverlässigem UDP-Transport, Verbindungsmigration und Performance-Optimierung
💻 HTTP/3 Server Implementierung
🔴 complex
⭐⭐⭐⭐⭐
Vollständiger HTTP/3-Server mit QUIC-Transport, TLS 1.3 und modernen Web-Features
⏱️ 40 min
🏷️ http3, quic, tls, server, web-server, api
// HTTP/3 Server Implementation with QUIC Transport
// Complete server setup with TLS 1.3 and HTTP/3 features
const { createServer } = require('http3');
const fs = require('fs');
const path = require('path');
class HTTP3Server {
constructor(options = {}) {
this.options = {
port: options.port || 4433,
host: options.host || '0.0.0.0',
key: options.key || fs.readFileSync(path.join(__dirname, 'server.key')),
cert: options.cert || fs.readFileSync(path.join(__dirname, 'server.crt')),
ca: options.ca || fs.readFileSync(path.join(__dirname, 'ca.crt')),
maxStreams: options.maxStreams || 100,
idleTimeout: options.idleTimeout || 30000,
...options
};
this.server = null;
this.streamHandlers = new Map();
this.middleware = [];
}
start() {
this.server = createServer({
key: this.options.key,
cert: this.options.cert,
ca: this.options.ca,
alpn: 'h3',
settings: {
maxConcurrentStreams: this.options.maxStreams,
idleTimeout: this.options.idleTimeout,
maxStreamData: 1024 * 1024, // 1MB
maxData: 10 * 1024 * 1024, // 10MB
initialMaxData: 1024 * 1024
}
});
// Setup request handling
this.server.on('request', (req, res) => {
this.handleRequest(req, res);
});
// Setup stream handling
this.server.on('stream', (stream, headers) => {
this.handleStream(stream, headers);
});
// Setup connection handling
this.server.on('connection', (connection) => {
this.handleConnection(connection);
});
// Setup error handling
this.server.on('error', (error) => {
console.error('HTTP/3 server error:', error);
});
// Start listening
this.server.listen(this.options.port, this.options.host, () => {
console.log(`HTTP/3 server listening on https://${this.options.host}:${this.options.port}`);
});
}
async handleRequest(req, res) {
try {
// Apply middleware
for (const middleware of this.middleware) {
const result = await middleware(req, res);
if (result === false) return; // Middleware handled the response
}
// Route handling
const route = this.matchRoute(req.method, req.url);
if (route) {
await route.handler(req, res);
} else {
// Static file serving
await this.serveStaticFile(req, res);
}
} catch (error) {
console.error('Request handling error:', error);
res.statusCode = 500;
res.end('Internal Server Error');
}
}
handleStream(stream, headers) {
const streamId = headers[':path'] || 'unknown';
console.log(`New HTTP/3 stream: ${streamId}`);
stream.on('data', (chunk) => {
console.log(`Received data on stream ${streamId}:`, chunk.length);
});
stream.on('end', () => {
console.log(`Stream ${streamId} ended`);
});
stream.on('error', (error) => {
console.error(`Stream ${streamId} error:`, error);
});
// Send response
stream.write('HTTP/3 Stream Response\n');
stream.end();
}
handleConnection(connection) {
console.log('New HTTP/3 connection established');
connection.on('close', () => {
console.log('HTTP/3 connection closed');
});
connection.on('error', (error) => {
console.error('HTTP/3 connection error:', error);
});
// Connection metrics
this.updateConnectionMetrics(connection);
}
// Route system
get(path, handler) {
this.addRoute('GET', path, handler);
}
post(path, handler) {
this.addRoute('POST', path, handler);
}
put(path, handler) {
this.addRoute('PUT', path, handler);
}
delete(path, handler) {
this.addRoute('DELETE', path, handler);
}
addRoute(method, path, handler) {
const key = `${method}:${path}`;
this.streamHandlers.set(key, { method, path, handler });
}
matchRoute(method, url) {
const path = new URL(url, `http://${this.options.host}`).pathname;
const key = `${method}:${path}`;
return this.streamHandlers.get(key);
}
// Middleware system
use(middleware) {
this.middleware.push(middleware);
}
// Static file serving
async serveStaticFile(req, res) {
const url = new URL(req.url, `http://${this.options.host}`);
let filePath = url.pathname;
// Default to index.html
if (filePath === '/') {
filePath = '/index.html';
}
// Security: prevent directory traversal
filePath = path.normalize(filePath).replace(/^\.\./, '');
const fullPath = path.join(__dirname, 'public', filePath);
try {
const stats = await fs.promises.stat(fullPath);
if (stats.isDirectory()) {
filePath = path.join(filePath, 'index.html');
}
const data = await fs.promises.readFile(fullPath);
const ext = path.extname(fullPath);
// Set appropriate content type
const contentType = this.getContentType(ext);
res.setHeader('Content-Type', contentType);
// Enable HTTP/3 specific features
res.setHeader('Cache-Control', 'public, max-age=3600');
res.setHeader('Vary', 'Accept-Encoding');
// HTTP/3 Early Hints
if (ext === '.html') {
res.writeEarlyHints({
link: '</style.css>; rel=preload; as=style',
link: '</script.js>; rel=preload; as=script'
});
}
res.end(data);
} catch (error) {
if (error.code === 'ENOENT') {
res.statusCode = 404;
res.end('File Not Found');
} else {
console.error('Static file error:', error);
res.statusCode = 500;
res.end('Internal Server Error');
}
}
}
getContentType(ext) {
const contentTypes = {
'.html': 'text/html',
'.css': 'text/css',
'.js': 'application/javascript',
'.json': 'application/json',
'.png': 'image/png',
'.jpg': 'image/jpeg',
'.gif': 'image/gif',
'.svg': 'image/svg+xml',
'.ico': 'image/x-icon',
'.wasm': 'application/wasm'
};
return contentTypes[ext] || 'application/octet-stream';
}
// HTTP/3 specific features
updateConnectionMetrics(connection) {
const metrics = {
remoteAddress: connection.remoteAddress,
state: connection.state,
version: connection.version,
alpn: connection.alpn
};
console.log('Connection metrics:', metrics);
}
// Push resources (HTTP/3 equivalent)
pushResources(res, resources) {
for (const resource of resources) {
res.push(resource);
}
}
stop() {
if (this.server) {
this.server.close();
console.log('HTTP/3 server stopped');
}
}
}
// Usage example with HTTP/3 features
const server = new HTTP3Server({
port: 4433,
maxStreams: 200,
idleTimeout: 60000
});
// Add middleware for logging
server.use((req, res) => {
console.log(`${new Date().toISOString()} - ${req.method} ${req.url}`);
});
// Add routes
server.get('/api/time', (req, res) => {
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify({
time: new Date().toISOString(),
protocol: 'HTTP/3',
version: '1.0'
}));
});
server.post('/api/echo', async (req, res) => {
const chunks = [];
req.on('data', chunk => chunks.push(chunk));
req.on('end', () => {
const body = Buffer.concat(chunks);
res.setHeader('Content-Type', 'application/json');
res.end(JSON.stringify({
echo: body.toString(),
length: body.length,
timestamp: Date.now()
}));
});
});
// WebSocket alternative with HTTP/3
server.get('/ws', (req, res) => {
// HTTP/3 doesn't have native WebSocket, but we can simulate
// with unidirectional streams
console.log('WebSocket-like connection request');
res.writeHead(200, {
'Content-Type': 'text/plain',
'Protocol': 'websocket-like'
});
// Stream-based communication
setInterval(() => {
res.write(`data: ${JSON.stringify({ time: Date.now() })}\n\n`);
}, 1000);
});
// Server-Sent Events with HTTP/3
server.get('/events', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
const sendEvent = (event, data) => {
res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
};
// Send periodic events
const interval = setInterval(() => {
sendEvent('ping', { time: Date.now() });
}, 5000);
req.on('close', () => {
clearInterval(interval);
});
});
// Start server
server.start();
// Graceful shutdown
process.on('SIGINT', () => {
console.log('Shutting down HTTP/3 server...');
server.stop();
process.exit(0);
});
💻 Zuverlässiger UDP-Transport QUIC
🔴 complex
⭐⭐⭐⭐⭐
QUIC-basierter zuverlässiger UDP-Transport mit Stau- und Flusskontrolle
⏱️ 50 min
🏷️ quic, udp, reliable-transport, congestion-control, flow-control
// QUIC Reliable Transport over UDP
// Implementation with congestion control, flow control, and reliability
const dgram = require('dgram');
const crypto = require('crypto');
class QUICConnection {
constructor(socket, remoteAddress, remotePort, isClient = false) {
this.socket = socket;
this.remoteAddress = remoteAddress;
this.remotePort = remotePort;
this.isClient = isClient;
// QUIC connection state
this.connectionId = crypto.randomBytes(16);
this.state = 'idle'; // idle, handshake, connected, closing, closed
this.version = 1;
// Streams management
this.streams = new Map(); // streamId -> QUICStream
this.nextStreamId = isClient ? 0 : 1;
this.maxStreamId = 2 ** 60 - 4;
// Flow control
this.flowControlWindow = 1024 * 1024; // 1MB
this.maxStreamData = 256 * 1024; // 256KB per stream
this.maxData = 10 * 1024 * 1024; // 10MB total
// Congestion control
this.congestionControl = new CubicCongestionControl();
// Buffers
this.sendBuffer = [];
this.receiveBuffer = new Map(); // packetNumber -> buffer
this.ackedPackets = new Set();
// Encryption
this.keys = null;
this.iv = crypto.randomBytes(12);
// Timers
this.ackTimer = null;
this.retransmissionTimer = null;
this.idleTimer = null;
this.setupPacketHandling();
}
setupPacketHandling() {
this.packetNumber = 0;
this.largestAckedPacket = 0;
}
async connect() {
this.state = 'handshake';
// Send initial packet
const initialPacket = this.createPacket({
type: 'initial',
version: this.version,
connectionId: this.connectionId,
packetNumber: ++this.packetNumber,
payload: this.createCryptoHandshake()
});
await this.sendPacket(initialPacket);
this.startRetransmissionTimer();
}
createPacket(options) {
const packet = {
header: {
type: options.type,
version: options.version,
connectionId: options.connectionId,
packetNumber: options.packetNumber
},
payload: options.payload || Buffer.alloc(0)
};
// Add QUIC frame headers
if (options.frames) {
packet.frames = options.frames;
}
return packet;
}
createCryptoHandshake() {
// Simplified crypto handshake
const handshake = {
crypto: {
algorithms: ['AES-256-GCM', 'ChaCha20-Poly1305'],
hash: 'SHA-256',
keyExchange: 'X25519'
},
parameters: {
maxIdleTimeout: 30000,
maxPacketSize: 1200,
initialMaxData: this.maxData,
initialMaxStreamData: this.maxStreamData
}
};
return Buffer.from(JSON.stringify(handshake));
}
async sendPacket(packet) {
try {
const encryptedPacket = await this.encryptPacket(packet);
const packetBuffer = this.serializePacket(encryptedPacket);
this.socket.send(packetBuffer, this.remotePort, this.remoteAddress);
console.log(`Sent QUIC packet ${packet.packetNumber} to ${this.remoteAddress}:${this.remotePort}`);
} catch (error) {
console.error('Error sending packet:', error);
}
}
async encryptPacket(packet) {
if (!this.keys) {
return packet; // Not yet encrypted
}
// Implement AEAD encryption (simplified)
const plaintext = Buffer.from(JSON.stringify(packet));
const aad = this.createAdditionalAuthenticatedData(packet);
// Use crypto.createCipheriv for actual implementation
// This is a simplified version
return {
encrypted: plaintext,
aad: aad
};
}
createAdditionalAuthenticatedData(packet) {
const header = packet.header;
return Buffer.from(`${header.type}|${header.version}|${header.packetNumber}`);
}
serializePacket(packet) {
// Simplified packet serialization
const header = Buffer.alloc(16);
header.writeUInt8(0x01, 0); // Long header
header.writeUInt8(this.version, 1);
header.writeUInt8(0x01, 2); // Initial packet type
connectionId.copy(header, 4);
const payload = packet.payload || Buffer.alloc(0);
return Buffer.concat([header, payload]);
}
handleReceivedPacket(packetBuffer) {
try {
const packet = this.deserializePacket(packetBuffer);
if (this.largestAckedPacket < packet.packetNumber) {
this.largestAckedPacket = packet.packetNumber;
}
switch (packet.type) {
case 'initial':
this.handleInitialPacket(packet);
break;
case 'handshake':
this.handleHandshakePacket(packet);
break;
case '1-RTT':
this.handleProtectedPacket(packet);
break;
default:
console.log(`Unknown packet type: ${packet.type}`);
}
// Send acknowledgment
this.sendAck();
} catch (error) {
console.error('Error handling packet:', error);
}
}
deserializePacket(buffer) {
const header = {
type: '1-RTT',
version: 1,
packetNumber: 0,
connectionId: buffer.slice(4, 20)
};
return {
header: header,
type: header.type,
version: header.version,
packetNumber: header.packetNumber,
connectionId: header.connectionId,
payload: buffer.slice(20)
};
}
handleInitialPacket(packet) {
console.log('Handling initial packet');
if (this.state === 'idle' && this.isClient) {
// Server responding to client
this.processHandshake(packet.payload);
}
}
handleHandshakePacket(packet) {
console.log('Handling handshake packet');
this.processHandshake(packet.payload);
}
handleProtectedPacket(packet) {
console.log('Handling protected packet');
this.processFrames(packet.payload);
}
processHandshake(payload) {
try {
const handshake = JSON.parse(payload.toString());
console.log('Handshake parameters:', handshake);
// Setup encryption keys (simplified)
this.setupEncryption(handshake.crypto);
// Update flow control parameters
this.flowControlWindow = handshake.parameters.initialMaxData;
this.maxStreamData = handshake.parameters.initialMaxStreamData;
this.state = 'connected';
console.log('QUIC connection established');
} catch (error) {
console.error('Error processing handshake:', error);
}
}
processFrames(payload) {
// Process QUIC frames (STREAM, ACK, PING, etc.)
const frames = this.parseFrames(payload);
for (const frame of frames) {
switch (frame.type) {
case 'STREAM':
this.handleStreamFrame(frame);
break;
case 'ACK':
this.handleAckFrame(frame);
break;
case 'PING':
this.handlePingFrame(frame);
break;
case 'CONNECTION_CLOSE':
this.handleConnectionCloseFrame(frame);
break;
}
}
}
parseFrames(payload) {
const frames = [];
let offset = 0;
while (offset < payload.length) {
const frameType = payload[offset++];
switch (frameType) {
case 0x00: // STREAM frame
const streamFrame = this.parseStreamFrame(payload, offset);
frames.push(streamFrame);
offset += streamFrame.length;
break;
case 0x02: // ACK frame
const ackFrame = this.parseAckFrame(payload, offset);
frames.push(ackFrame);
offset += ackFrame.length;
break;
case 0x01: // PING frame
frames.push({ type: 'PING', length: 1 });
break;
default:
// Skip unknown frame types
offset = payload.length;
}
}
return frames;
}
parseStreamFrame(buffer, offset) {
const streamId = this.readVariableLengthInteger(buffer, offset);
offset += streamId.length;
const offsetData = this.readVariableLengthInteger(buffer, offset);
offset += offsetData.length;
const length = this.readVariableLengthInteger(buffer, offset);
offset += length.length;
const data = buffer.slice(offset, offset + length.value);
return {
type: 'STREAM',
streamId: streamId.value,
offset: offsetData.value,
length: length.value,
data: data,
fin: buffer[offset + length.value] === 0x01
};
}
parseAckFrame(buffer, offset) {
const largestAcked = this.readVariableLengthInteger(buffer, offset);
offset += largestAcked.length;
const ackDelay = this.readVariableLengthInteger(buffer, offset);
offset += ackDelay.length;
return {
type: 'ACK',
largestAcked: largestAcked.value,
ackDelay: ackDelay.value,
length: largestAcked.length + ackDelay.length + 1
};
}
readVariableLengthInteger(buffer, offset) {
const first = buffer[offset];
let value = first & 0x3f;
let length = 1;
if (first & 0xc0) {
const additionalBytes = Math.floor((first & 0xc0) >> 6);
length += additionalBytes;
for (let i = 1; i <= additionalBytes; i++) {
value = (value << 8) | buffer[offset + i];
}
}
return { value, length };
}
handleStreamFrame(frame) {
const streamId = frame.streamId;
if (!this.streams.has(streamId)) {
this.streams.set(streamId, new QUICStream(streamId, this));
}
const stream = this.streams.get(streamId);
stream.handleData(frame.offset, frame.data, frame.fin);
}
handleAckFrame(frame) {
console.log(`Received ACK for packet ${frame.largestAcked}`);
// Update congestion control
this.congestionControl.onPacketAcked(frame.largestAcked);
// Clear retransmission timer
if (this.retransmissionTimer) {
clearTimeout(this.retransmissionTimer);
this.retransmissionTimer = null;
}
}
handlePingFrame(frame) {
console.log('Received PING frame');
this.sendPing();
}
handleConnectionCloseFrame(frame) {
console.log('Connection closed by peer');
this.close();
}
// Stream management
createStream() {
const streamId = this.nextStreamId;
this.nextStreamId += 4; // Only client or server initiated streams
const stream = new QUICStream(streamId, this);
this.streams.set(streamId, stream);
return stream;
}
getStream(streamId) {
return this.streams.get(streamId);
}
// Congestion control
async sendData(streamId, data, fin = false) {
const stream = this.streams.get(streamId);
if (!stream) {
throw new Error(`Stream ${streamId} not found`);
}
// Check flow control
if (stream.bytesSent + data.length > this.maxStreamData) {
throw new Error('Flow control limit exceeded');
}
// Check congestion control
if (!this.congestionControl.canSend(data.length)) {
// Buffer data until we can send
stream.bufferData(data);
return;
}
// Create stream frame
const frame = {
type: 'STREAM',
streamId: streamId,
data: data,
fin: fin
};
// Send packet with stream frame
const packet = this.createPacket({
type: '1-RTT',
packetNumber: ++this.packetNumber,
frames: [frame]
});
await this.sendPacket(packet);
// Update congestion control
this.congestionControl.onPacketSent(data.length);
// Start retransmission timer
this.startRetransmissionTimer();
console.log(`Sent ${data.length} bytes on stream ${streamId}`);
}
sendAck() {
const ackFrame = {
type: 'ACK',
largestAcked: this.largestAckedPacket,
ackDelay: 0,
ackRanges: [[this.largestAckedPacket, this.largestAckedPacket]]
};
const packet = this.createPacket({
type: '1-RTT',
packetNumber: ++this.packetNumber,
frames: [ackFrame]
});
this.sendPacket(packet);
}
sendPing() {
const pingFrame = { type: 'PING' };
const packet = this.createPacket({
type: '1-RTT',
packetNumber: ++this.packetNumber,
frames: [pingFrame]
});
this.sendPacket(packet);
}
startRetransmissionTimer() {
if (this.retransmissionTimer) {
clearTimeout(this.retransmissionTimer);
}
this.retransmissionTimer = setTimeout(() => {
this.handleRetransmissionTimeout();
}, this.congestionControl.getRetransmissionTimeout());
}
handleRetransmissionTimeout() {
console.log('Retransmission timeout');
this.congestionControl.onRetransmissionTimeout();
// Retransmit unacknowledged packets
this.retransmitUnackedPackets();
}
retransmitUnackedPackets() {
// Implement packet retransmission logic
console.log('Retransmitting unacked packets');
}
setupEncryption(cryptoParams) {
// Simplified encryption setup
console.log('Setting up encryption with:', cryptoParams);
// In real implementation, derive keys from TLS handshake
}
close() {
this.state = 'closing';
// Send connection close frame
const closeFrame = {
type: 'CONNECTION_CLOSE',
errorCode: 0,
reason: 'Connection closed'
};
const packet = this.createPacket({
type: '1-RTT',
packetNumber: ++this.packetNumber,
frames: [closeFrame]
});
this.sendPacket(packet);
// Clean up timers
if (this.retransmissionTimer) {
clearTimeout(this.retransmissionTimer);
}
this.state = 'closed';
}
}
class QUICStream {
constructor(streamId, connection) {
this.id = streamId;
this.connection = connection;
this.state = 'idle'; // idle, open, closed
this.buffer = [];
this.bytesReceived = 0;
this.bytesSent = 0;
this.flowControlWindow = 64 * 1024; // 64KB
this.maxData = 256 * 1024; // 256KB
}
handleData(offset, data, fin) {
this.bytesReceived += data.length;
// Add data to buffer at correct offset
this.buffer.push({ offset, data });
this.buffer.sort((a, b) => a.offset - b.offset);
if (fin) {
this.state = 'closed';
}
// Trigger data available event
this.onDataAvailable();
}
onDataAvailable() {
// Emit data to application
const data = this.getBufferedData();
if (data) {
console.log(`Stream ${this.id} received ${data.length} bytes`);
this.emit('data', data);
}
}
getBufferedData() {
// Return contiguous buffered data
let data = Buffer.alloc(0);
let expectedOffset = 0;
for (const chunk of this.buffer) {
if (chunk.offset === expectedOffset) {
data = Buffer.concat([data, chunk.data]);
expectedOffset += chunk.data.length;
} else {
// Gap in data, wait for more
break;
}
}
return data.length > 0 ? data : null;
}
async write(data) {
if (this.state === 'closed') {
throw new Error('Stream is closed');
}
await this.connection.sendData(this.id, data, false);
this.bytesSent += data.length;
}
async end(data = null) {
if (data) {
await this.write(data);
}
await this.connection.sendData(this.id, Buffer.alloc(0), true);
this.state = 'closed';
}
emit(event, data) {
// Simplified event emission
if (this.listeners && this.listeners[event]) {
this.listeners[event].forEach(callback => callback(data));
}
}
on(event, callback) {
if (!this.listeners) {
this.listeners = {};
}
if (!this.listeners[event]) {
this.listeners[event] = [];
}
this.listeners[event].push(callback);
}
bufferData(data) {
this.buffer.push({ offset: this.bytesSent, data });
}
}
class CubicCongestionControl {
constructor() {
this.cwnd = 10 * 1024; // Initial congestion window
this.ssthresh = Infinity; // Slow start threshold
this.minCwnd = 2 * 1024; // Minimum congestion window
this.maxCwnd = 10 * 1024 * 1024; // Maximum congestion window
this.reno = 0; // Reno recovery
this.lastMaxCwnd = 0;
this.rtt = 100; // Initial RTT estimate in ms
this.rttVar = 50; // RTT variance
this.rto = 1000; // Retransmission timeout
this.packetsInFlight = 0;
this.bytesInFlight = 0;
this.endOfRecovery = null;
}
canSend(bytes) {
return this.bytesInFlight + bytes <= this.cwnd;
}
onPacketSent(bytes) {
this.packetsInFlight++;
this.bytesInFlight += bytes;
}
onPacketAcked(packetNumber) {
this.packetsInFlight--;
if (this.endOfRecovery && packetNumber < this.endOfRecovery) {
// Still in recovery
return;
}
// Update congestion window based on current phase
if (this.cwnd <= this.ssthresh) {
// Slow start
this.cwnd += Math.min(bytes, 1500); // Acked bytes or 1 MSS
} else {
// Congestion avoidance (CUBIC)
const timeSinceLastMaxCwnd = Date.now() - this.lastMaxCwndTime;
const k = Math.cbrt(this.lastMaxCwnd / 0.4); // CUBIC constant
const w = this.lastMaxCwnd + 0.4 * Math.pow(timeSinceLastMaxCwnd / 1000 - k, 3);
this.cwnd = Math.min(w, this.maxCwnd);
}
this.endOfRecovery = null;
this.bytesInFlight = Math.max(0, this.bytesInFlight - 1500);
}
onPacketLost() {
console.log('Packet lost, entering congestion recovery');
this.lastMaxCwnd = Math.max(this.cwnd, this.lastMaxCwnd);
this.lastMaxCwndTime = Date.now();
// Reduce congestion window
this.ssthresh = this.cwnd / 2;
this.cwnd = this.cwnd / 2;
this.cwnd = Math.max(this.cwnd, this.minCwnd);
this.endOfRecovery = Date.now() + this.rtt;
}
onRetransmissionTimeout() {
console.log('Retransmission timeout');
this.ssthresh = this.cwnd / 2;
this.cwnd = this.minCwnd;
// Reset RTT estimates
this.rto = Math.min(this.rto * 2, 60000); // Exponential backoff
}
updateRTT(newRTT) {
if (this.rtt === 0) {
this.rtt = newRTT;
this.rttVar = newRTT / 2;
} else {
this.rttVar = (3 * this.rttVar + Math.abs(newRTT - this.rtt)) / 4;
this.rtt = (7 * this.rtt + newRTT) / 8;
}
this.rto = this.rtt + 4 * this.rttVar;
this.rto = Math.min(Math.max(this.rto, 200), 60000); // Clamp between 200ms and 60s
}
getRetransmissionTimeout() {
return this.rto;
}
}
// Server implementation
class QUICServer {
constructor(port = 0) {
this.port = port;
this.socket = null;
this.connections = new Map();
this.connectionId = 0;
}
start() {
this.socket = dgram.createSocket('udp4');
this.socket.on('message', (msg, rinfo) => {
this.handlePacket(msg, rinfo);
});
this.socket.on('listening', () => {
const address = this.socket.address();
console.log(`QUIC server listening on ${address.address}:${address.port}`);
});
this.socket.bind(this.port);
}
handlePacket(buffer, rinfo) {
const connectionKey = `${rinfo.address}:${rinfo.port}`;
if (!this.connections.has(connectionKey)) {
console.log(`New connection from ${connectionKey}`);
const connection = new QUICConnection(
this.socket,
rinfo.address,
rinfo.port,
false // Server connection
);
this.connections.set(connectionKey, connection);
// Clean up on connection close
connection.on('close', () => {
this.connections.delete(connectionKey);
console.log(`Connection ${connectionKey} closed`);
});
}
const connection = this.connections.get(connectionKey);
connection.handleReceivedPacket(buffer);
}
stop() {
if (this.socket) {
this.socket.close();
console.log('QUIC server stopped');
}
}
}
// Usage example
const server = new QUICServer(4433);
server.start();
console.log('QUIC Reliable Transport Server started');
// Client example
const clientSocket = dgram.createSocket('udp4');
const client = new QUICConnection(clientSocket, '127.0.0.1', 4433, true);
client.on('connected', () => {
console.log('QUIC connection established');
// Create a stream and send data
const stream = client.createStream();
stream.write('Hello, QUIC!');
stream.end();
});
client.on('data', (data) => {
console.log('Received data:', data.toString());
});
// Start connection
client.connect();
// Handle incoming packets
clientSocket.on('message', (msg, rinfo) => {
if (rinfo.address === '127.0.0.1' && rinfo.port === 4433) {
client.handleReceivedPacket(msg);
}
});
clientSocket.bind(); // Bind to random port
process.on('SIGINT', () => {
server.stop();
clientSocket.close();
process.exit(0);
});
💻 QUIC-Verbindungsmigration
🔴 complex
⭐⭐⭐⭐⭐
Implementierung der QUIC-Verbindungsmigration zwischen verschiedenen Netzwerk-Interfaces
⏱️ 45 min
🏷️ connection-migration, network-interfaces, path-validation, seamless-handover
// QUIC Connection Migration Implementation
// Seamless migration between network interfaces and IP addresses
const dgram = require('dgram');
const os = require('os');
const EventEmitter = require('events');
class QUICConnectionMigrator extends EventEmitter {
constructor(quicConnection) {
super();
this.quicConnection = quicConnection;
// Migration state
this.activeConnections = new Map(); // interface -> socket
this.primaryInterface = null;
this.connectionId = quicConnection.connectionId;
// Network interfaces
this.interfaces = new Map(); // name -> interface info
this.monitorInterval = null;
// Path management
this.paths = new Map(); // pathId -> path info
this.activePath = null;
this.pathValidation = new Map(); // pathId -> validation state
// Migration policies
this.migrationPolicy = 'automatic'; // automatic, manual, disabled
this.migrationThreshold = {
packetLoss: 0.1, // 10% packet loss triggers migration
latency: 500, // 500ms latency increase triggers migration
bandwidth: 100 // 100KB/s bandwidth drop triggers migration
};
this.initializeInterfaces();
this.startMonitoring();
}
initializeInterfaces() {
const networkInterfaces = os.networkInterfaces();
for (const [name, interfaces] of Object.entries(networkInterfaces)) {
for (const iface of interfaces) {
if (iface.family === 'IPv4' && !iface.internal) {
this.interfaces.set(name, {
name: name,
address: iface.address,
netmask: iface.netmask,
mac: iface.mac,
family: iface.family,
internal: iface.internal
});
// Create socket for each interface
this.createSocketForInterface(name, iface.address);
}
}
}
console.log('Available network interfaces:', Array.from(this.interfaces.keys()));
}
createSocketForInterface(interfaceName, address) {
try {
const socket = dgram.createSocket('udp4');
socket.bind({ address: address, port: 0 });
socket.on('message', (msg, rinfo) => {
this.handlePacket(msg, rinfo, interfaceName);
});
socket.on('error', (error) => {
console.error(`Socket error for interface ${interfaceName}:`, error);
this.emit('socketError', { interfaceName, error });
});
this.activeConnections.set(interfaceName, socket);
// Set as primary if this is the first interface
if (!this.primaryInterface) {
this.primaryInterface = interfaceName;
console.log(`Primary interface set to: ${interfaceName} (${address})`);
}
} catch (error) {
console.error(`Failed to create socket for interface ${interfaceName}:`, error);
}
}
startMonitoring() {
this.monitorInterval = setInterval(() => {
this.monitorInterfaces();
this.analyzePerformance();
this.checkMigrationConditions();
}, 1000); // Monitor every second
}
monitorInterfaces() {
const currentInterfaces = os.networkInterfaces();
const activeInterfaceNames = new Set();
for (const [name, interfaces] of Object.entries(currentInterfaces)) {
for (const iface of interfaces) {
if (iface.family === 'IPv4' && !iface.internal) {
activeInterfaceNames.add(name);
if (!this.interfaces.has(name)) {
console.log(`New interface detected: ${name}`);
this.emit('interfaceAdded', { name, interface: iface });
}
}
}
}
// Check for removed interfaces
for (const [name, iface] of this.interfaces.entries()) {
if (!activeInterfaceNames.has(name)) {
console.log(`Interface removed: ${name}`);
this.interfaces.delete(name);
const socket = this.activeConnections.get(name);
if (socket) {
socket.close();
this.activeConnections.delete(name);
}
this.emit('interfaceRemoved', { name, interface: iface });
// Migrate if primary interface was removed
if (this.primaryInterface === name) {
this.selectNewPrimaryInterface();
}
}
}
}
analyzePerformance() {
for (const [interfaceName, path] of this.paths.entries()) {
const metrics = this.calculatePathMetrics(path);
// Update path metrics
path.metrics = {
...path.metrics,
...metrics,
lastUpdate: Date.now()
};
// Emit performance events
this.emit('pathMetrics', {
interfaceName,
metrics
});
}
}
calculatePathMetrics(path) {
const now = Date.now();
const recentPackets = path.packets.filter(
p => now - p.timestamp < 5000 // Last 5 seconds
);
const metrics = {
packetLoss: 0,
latency: 0,
bandwidth: 0,
jitter: 0
};
if (recentPackets.length === 0) {
return metrics;
}
// Calculate packet loss
const lostPackets = recentPackets.filter(p => p.lost).length;
metrics.packetLoss = lostPackets / recentPackets.length;
// Calculate latency and jitter
const rttPackets = recentPackets.filter(p => p.rtt);
if (rttPackets.length > 0) {
const rtts = rttPackets.map(p => p.rtt);
metrics.latency = rtts.reduce((sum, rtt) => sum + rtt, 0) / rtts.length;
// Calculate jitter (RTT variance)
const avgRtt = metrics.latency;
metrics.jitter = Math.sqrt(
rtts.reduce((sum, rtt) => sum + Math.pow(rtt - avgRtt, 2), 0) / rtts.length
);
}
// Calculate bandwidth
const timeWindow = 5000; // 5 seconds
const recentBytes = recentPackets.reduce((sum, p) => sum + (p.size || 0), 0);
metrics.bandwidth = (recentBytes * 8) / (timeWindow / 1000); // bits per second
return metrics;
}
checkMigrationConditions() {
if (this.migrationPolicy === 'disabled') {
return;
}
if (this.migrationPolicy === 'manual') {
this.emit('migrationCheck', this.getMigrationRecommendations());
return;
}
// Automatic migration
const recommendations = this.getMigrationRecommendations();
for (const rec of recommendations) {
if (rec.severity === 'critical' || rec.severity === 'high') {
console.log(`Triggering automatic migration: ${rec.reason}`);
this.migrateToInterface(rec.targetInterface);
break;
}
}
}
getMigrationRecommendations() {
const recommendations = [];
for (const [interfaceName, path] of this.paths.entries()) {
if (!path.metrics) continue;
const metrics = path.metrics;
// Check packet loss
if (metrics.packetLoss > this.migrationThreshold.packetLoss) {
recommendations.push({
sourceInterface: interfaceName,
targetInterface: this.findBestAlternativeInterface(interfaceName),
reason: `High packet loss: ${(metrics.packetLoss * 100).toFixed(2)}%`,
severity: metrics.packetLoss > 0.2 ? 'critical' : 'high',
metrics
});
}
// Check latency
if (metrics.latency > this.migrationThreshold.latency) {
recommendations.push({
sourceInterface: interfaceName,
targetInterface: this.findBestAlternativeInterface(interfaceName),
reason: `High latency: ${metrics.latency.toFixed(2)}ms`,
severity: metrics.latency > 1000 ? 'critical' : 'medium',
metrics
});
}
// Check bandwidth
if (metrics.bandwidth < this.migrationThreshold.bandwidth * 1024) {
recommendations.push({
sourceInterface: interfaceName,
targetInterface: this.findBestAlternativeInterface(interfaceName),
reason: `Low bandwidth: ${(metrics.bandwidth / 1024).toFixed(2)}KB/s`,
severity: metrics.bandwidth < 50 * 1024 ? 'high' : 'low',
metrics
});
}
}
return recommendations.sort((a, b) => {
const severityOrder = { critical: 3, high: 2, medium: 1, low: 0 };
return severityOrder[b.severity] - severityOrder[a.severity];
});
}
findBestAlternativeInterface(currentInterface) {
const alternatives = Array.from(this.interfaces.keys())
.filter(name => name !== currentInterface);
// Score each alternative interface
const scored = alternatives.map(name => {
const path = this.paths.get(name);
let score = 100;
if (path && path.metrics) {
// Penalize poor performance
score -= path.metrics.packetLoss * 100;
score -= path.metrics.latency / 10;
score += path.metrics.bandwidth / 1000;
} else {
// Unknown interface, give neutral score
score = 50;
}
return { name, score };
});
// Return the best scoring interface
scored.sort((a, b) => b.score - a.score);
return scored[0]?.name;
}
async migrateToInterface(targetInterface) {
if (!this.interfaces.has(targetInterface)) {
throw new Error(`Target interface ${targetInterface} not available`);
}
if (targetInterface === this.primaryInterface) {
console.log('Already using target interface');
return;
}
console.log(`Migrating from ${this.primaryInterface} to ${targetInterface}`);
try {
// Start migration process
this.emit('migrationStart', {
from: this.primaryInterface,
to: targetInterface
});
// Create new path
const newSocket = this.activeConnections.get(targetInterface);
const oldSocket = this.activeConnections.get(this.primaryInterface);
// Send PATH_CHALLENGE to validate new path
await this.validatePath(targetInterface);
// Update QUIC connection to use new path
await this.updateConnectionPath(targetInterface);
// Gracefully switch to new socket
const success = await this.switchConnectionSockets(oldSocket, newSocket);
if (success) {
// Clean up old socket
this.primaryInterface = targetInterface;
console.log(`Successfully migrated to ${targetInterface}`);
this.emit('migrationComplete', {
from: oldSocket.address,
to: newSocket.address,
success: true
});
} else {
throw new Error('Socket switch failed');
}
} catch (error) {
console.error(`Migration to ${targetInterface} failed:`, error);
this.emit('migrationFailed', {
from: this.primaryInterface,
to: targetInterface,
error: error.message
});
}
}
async validatePath(interfaceName) {
return new Promise((resolve, reject) => {
const socket = this.activeConnections.get(interfaceName);
if (!socket) {
reject(new Error(`Socket not available for interface ${interfaceName}`));
return;
}
const pathId = this.generatePathId();
const challenge = crypto.randomBytes(8);
// Send PATH_CHALLENGE
const packet = this.createPathChallengePacket(pathId, challenge);
socket.send(packet, this.quicConnection.remotePort, this.quicConnection.remoteAddress);
// Wait for PATH_RESPONSE
const timeout = setTimeout(() => {
reject(new Error('Path validation timeout'));
}, 5000);
const onResponse = (msg, rinfo) => {
// Check if this is the PATH_RESPONSE
if (this.isPathResponse(msg, pathId, challenge)) {
clearTimeout(timeout);
socket.off('message', onResponse);
resolve(true);
}
};
socket.on('message', onResponse);
// Store validation state
this.pathValidation.set(pathId, {
interfaceName,
challenge,
startTime: Date.now()
});
});
}
generatePathId() {
return Math.floor(Math.random() * 2 ** 32);
}
createPathChallengePacket(pathId, challenge) {
// Create PATH_CHALLENGE frame (simplified)
const frame = Buffer.alloc(9);
frame.writeUInt8(0x1A, 0); // PATH_CHALLENGE frame type
frame.writeUInt32BE(pathId, 1);
challenge.copy(frame, 5);
return frame;
}
isPathResponse(packet, pathId, challenge) {
// Simplified PATH_RESPONSE check
return packet.length >= 9 &&
packet.readUInt8(0) === 0x1B && // PATH_RESPONSE frame type
packet.readUInt32BE(1) === pathId;
}
async updateConnectionPath(interfaceName) {
// Update QUIC connection to use new path
const interfaceInfo = this.interfaces.get(interfaceName);
// This would integrate with the actual QUIC implementation
// to update the connection's local address
console.log(`Updating connection path to ${interfaceName} (${interfaceInfo.address})`);
// Record the new path
const pathId = this.generatePathId();
this.paths.set(pathId, {
interfaceName,
address: interfaceInfo.address,
packets: [],
created: Date.now()
});
this.activePath = pathId;
}
async switchConnectionSockets(oldSocket, newSocket) {
// Implement seamless socket switching
return new Promise((resolve) => {
// Buffer packets on old socket
const buffer = [];
const oldPacketHandler = (msg, rinfo) => {
buffer.push({ msg, rinfo });
};
oldSocket.on('message', oldPacketHandler);
// Small delay to ensure no packets are lost
setTimeout(() => {
oldSocket.off('message', oldPacketHandler);
// Replay buffered packets on new socket
buffer.forEach(({ msg, rinfo }) => {
this.quicConnection.handleReceivedPacket(msg, rinfo);
});
resolve(true);
}, 100);
});
}
selectNewPrimaryInterface() {
const availableInterfaces = Array.from(this.interfaces.keys());
if (availableInterfaces.length === 0) {
console.log('No network interfaces available');
this.emit('noInterfacesAvailable');
return;
}
// Select the first available interface
const newPrimary = availableInterfaces[0];
this.primaryInterface = newPrimary;
console.log(`New primary interface: ${newPrimary}`);
this.emit('primaryInterfaceChanged', { newInterface: newPrimary });
}
handlePacket(packet, rinfo, interfaceName) {
// Record packet metrics for path analysis
this.recordPacketMetrics(interfaceName, packet);
// Check if this packet is for our connection
if (this.shouldHandlePacket(packet, interfaceName)) {
this.quicConnection.handleReceivedPacket(packet);
}
}
shouldHandlePacket(packet, interfaceName) {
// Simplified check - in real implementation, verify connection ID
return interfaceName === this.primaryInterface || this.paths.size > 0;
}
recordPacketMetrics(interfaceName, packet) {
if (!this.paths.has(interfaceName)) {
return;
}
const path = this.paths.get(interfaceName);
path.packets.push({
timestamp: Date.now(),
size: packet.length,
lost: false
});
// Clean up old packets (older than 1 minute)
const cutoff = Date.now() - 60000;
path.packets = path.packets.filter(p => p.timestamp > cutoff);
}
// Public API
setMigrationPolicy(policy) {
this.migrationPolicy = policy;
console.log(`Migration policy set to: ${policy}`);
}
setMigrationThresholds(thresholds) {
this.migrationThreshold = { ...this.migrationThreshold, ...thresholds };
console.log('Migration thresholds updated:', this.migrationThreshold);
}
getInterfaceStatus() {
return Array.from(this.interfaces.entries()).map(([name, info]) => ({
name,
address: info.address,
isPrimary: name === this.primaryInterface,
hasConnection: this.activeConnections.has(name)
}));
}
getPathMetrics() {
const metrics = {};
for (const [pathId, path] of this.paths.entries()) {
metrics[pathId] = {
interfaceName: path.interfaceName,
address: path.address,
metrics: path.metrics,
created: path.created
};
}
return metrics;
}
stop() {
if (this.monitorInterval) {
clearInterval(this.monitorInterval);
this.monitorInterval = null;
}
// Close all sockets
for (const socket of this.activeConnections.values()) {
socket.close();
}
this.activeConnections.clear();
console.log('Connection migration stopped');
}
}
// Usage example with existing QUIC connection
class MigratableQUICConnection {
constructor() {
this.baseConnection = null; // Your existing QUICConnection
this.migrator = null;
}
async connect(serverAddress, serverPort) {
// Create base QUIC connection
this.baseConnection = new QUICConnection(/* ... */);
// Add migration support
this.migrator = new QUICConnectionMigrator(this.baseConnection);
// Listen for migration events
this.migrator.on('migrationStart', (info) => {
console.log('Connection migration starting:', info);
});
this.migrator.on('migrationComplete', (info) => {
console.log('Connection migration complete:', info);
});
this.migrator.on('migrationFailed', (info) => {
console.error('Connection migration failed:', info);
});
// Establish initial connection
await this.baseConnection.connect();
}
// Migration control
enableAutoMigration() {
this.migrator.setMigrationPolicy('automatic');
}
disableMigration() {
this.migrator.setMigrationPolicy('disabled');
}
setCustomMigrationThresholds(thresholds) {
this.migrator.setMigrationThresholds(thresholds);
}
async manualMigration(targetInterface) {
return await this.migrator.migrateToInterface(targetInterface);
}
getConnectionStatus() {
return {
interfaces: this.migrator.getInterfaceStatus(),
paths: this.migrator.getPathMetrics(),
primaryInterface: this.migrator.primaryInterface
};
}
close() {
if (this.migrator) {
this.migrator.stop();
}
if (this.baseConnection) {
this.baseConnection.close();
}
}
}
// Example usage
async function demonstrateMigration() {
const connection = new MigratableQUICConnection();
// Configure migration
connection.enableAutoMigration();
connection.setCustomMigrationThresholds({
packetLoss: 0.05, // 5% packet loss
latency: 300, // 300ms latency
bandwidth: 200 // 200KB/s minimum
});
// Connect to server
await connection.connect('example.com', 4433);
// Monitor connection status
setInterval(() => {
const status = connection.getConnectionStatus();
console.log('Connection status:', status);
}, 5000);
// Manual migration example
// await connection.manualMigration('wlan0');
// Graceful shutdown
setTimeout(() => {
connection.close();
}, 60000);
}
// Network change detection
class NetworkChangeDetector {
constructor(migrator) {
this.migrator = migrator;
this.currentNetworkState = null;
this.changeThreshold = 3; // Require 3 consecutive changes
this.changeCount = 0;
}
startDetection() {
setInterval(() => {
this.detectNetworkChanges();
}, 2000);
}
detectNetworkChanges() {
const currentState = this.captureNetworkState();
if (this.currentNetworkState &&
!this.compareNetworkStates(this.currentNetworkState, currentState)) {
this.changeCount++;
if (this.changeCount >= this.changeThreshold) {
console.log('Network change detected, triggering migration');
this.migrator.emit('networkChanged', {
from: this.currentNetworkState,
to: currentState
});
this.changeCount = 0;
}
} else {
this.changeCount = 0;
}
this.currentNetworkState = currentState;
}
captureNetworkState() {
const interfaces = os.networkInterfaces();
const state = {
timestamp: Date.now(),
interfaces: []
};
for (const [name, ifaces] of Object.entries(interfaces)) {
for (const iface of ifaces) {
if (iface.family === 'IPv4' && !iface.internal) {
state.interfaces.push({
name,
address: iface.address,
mac: iface.mac
});
}
}
}
return state;
}
compareNetworkStates(state1, state2) {
if (state1.interfaces.length !== state2.interfaces.length) {
return false;
}
const sorted1 = state1.interfaces.sort((a, b) => a.name.localeCompare(b.name));
const sorted2 = state2.interfaces.sort((a, b) => a.name.localeCompare(b.name));
return JSON.stringify(sorted1) === JSON.stringify(sorted2);
}
}
// Start demonstration
demonstrateMigration().catch(console.error);
💻 QUIC Performance-Optimierung
🔴 complex
⭐⭐⭐⭐⭐
Erweitertes QUIC-Performance-Tuning mit Zero-Copy I/O und Kernel-Bypass
⏱️ 55 min
🏷️ performance, optimization, zero-copy, memory-pool, kernel-bypass
// QUIC Performance Optimization
// Zero-copy I/O, memory pools, kernel bypass, and advanced tuning
const dgram = require('dgram');
const fs = require('fs');
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const EventEmitter = require('events');
class QUICPerformanceOptimizer {
constructor(options = {}) {
this.options = {
// Memory management
bufferSize: options.bufferSize || 64 * 1024, // 64KB
poolSize: options.poolSize || 100,
maxPoolSize: options.maxPoolSize || 1000,
// Zero-copy settings
enableZeroCopy: options.enableZeroCopy !== false,
enableKernelBypass: options.enableKernelBypass || false,
// Threading
enableWorkers: options.enableWorkers || true,
workerCount: options.workerCount || require('os').cpus().length,
// Tuning parameters
mtuDiscovery: options.mtuDiscovery !== false,
adaptiveCongestion: options.adaptiveCongestion !== false,
packetCoalescing: options.packetCoalescing !== false,
...options
};
// Memory pool management
this.bufferPool = new BufferPool(this.options.bufferSize, this.options.poolSize);
this.packetPool = new PacketPool(this.options.maxPoolSize);
// Worker management
this.workers = [];
this.workQueue = [];
this.workerIndex = 0;
// Performance metrics
this.metrics = new QUICMetrics();
this.initialize();
}
initialize() {
// Initialize buffer pools
this.bufferPool.initialize();
this.packetPool.initialize();
// Initialize workers if enabled
if (this.options.enableWorkers) {
this.initializeWorkers();
}
// Setup performance monitoring
this.setupMonitoring();
}
initializeWorkers() {
console.log(`Initializing ${this.options.workerCount} QUIC workers...`);
for (let i = 0; i < this.options.workerCount; i++) {
const worker = new Worker(__filename, {
workerData: {
type: 'worker',
workerId: i,
bufferSize: this.options.bufferSize
}
});
worker.on('message', (msg) => {
this.handleWorkerMessage(msg, i);
});
worker.on('error', (error) => {
console.error(`Worker ${i} error:`, error);
this.metrics.recordWorkerError(i);
});
worker.on('exit', (code) => {
console.log(`Worker ${i} exited with code ${code}`);
if (code !== 0) {
this.restartWorker(i);
}
});
this.workers.push(worker);
}
}
restartWorker(index) {
console.log(`Restarting worker ${index}`);
const worker = new Worker(__filename, {
workerData: {
type: 'worker',
workerId: index,
bufferSize: this.options.bufferSize
}
});
worker.on('message', (msg) => {
this.handleWorkerMessage(msg, index);
});
this.workers[index] = worker;
}
setupMonitoring() {
// Monitor memory usage
setInterval(() => {
const memUsage = process.memoryUsage();
this.metrics.updateMemoryUsage(memUsage);
}, 1000);
// Monitor performance metrics
setInterval(() => {
this.metrics.logPerformanceReport();
}, 5000);
}
// Zero-copy packet processing
async processPacketZeroCopy(socket, buffer, rinfo, callback) {
if (!this.options.enableZeroCopy) {
// Fallback to regular processing
return await this.processPacketRegular(socket, buffer, rinfo, callback);
}
const startTime = process.hrtime.bigint();
try {
// Use buffer pool for temporary storage
const pooledBuffer = this.bufferPool.acquire();
// Zero-copy: directly use the received buffer
await this.processPacketInWorker(buffer, rinfo, pooledBuffer);
// Return buffer to pool
this.bufferPool.release(pooledBuffer);
// Record metrics
const endTime = process.hrtime.bigint();
const processingTime = Number(endTime - startTime) / 1000000; // Convert to ms
this.metrics.recordPacketProcessing(processingTime, buffer.length);
} catch (error) {
console.error('Zero-copy packet processing error:', error);
throw error;
}
}
async processPacketRegular(socket, buffer, rinfo, callback) {
// Regular packet processing with copying
const packetCopy = Buffer.from(buffer);
return await callback(packetCopy, rinfo);
}
async processPacketInWorker(buffer, rinfo, outputBuffer) {
return new Promise((resolve, reject) => {
if (this.options.enableWorkers && this.workers.length > 0) {
// Distribute to workers
const worker = this.workers[this.workerIndex];
this.workerIndex = (this.workerIndex + 1) % this.workers.length;
const timeout = setTimeout(() => {
reject(new Error('Worker timeout'));
}, 1000);
worker.once('message', (result) => {
clearTimeout(timeout);
if (result.error) {
reject(new Error(result.error));
} else {
// Copy result to output buffer (zero-copy alternative would be shared memory)
result.data.copy(outputBuffer);
resolve();
}
});
worker.send({
type: 'processPacket',
buffer: buffer,
rinfo: rinfo
});
} else {
// Process in main thread
this.processPacketInline(buffer, rinfo, outputBuffer);
resolve();
}
});
}
processPacketInline(buffer, rinfo, outputBuffer) {
// Inline packet processing
const packetHeader = this.parseQUICHeader(buffer);
// Process QUIC frames
const frames = this.parseQUICFrames(buffer.slice(packetHeader.length));
// Generate response (if needed)
const response = this.generateResponse(frames);
// Copy to output buffer
response.copy(outputBuffer);
}
parseQUICHeader(buffer) {
// Simplified QUIC header parsing
if (buffer.length < 1) return 0;
const firstByte = buffer.readUInt8(0);
let headerLength = 1;
// Long header format
if (firstByte & 0x80) {
headerLength += 4; // Version
headerLength += 1; // Destination Connection ID length
headerLength += 16; // Destination Connection ID
headerLength += 1; // Source Connection ID length
// Source Connection ID length would be read here
} else {
// Short header format
headerLength += 1; // Destination Connection ID
}
return headerLength;
}
parseQUICFrames(data) {
const frames = [];
let offset = 0;
while (offset < data.length) {
const frameType = data[offset++];
let frameLength = 1;
switch (frameType) {
case 0x00: // STREAM frame
const streamInfo = this.parseStreamFrameInfo(data, offset);
frameLength += streamInfo.length;
frames.push({
type: 'STREAM',
streamId: streamInfo.streamId,
data: data.slice(offset, offset + streamInfo.dataLength)
});
offset += streamInfo.dataLength;
break;
case 0x02: // ACK frame
const ackInfo = this.parseAckFrameInfo(data, offset);
frameLength += ackInfo.length;
frames.push({
type: 'ACK',
ranges: ackInfo.ranges
});
offset += ackInfo.length;
break;
default:
// Unknown frame type, skip
break;
}
offset += frameLength;
}
return frames;
}
parseStreamFrameInfo(buffer, offset) {
const streamId = this.readVariableLengthInteger(buffer, offset);
offset += streamId.length;
const offsetData = this.readVariableLengthInteger(buffer, offset);
offset += offsetData.length;
const length = this.readVariableLengthInteger(buffer, offset);
offset += length.length;
return {
streamId: streamId.value,
dataLength: length.value,
length: streamId.length + offsetData.length + length.length
};
}
parseAckFrameInfo(buffer, offset) {
const largestAcked = this.readVariableLengthInteger(buffer, offset);
offset += largestAcked.length;
const ackDelay = this.readVariableLengthInteger(buffer, offset);
offset += ackDelay.length;
// Simplified: single ACK range
return {
ranges: [[largestAcked.value, largestAcked.value]],
length: largestAcked.length + ackDelay.length
};
}
readVariableLengthInteger(buffer, offset) {
const first = buffer[offset];
let value = first & 0x3f;
let length = 1;
if (first & 0xc0) {
const additionalBytes = Math.floor((first & 0xc0) >> 6);
length += additionalBytes;
for (let i = 1; i <= additionalBytes; i++) {
value = (value << 8) | buffer[offset + i];
}
}
return { value, length };
}
generateResponse(frames) {
// Generate response based on frames
const response = Buffer.alloc(1024);
let offset = 0;
// Add ACK frame if needed
if (frames.some(f => f.type === 'STREAM')) {
offset += this.writeAckFrame(response, offset);
}
// Add response data
response.write('HTTP/3 Response', offset);
offset += 16;
return response.slice(0, offset);
}
writeAckFrame(buffer, offset) {
buffer.writeUInt8(0x02, offset++); // ACK frame type
buffer.writeUInt32BE(1, offset); // Largest ACKed packet number
offset += 4;
buffer.writeUInt32BE(0, offset); // ACK delay
offset += 4;
return 9; // ACK frame length
}
handleWorkerMessage(message, workerId) {
switch (message.type) {
case 'packetProcessed':
this.metrics.recordWorkerPacketProcessing(workerId, message.processingTime);
break;
case 'error':
console.error(`Worker ${workerId} error:`, message.error);
break;
default:
console.log(`Unknown worker message from ${workerId}:`, message);
}
}
// Kernel bypass implementation (Linux-specific)
setupKernelBypass() {
if (!this.options.enableKernelBypass || process.platform !== 'linux') {
return false;
}
try {
// Setup raw socket for kernel bypass
this.rawSocket = require('raw-socket').createSocket({
protocol: require('raw-socket').Protocol.UDP,
address: '0.0.0.0'
});
// Configure socket for high performance
const sock = this.rawSocket.fd;
// Set socket buffer sizes
this.setSocketBufferSize(sock, 1024 * 1024); // 1MB
this.setSocketTimeout(sock, 0); // No timeout
// Enable SO_REUSEPORT
this.setSocketOption(sock, 'SO_REUSEPORT', 1);
console.log('Kernel bypass enabled');
return true;
} catch (error) {
console.error('Failed to enable kernel bypass:', error);
return false;
}
}
setSocketBufferSize(fd, size) {
const SOL_SOCKET = 1;
const SO_RCVBUF = 8;
const SO_SNDBUF = 7;
this.setSocketOption(fd, SO_RCVBUF, size);
this.setSocketOption(fd, SO_SNDBUF, size);
}
setSocketOption(fd, option, value) {
// Would use native bindings for setsockopt
// This is a simplified implementation
console.log(`Setting socket option ${option} to ${value}`);
}
// MTU Discovery
async discoverMTU(targetAddress, targetPort) {
if (!this.options.mtuDiscovery) {
return 1500; // Default Ethernet MTU
}
let mtu = 1500;
let step = 64; // Start with 64 byte increments
while (step > 1) {
const testPacket = Buffer.alloc(mtu);
try {
// Send test packet
await this.sendTestPacket(testPacket, targetAddress, targetPort);
mtu += step;
} catch (error) {
// Packet too large, step back
mtu -= step;
step = Math.floor(step / 2);
}
}
console.log(`Discovered MTU: ${mtu} bytes`);
return mtu;
}
async sendTestPacket(packet, address, port) {
return new Promise((resolve, reject) => {
const socket = dgram.createSocket('udp4');
socket.send(packet, port, address, (error) => {
socket.close();
if (error) {
reject(error);
} else {
resolve();
}
});
});
}
// Adaptive Congestion Control
setupAdaptiveCongestion() {
if (!this.options.adaptiveCongestion) {
return;
}
this.congestionController = new AdaptiveCongestionControl({
initialCwnd: 10 * 1024,
minCwnd: 2 * 1024,
maxCwnd: 100 * 1024 * 1024, // 100MB
rttTarget: 50, // 50ms target RTT
lossThreshold: 0.01 // 1% packet loss threshold
});
}
// Packet Coalescing
coalescePackets(packets) {
if (!this.options.packetCoalescing || packets.length <= 1) {
return packets;
}
const coalesced = Buffer.alloc(packets.reduce((sum, p) => sum + p.length, 0));
let offset = 0;
for (const packet of packets) {
packet.copy(coalesced, offset);
offset += packet.length;
}
console.log(`Coalesced ${packets.length} packets into ${coalesced.length} bytes`);
return [coalesced];
}
getPerformanceMetrics() {
return {
memory: {
bufferPoolUsage: this.bufferPool.getUsage(),
packetPoolUsage: this.packetPool.getUsage(),
processMemory: process.memoryUsage()
},
workers: {
count: this.workers.length,
utilization: this.metrics.getWorkerUtilization()
},
network: this.metrics.getNetworkMetrics(),
processing: this.metrics.getProcessingMetrics()
};
}
shutdown() {
console.log('Shutting down QUIC performance optimizer...');
// Shutdown workers
for (const worker of this.workers) {
worker.terminate();
}
// Clean up pools
this.bufferPool.cleanup();
this.packetPool.cleanup();
// Close raw socket
if (this.rawSocket) {
this.rawSocket.close();
}
}
}
// Buffer Pool Implementation
class BufferPool {
constructor(bufferSize, poolSize) {
this.bufferSize = bufferSize;
this.poolSize = poolSize;
this.available = [];
this.inUse = new Set();
this.totalCreated = 0;
this.hitCount = 0;
this.missCount = 0;
}
initialize() {
// Pre-allocate buffers
for (let i = 0; i < this.poolSize; i++) {
this.available.push(Buffer.allocUnsafe(this.bufferSize));
this.totalCreated++;
}
console.log(`Buffer pool initialized with ${this.poolSize} buffers of size ${this.bufferSize}`);
}
acquire() {
if (this.available.length > 0) {
const buffer = this.available.pop();
this.inUse.add(buffer);
this.hitCount++;
return buffer;
}
// Pool exhausted, create new buffer
const buffer = Buffer.allocUnsafe(this.bufferSize);
this.inUse.add(buffer);
this.totalCreated++;
this.missCount++;
console.log(`Buffer pool miss, created new buffer. Total: ${this.totalCreated}`);
return buffer;
}
release(buffer) {
if (this.inUse.has(buffer)) {
this.inUse.delete(buffer);
// Zero out buffer for security
buffer.fill(0);
// Only return to pool if we haven't exceeded max size
if (this.available.length < this.poolSize) {
this.available.push(buffer);
}
}
}
getUsage() {
return {
available: this.available.length,
inUse: this.inUse.size,
total: this.totalCreated,
hitRate: this.hitCount / (this.hitCount + this.missCount),
missRate: this.missCount / (this.hitCount + this.missCount)
};
}
cleanup() {
this.available = [];
this.inUse.clear();
console.log('Buffer pool cleaned up');
}
}
// Packet Pool Implementation
class PacketPool {
constructor(maxSize) {
this.maxSize = maxSize;
this.packets = [];
this.usage = new Map();
}
initialize() {
// Packet pool is dynamically sized
console.log(`Packet pool initialized with max size ${this.maxSize}`);
}
acquire() {
if (this.packets.length > 0) {
const packet = this.packets.pop();
packet.lastUsed = Date.now();
this.usage.set(packet, Date.now());
return packet;
}
// Create new packet
const packet = {
data: Buffer.allocUnsafe(1500), // MTU size
lastUsed: Date.now(),
created: Date.now()
};
this.usage.set(packet, Date.now());
return packet;
}
release(packet) {
if (this.usage.has(packet)) {
this.usage.delete(packet);
if (this.packets.length < this.maxSize) {
this.packets.push(packet);
}
}
}
getUsage() {
return {
available: this.packets.length,
inUse: this.usage.size,
maxSize: this.maxSize
};
}
cleanup() {
this.packets = [];
this.usage.clear();
console.log('Packet pool cleaned up');
}
}
// Metrics Collection
class QUICMetrics {
constructor() {
this.startTime = Date.now();
this.packetsProcessed = 0;
this.totalProcessingTime = 0;
this.workerMetrics = new Map();
this.networkMetrics = {
bytesSent: 0,
bytesReceived: 0,
packetsSent: 0,
packetsReceived: 0
};
}
recordPacketProcessing(processingTime, packetSize) {
this.packetsProcessed++;
this.totalProcessingTime += processingTime;
this.networkMetrics.bytesReceived += packetSize;
this.networkMetrics.packetsReceived++;
}
recordWorkerPacketProcessing(workerId, processingTime) {
if (!this.workerMetrics.has(workerId)) {
this.workerMetrics.set(workerId, {
packetsProcessed: 0,
totalProcessingTime: 0,
errors: 0
});
}
const metrics = this.workerMetrics.get(workerId);
metrics.packetsProcessed++;
metrics.totalProcessingTime += processingTime;
}
recordWorkerError(workerId) {
if (!this.workerMetrics.has(workerId)) {
this.workerMetrics.set(workerId, {
packetsProcessed: 0,
totalProcessingTime: 0,
errors: 0
});
}
this.workerMetrics.get(workerId).errors++;
}
updateMemoryUsage(memUsage) {
this.memoryUsage = memUsage;
}
getWorkerUtilization() {
const utilization = {};
for (const [workerId, metrics] of this.workerMetrics.entries()) {
const avgTime = metrics.packetsProcessed > 0
? metrics.totalProcessingTime / metrics.packetsProcessed
: 0;
utilization[workerId] = {
packetsProcessed: metrics.packetsProcessed,
averageProcessingTime: avgTime,
errors: metrics.errors,
errorRate: metrics.errors / (metrics.packetsProcessed + metrics.errors)
};
}
return utilization;
}
getNetworkMetrics() {
const uptime = Date.now() - this.startTime;
return {
...this.networkMetrics,
uptime: uptime,
bytesPerSecond: (this.networkMetrics.bytesReceived / uptime) * 1000,
packetsPerSecond: (this.networkMetrics.packetsReceived / uptime) * 1000
};
}
getProcessingMetrics() {
return {
totalPackets: this.packetsProcessed,
averageProcessingTime: this.packetsProcessed > 0
? this.totalProcessingTime / this.packetsProcessed
: 0,
packetsPerSecond: this.packetsProcessed / ((Date.now() - this.startTime) / 1000)
};
}
logPerformanceReport() {
const networkMetrics = this.getNetworkMetrics();
const processingMetrics = this.getProcessingMetrics();
console.log('=== QUIC Performance Report ===');
console.log(`Uptime: ${(Date.now() - this.startTime) / 1000}s`);
console.log(`Network: ${networkMetrics.bytesPerSecond.toFixed(2)} B/s, ${networkMetrics.packetsPerSecond.toFixed(2)} p/s`);
console.log(`Processing: ${processingMetrics.averageProcessingTime.toFixed(2)}ms avg, ${processingMetrics.packetsPerSecond.toFixed(2)} p/s`);
if (this.memoryUsage) {
console.log(`Memory: ${(this.memoryUsage.heapUsed / 1024 / 1024).toFixed(2)}MB heap`);
}
console.log('=====================================');
}
}
// Worker thread code
if (!isMainThread && workerData.type === 'worker') {
// Worker thread implementation
parentPort.on('message', async (message) => {
switch (message.type) {
case 'processPacket':
try {
const startTime = process.hrtime.bigint();
// Process packet in worker
const result = await processPacketInWorker(message.buffer, message.rinfo);
const endTime = process.hrtime.bigint();
const processingTime = Number(endTime - startTime) / 1000000;
parentPort.postMessage({
type: 'packetProcessed',
processingTime: processingTime,
result: result
});
} catch (error) {
parentPort.postMessage({
type: 'error',
error: error.message
});
}
break;
default:
parentPort.postMessage({
type: 'error',
error: `Unknown worker message type: ${message.type}`
});
}
});
async function processPacketInWorker(buffer, rinfo) {
// Simulate packet processing
await new Promise(resolve => setTimeout(resolve, Math.random() * 10));
return {
processed: true,
rinfo: rinfo,
dataSize: buffer.length
};
}
}
// Example usage
async function demonstratePerformanceOptimization() {
const optimizer = new QUICPerformanceOptimizer({
bufferSize: 128 * 1024, // 128KB buffers
poolSize: 200, // 200 buffers in pool
enableZeroCopy: true,
enableWorkers: true,
workerCount: 4,
mtuDiscovery: true,
adaptiveCongestion: true,
packetCoalescing: true
});
// Create optimized QUIC socket
const socket = dgram.createSocket('udp4');
// Handle incoming packets with zero-copy
socket.on('message', async (msg, rinfo) => {
await optimizer.processPacketZeroCopy(socket, msg, rinfo, (packet, rinfo) => {
// Process the packet
console.log(`Processed packet from ${rinfo.address}:${rinfo.port}`);
});
});
// Monitor performance
setInterval(() => {
const metrics = optimizer.getPerformanceMetrics();
console.log('Performance metrics:', metrics);
}, 10000);
// Graceful shutdown
setTimeout(() => {
console.log('Shutting down...');
optimizer.shutdown();
socket.close();
process.exit(0);
}, 60000);
}
// Start demonstration
if (isMainThread) {
demonstratePerformanceOptimization().catch(console.error);
} else {
// Worker code is handled above
}