🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples gRPC
Exemples du protocole de communication gRPC (Google Remote Procedure Call) avec Protocol Buffers et définitions de services
💻 Définition Protocol Buffers protobuf
🟢 simple
Définitions de service et structures de messages Protocol Buffers de base
// Protocol Buffers v3 syntax
syntax = "proto3";
// Package declaration for namespace
package userservice;
// Import statement for common types
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
// Option for Go specific settings
option go_package = "github.com/example/userservice";
// Simple message definition
message User {
string id = 1; // Unique identifier
string name = 2; // User name
string email = 3; // User email
int32 age = 4; // User age
bool active = 5; // Account status
google.protobuf.Timestamp created_at = 6; // Creation timestamp
repeated string roles = 7; // User roles list
map<string, string> metadata = 8; // Additional metadata
}
// Request and response messages
message CreateUserRequest {
string name = 1;
string email = 2;
int32 age = 3;
repeated string roles = 4;
}
message CreateUserResponse {
User user = 1;
string message = 2;
}
message GetUserRequest {
string user_id = 1;
}
message GetUserResponse {
User user = 1;
}
message ListUsersRequest {
int32 page = 1;
int32 limit = 2;
string filter = 3;
}
message ListUsersResponse {
repeated User users = 1;
int32 total = 2;
}
message UpdateUserRequest {
string user_id = 1;
User user = 2;
}
message UpdateUserResponse {
User user = 1;
string message = 2;
}
message DeleteUserRequest {
string user_id = 1;
}
message DeleteUserResponse {
bool success = 1;
string message = 2;
}
// Enum definition
message User {
enum Status {
UNKNOWN = 0;
ACTIVE = 1;
INACTIVE = 2;
SUSPENDED = 3;
}
Status status = 9;
}
// Service definition
service UserService {
// Unary RPC
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse);
// Server streaming RPC
rpc ListUsers(ListUsersRequest) returns (stream ListUsersResponse);
// Client streaming RPC
rpc BatchCreateUsers(stream CreateUserRequest) returns (CreateUserResponse);
// Bidirectional streaming RPC
rpc StreamUsers(stream GetUserRequest) returns (stream GetUserResponse);
}
// Health check service
service HealthCheck {
rpc Check(google.protobuf.Empty) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}
ServingStatus status = 1;
}
// Authentication service
service AuthService {
rpc Login(LoginRequest) returns (LoginResponse);
rpc ValidateToken(TokenRequest) returns (TokenResponse);
rpc RefreshToken(TokenRequest) returns (TokenResponse);
}
message LoginRequest {
string username = 1;
string password = 2;
}
message LoginResponse {
string access_token = 1;
string refresh_token = 2;
int64 expires_in = 3;
}
message TokenRequest {
string token = 1;
}
message TokenResponse {
bool valid = 1;
string user_id = 2;
repeated string roles = 3;
}
💻 Implémentation Serveur gRPC javascript
🟡 intermediate
Implémentation de serveur gRPC avec Node.js
// gRPC Server Implementation in Node.js
const grpc = require('@grpc/grpc-js');
const fs = require('fs');
const path = require('path');
// Load Protocol Buffer definitions
const PROTO_PATH = path.join(__dirname, 'user.proto');
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const userProto = grpc.loadPackageDefinition(packageDefinition).userservice;
// In-memory user database
const usersDatabase = new Map();
let userIdCounter = 1;
// Service implementation handlers
class UserServiceImpl {
// Unary RPC: Create User
createUser(call, callback) {
const { name, email, age, roles } = call.request;
// Validate input
if (!name || !email) {
return callback({
code: grpc.status.INVALID_ARGUMENT,
message: 'Name and email are required'
});
}
// Create new user
const user = {
id: (userIdCounter++).toString(),
name,
email,
age: age || 0,
active: true,
created_at: new Date().toISOString(),
roles: roles || ['user'],
metadata: {}
};
usersDatabase.set(user.id, user);
callback(null, {
user,
message: 'User created successfully'
});
}
// Unary RPC: Get User
getUser(call, callback) {
const { user_id } = call.request;
const user = usersDatabase.get(user_id);
if (!user) {
return callback({
code: grpc.status.NOT_FOUND,
message: 'User not found'
});
}
callback(null, { user });
}
// Unary RPC: Update User
updateUser(call, callback) {
const { user_id, user: updatedData } = call.request;
const existingUser = usersDatabase.get(user_id);
if (!existingUser) {
return callback({
code: grpc.status.NOT_FOUND,
message: 'User not found'
});
}
// Update user data
const updatedUser = {
...existingUser,
...updatedData,
id: existingUser.id // Preserve ID
};
usersDatabase.set(user_id, updatedUser);
callback(null, {
user: updatedUser,
message: 'User updated successfully'
});
}
// Unary RPC: Delete User
deleteUser(call, callback) {
const { user_id } = call.request;
const user = usersDatabase.get(user_id);
if (!user) {
return callback({
code: grpc.status.NOT_FOUND,
message: 'User not found'
});
}
usersDatabase.delete(user_id);
callback(null, {
success: true,
message: 'User deleted successfully'
});
}
// Server Streaming RPC: List Users
listUsers(call) {
const { page = 1, limit = 10, filter } = call.request;
const users = Array.from(usersDatabase.values());
// Apply filter if provided
let filteredUsers = users;
if (filter) {
filteredUsers = users.filter(user =>
user.name.toLowerCase().includes(filter.toLowerCase()) ||
user.email.toLowerCase().includes(filter.toLowerCase())
);
}
// Pagination
const startIndex = (page - 1) * limit;
const endIndex = startIndex + limit;
const paginatedUsers = filteredUsers.slice(startIndex, endIndex);
// Send response
call.write({
users: paginatedUsers,
total: filteredUsers.length
});
call.end();
}
// Client Streaming RPC: Batch Create Users
async batchCreateUsers(call, callback) {
const createdUsers = [];
call.on('data', (request) => {
try {
const { name, email, age, roles } = request;
if (!name || !email) {
return call.emit('error', {
code: grpc.status.INVALID_ARGUMENT,
message: 'Name and email are required'
});
}
const user = {
id: (userIdCounter++).toString(),
name,
email,
age: age || 0,
active: true,
created_at: new Date().toISOString(),
roles: roles || ['user'],
metadata: {}
};
usersDatabase.set(user.id, user);
createdUsers.push(user);
} catch (error) {
call.emit('error', {
code: grpc.status.INTERNAL,
message: error.message
});
}
});
call.on('end', () => {
callback(null, {
user: createdUsers[createdUsers.length - 1] || null,
message: `Batch created ${createdUsers.length} users successfully`
});
});
}
// Bidirectional Streaming RPC: Stream Users
streamUsers(call) {
call.on('data', (request) => {
const { user_id } = request;
const user = usersDatabase.get(user_id);
if (user) {
call.write({ user });
} else {
call.emit('error', {
code: grpc.status.NOT_FOUND,
message: `User ${user_id} not found`
});
}
});
call.on('end', () => {
call.end();
});
}
}
// Authentication service implementation
class AuthServiceImpl {
login(call, callback) {
const { username, password } = call.request;
// Simple authentication logic (in production, use proper hashing and database)
if (username === 'admin' && password === 'password') {
const accessToken = 'mock-access-token-' + Date.now();
const refreshToken = 'mock-refresh-token-' + Date.now();
callback(null, {
access_token: accessToken,
refresh_token: refreshToken,
expires_in: 3600
});
} else {
callback({
code: grpc.status.UNAUTHENTICATED,
message: 'Invalid credentials'
});
}
}
validateToken(call, callback) {
const { token } = call.request;
// Simple token validation (in production, use proper JWT validation)
if (token && token.startsWith('mock-access-token-')) {
callback(null, {
valid: true,
user_id: 'admin',
roles: ['admin', 'user']
});
} else {
callback({
code: grpc.status.UNAUTHENTICATED,
message: 'Invalid token'
});
}
}
refreshToken(call, callback) {
const { token } = call.request;
if (token && token.startsWith('mock-refresh-token-')) {
const newAccessToken = 'mock-access-token-' + Date.now();
callback(null, {
valid: true,
user_id: 'admin',
roles: ['admin', 'user']
});
} else {
callback({
code: grpc.status.UNAUTHENTICATED,
message: 'Invalid refresh token'
});
}
}
}
// Health check service implementation
class HealthCheckImpl {
check(call, callback) {
callback(null, {
status: 1 // SERVING
});
}
watch(call) {
const interval = setInterval(() => {
call.write({
status: 1 // SERVING
});
}, 1000);
call.on('end', () => {
clearInterval(interval);
call.end();
});
}
}
// Create gRPC server
function createServer() {
const server = new grpc.Server();
// Add services
server.addService(userProto.UserService.service, new UserServiceImpl());
server.addService(userProto.AuthService.service, new AuthServiceImpl());
server.addService(userProto.HealthCheck.service, new HealthCheckImpl());
return server;
}
// Server configuration
const server = createServer();
const PORT = process.env.GRPC_PORT || 50051;
const HOST = process.env.GRPC_HOST || '0.0.0.0';
// SSL/TLS configuration (optional)
const credentials = grpc.ServerCredentials.createInsecure();
// For SSL:
// const credentials = grpc.ServerCredentials.createSsl(
// fs.readFileSync(path.join(__dirname, 'ca.crt')),
// [{
// cert_chain: fs.readFileSync(path.join(__dirname, 'server.crt')),
// private_key: fs.readFileSync(path.join(__dirname, 'server.key'))
// }],
// false
// );
// Start server
server.bindAsync(`${HOST}:${PORT}`, credentials, (err, port) => {
if (err) {
console.error('Failed to start server:', err);
return;
}
console.log(`gRPC server listening on ${HOST}:${PORT}`);
server.start();
});
// Graceful shutdown
process.on('SIGINT', () => {
console.log('Shutting down gRPC server...');
server.tryShutdown((err) => {
if (err) {
console.error('Error during shutdown:', err);
process.exit(1);
}
console.log('Server stopped successfully');
process.exit(0);
});
});
// Force shutdown after timeout
setTimeout(() => {
console.log('Forcing server shutdown...');
server.forceShutdown();
process.exit(0);
}, 10000);
module.exports = { createServer, UserServiceImpl, AuthServiceImpl, HealthCheckImpl };
💻 Implémentation Client gRPC javascript
🟡 intermediate
Implémentation de client gRPC avec Node.js
// gRPC Client Implementation in Node.js
const grpc = require('@grpc/grpc-js');
const path = require('path');
// Load Protocol Buffer definitions
const PROTO_PATH = path.join(__dirname, 'user.proto');
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const userProto = grpc.loadPackageDefinition(packageDefinition).userservice;
// Client configuration
const GRPC_HOST = process.env.GRPC_HOST || 'localhost';
const GRPC_PORT = process.env.GRPC_PORT || 50051;
const SERVER_ADDRESS = `${GRPC_HOST}:${GRPC_PORT}`;
// SSL/TLS configuration (optional)
const credentials = grpc.credentials.createInsecure();
// For SSL:
// const credentials = grpc.credentials.createSsl(
// fs.readFileSync(path.join(__dirname, 'ca.crt')),
// fs.readFileSync(path.join(__dirname, 'client.key')),
// fs.readFileSync(path.join(__dirname, 'client.crt'))
// );
// Create client instances
const userClient = new userProto.UserService(
SERVER_ADDRESS,
credentials
);
const authClient = new userProto.AuthService(
SERVER_ADDRESS,
credentials
);
const healthClient = new userProto.HealthCheck(
SERVER_ADDRESS,
credentials
);
// Client utility class
class GrpcClient {
constructor() {
this.userClient = userClient;
this.authClient = authClient;
this.healthClient = healthClient;
}
// Health check method
async checkHealth() {
return new Promise((resolve, reject) => {
this.healthClient.check({}, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
// Authentication methods
async login(username, password) {
return new Promise((resolve, reject) => {
const request = { username, password };
this.authClient.login(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
async validateToken(token) {
return new Promise((resolve, reject) => {
const request = { token };
this.authClient.validateToken(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
// User service methods
async createUser(userData) {
return new Promise((resolve, reject) => {
const request = {
name: userData.name,
email: userData.email,
age: userData.age || 0,
roles: userData.roles || ['user']
};
this.userClient.createUser(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
async getUser(userId) {
return new Promise((resolve, reject) => {
const request = { user_id: userId };
this.userClient.getUser(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
async updateUser(userId, userData) {
return new Promise((resolve, reject) => {
const request = {
user_id: userId,
user: userData
};
this.userClient.updateUser(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
async deleteUser(userId) {
return new Promise((resolve, reject) => {
const request = { user_id: userId };
this.userClient.deleteUser(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
// Server streaming method
async listUsers(page = 1, limit = 10, filter = '') {
return new Promise((resolve, reject) => {
const request = { page, limit, filter };
const call = this.userClient.listUsers(request);
const users = [];
let total = 0;
call.on('data', (response) => {
users.push(...response.users);
total = response.total;
});
call.on('end', () => {
resolve({
users,
total,
page,
limit
});
});
call.on('error', (error) => {
reject(error);
});
});
}
// Client streaming method
async batchCreateUsers(usersData) {
return new Promise((resolve, reject) => {
const call = this.userClient.batchCreateUsers();
// Send all user data
usersData.forEach(userData => {
const request = {
name: userData.name,
email: userData.email,
age: userData.age || 0,
roles: userData.roles || ['user']
};
call.write(request);
});
// End the stream
call.end();
// Handle response
call.on('data', (response) => {
resolve(response);
});
call.on('error', (error) => {
reject(error);
});
});
}
// Bidirectional streaming method
async streamUsers(userIds) {
return new Promise((resolve, reject) => {
const call = this.userClient.streamUsers();
const users = [];
call.on('data', (response) => {
users.push(response.user);
});
call.on('end', () => {
resolve(users);
});
call.on('error', (error) => {
reject(error);
});
// Send user IDs to stream
userIds.forEach(userId => {
call.write({ user_id: userId });
});
// End the stream
call.end();
});
}
}
// Example usage
async function demonstrateClient() {
const client = new GrpcClient();
try {
// Health check
console.log('Checking server health...');
const health = await client.checkHealth();
console.log('Health status:', health.status === 1 ? 'SERVING' : 'NOT_SERVING');
// Authentication
console.log('\nAuthenticating...');
const authResponse = await client.login('admin', 'password');
console.log('Login successful:', authResponse);
// Validate token
const tokenValidation = await client.validateToken(authResponse.access_token);
console.log('Token valid:', tokenValidation.valid);
// Create user
console.log('\nCreating user...');
const userResponse = await client.createUser({
name: 'John Doe',
email: '[email protected]',
age: 30,
roles: ['user', 'developer']
});
console.log('Created user:', userResponse.user);
const userId = userResponse.user.id;
// Get user
console.log('\nGetting user...');
const getUserResponse = await client.getUser(userId);
console.log('Retrieved user:', getUserResponse.user);
// Update user
console.log('\nUpdating user...');
const updateResponse = await client.updateUser(userId, {
name: 'John Smith',
age: 31
});
console.log('Updated user:', updateResponse.user);
// List users
console.log('\nListing users...');
const listResponse = await client.listUsers(1, 10, '');
console.log(`Found ${listResponse.total} users:`, listResponse.users);
// Batch create users
console.log('\nBatch creating users...');
const batchResponse = await client.batchCreateUsers([
{ name: 'Alice Johnson', email: '[email protected]', age: 25 },
{ name: 'Bob Wilson', email: '[email protected]', age: 28 },
{ name: 'Charlie Brown', email: '[email protected]', age: 32 }
]);
console.log('Batch creation response:', batchResponse.message);
// Stream users
console.log('\nStreaming users...');
const streamResponse = await client.streamUsers([userId, '999', '888']);
console.log('Streamed users:', streamResponse);
// Delete user
console.log('\nDeleting user...');
const deleteResponse = await client.deleteUser(userId);
console.log('Delete response:', deleteResponse);
} catch (error) {
console.error('Error:', error);
if (error.code) {
console.error('gRPC Error Code:', error.code);
console.error('gRPC Error Message:', error.message);
}
}
}
// Error handling utility
function handleGrpcError(error) {
if (error.code === undefined) {
return 'Network error or connection failed';
}
const errorMessages = {
[grpc.status.OK]: 'OK',
[grpc.status.CANCELLED]: 'Operation cancelled',
[grpc.status.UNKNOWN]: 'Unknown error',
[grpc.status.INVALID_ARGUMENT]: 'Invalid argument',
[grpc.status.DEADLINE_EXCEEDED]: 'Deadline exceeded',
[grpc.status.NOT_FOUND]: 'Not found',
[grpc.status.ALREADY_EXISTS]: 'Already exists',
[grpc.status.PERMISSION_DENIED]: 'Permission denied',
[grpc.status.UNAUTHENTICATED]: 'Unauthenticated',
[grpc.status.RESOURCE_EXHAUSTED]: 'Resource exhausted',
[grpc.status.FAILED_PRECONDITION]: 'Failed precondition',
[grpc.status.ABORTED]: 'Aborted',
[grpc.status.OUT_OF_RANGE]: 'Out of range',
[grpc.status.UNIMPLEMENTED]: 'Unimplemented',
[grpc.status.INTERNAL]: 'Internal error',
[grpc.status.UNAVAILABLE]: 'Unavailable',
[grpc.status.DATA_LOSS]: 'Data loss',
[grpc.status.DO_NOT_USE]: 'Do not use'
};
return errorMessages[error.code] || `Unknown gRPC error: ${error.code}`;
}
// Export client class and utility functions
module.exports = {
GrpcClient,
handleGrpcError,
demonstrateClient
};
// Run demonstration if this file is executed directly
if (require.main === module) {
demonstrateClient().catch(console.error);
}
💻 Réflexion de Service gRPC
🟡 intermediate
Utilisation de la réflexion de service pour les requêtes dynamiques
// gRPC Server Reflection Implementation
const grpc = require('@grpc/grpc-js');
const fs = require('fs');
const path = require('path');
// Server reflection service implementation
class ServerReflectionService {
constructor(server, protoFiles = []) {
this.server = server;
this.services = new Map();
this.fileDescriptors = new Map();
this.symbolTable = new Map();
// Load proto files
this.loadProtoFiles(protoFiles);
// Register reflection service
this.registerReflectionService();
}
loadProtoFiles(protoFiles) {
protoFiles.forEach(filePath => {
try {
const protoContent = fs.readFileSync(filePath, 'utf8');
const fileName = path.basename(filePath);
// Parse proto file (simplified - in production use proper parser)
const descriptor = this.parseProtoFile(protoContent, fileName);
this.fileDescriptors.set(fileName, descriptor);
// Extract service information
this.extractServices(descriptor, fileName);
console.log(`Loaded proto file: ${fileName}`);
} catch (error) {
console.error(`Failed to load proto file ${filePath}:`, error.message);
}
});
}
parseProtoFile(content, fileName) {
// Simplified proto file parsing
// In production, use @grpc/proto-loader or protobufjs
const descriptor = {
name: fileName,
package: '',
messages: [],
services: [],
enums: [],
dependencies: []
};
const lines = content.split('\n');
let currentService = null;
let currentMessage = null;
lines.forEach(line => {
line = line.trim();
if (line.startsWith('package ')) {
descriptor.package = line.replace('package ', '').replace(';', '');
} else if (line.startsWith('service ')) {
const serviceName = line.match(/service\s+(\w+)/);
if (serviceName) {
currentService = {
name: serviceName[1],
methods: []
};
descriptor.services.push(currentService);
}
} else if (line.startsWith('rpc ') && currentService) {
const methodMatch = line.match(/rpc\s+(\w+)\s*\(([^)]+)\)\s*returns\s*\(([^)]+)\)/);
if (methodMatch) {
currentService.methods.push({
name: methodMatch[1],
inputType: methodMatch[2].trim(),
outputType: methodMatch[3].trim(),
clientStreaming: line.includes('stream ') && line.indexOf('stream ') < line.indexOf('rpc ') + 4,
serverStreaming: line.includes('stream ') && line.indexOf('stream ') > line.indexOf('returns ')
});
}
} else if (line.startsWith('message ')) {
const messageName = line.match(/message\s+(\w+)/);
if (messageName) {
currentMessage = {
name: messageName[1],
fields: []
};
descriptor.messages.push(currentMessage);
}
} else if (currentMessage && line.includes('=')) {
const fieldMatch = line.match(/(\w+)\s+(\w+)\s*=\s+(\d+)/);
if (fieldMatch) {
currentMessage.fields.push({
name: fieldMatch[2],
type: fieldMatch[1],
number: parseInt(fieldMatch[3])
});
}
}
});
return descriptor;
}
extractServices(descriptor, fileName) {
descriptor.services.forEach(service => {
const fullName = descriptor.package ? `${descriptor.package}.${service.name}` : service.name;
this.services.set(fullName, {
name: service.name,
fullName: fullName,
file: fileName,
package: descriptor.package,
methods: service.methods.map(method => ({
name: method.name,
inputType: method.inputType,
outputType: method.outputType,
clientStreaming: method.clientStreaming,
serverStreaming: method.serverStreaming
}))
});
// Add to symbol table
this.symbolTable.set(fullName, 'SERVICE');
service.methods.forEach(method => {
const methodFullName = `${fullName}.${method.name}`;
this.symbolTable.set(methodFullName, 'METHOD');
});
});
descriptor.messages.forEach(message => {
const fullName = descriptor.package ? `${descriptor.package}.${message.name}` : message.name;
this.symbolTable.set(fullName, 'MESSAGE');
message.fields.forEach(field => {
const fieldFullName = `${fullName}.${field.name}`;
this.symbolTable.set(fieldFullName, 'FIELD');
});
});
}
registerReflectionService() {
const reflectionProto = this.createReflectionProto();
this.server.addService(reflectionProto.ServerReflection.service, {
serverReflectionInfo: this.handleServerReflectionInfo.bind(this)
});
}
createReflectionProto() {
// Simplified reflection proto definition
return {
ServerReflection: {
service: {
serverReflectionInfo: {
path: '/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo',
requestStream: true,
responseStream: true
}
}
}
};
}
async handleServerReflectionInfo(call) {
call.on('data', (request) => {
try {
const response = this.processReflectionRequest(request);
call.write(response);
} catch (error) {
call.write({
errorResponse: {
errorCode: grpc.status.INTERNAL,
errorMessage: error.message
}
});
}
});
call.on('end', () => {
call.end();
});
}
processReflectionRequest(request) {
if (request.listServices) {
return this.handleListServices(request.listServices);
} else if (request.fileByFilename) {
return this.handleFileByFilename(request.fileByFilename);
} else if (request.fileContainingSymbol) {
return this.handleFileContainingSymbol(request.fileContainingSymbol);
} else if (request.allExtensionNumbersOfType) {
return this.handleAllExtensionNumbers(request.allExtensionNumbersOfType);
} else if (request.listExtensionNumbers) {
return this.handleListExtensionNumbers(request.listExtensionNumbers);
} else {
throw new Error('Unknown reflection request type');
}
}
handleListServices(request) {
const services = [];
// Get services from the server
if (this.server && this.server.services) {
Object.keys(this.server.services).forEach(servicePath => {
const serviceName = servicePath.split('/').pop();
if (!serviceName.includes('ServerReflection')) {
services.push({ name: serviceName });
}
});
}
return {
listServicesResponse: {
service: services
}
};
}
handleFileByFilename(request) {
const fileName = request.filename;
const descriptor = this.fileDescriptors.get(fileName);
if (!descriptor) {
return {
errorResponse: {
errorCode: grpc.status.NOT_FOUND,
errorMessage: `File not found: ${fileName}`
}
};
}
const fileDescriptorProto = this.serializeDescriptor(descriptor);
return {
fileDescriptorResponse: {
fileDescriptorProto: [fileDescriptorProto]
}
};
}
handleFileContainingSymbol(request) {
const symbol = request.symbol;
const fileName = this.findFileForSymbol(symbol);
if (!fileName) {
return {
errorResponse: {
errorCode: grpc.status.NOT_FOUND,
errorMessage: `Symbol not found: ${symbol}`
}
};
}
const descriptor = this.fileDescriptors.get(fileName);
const fileDescriptorProto = this.serializeDescriptor(descriptor);
return {
fileDescriptorResponse: {
fileDescriptorProto: [fileDescriptorProto]
}
};
}
handleAllExtensionNumbers(request) {
// Simplified implementation
return {
allExtensionNumbersResponse: {
baseTypeName: request.extending_type,
extensionNumber: []
}
};
}
handleListExtensionNumbers(request) {
// Simplified implementation
return {
extensionNumberResponse: {
baseTypeName: request.containing_type,
extensionNumber: []
}
};
}
findFileForSymbol(symbol) {
for (const [fileName, descriptor] of this.fileDescriptors) {
const fullName = descriptor.package ? `${descriptor.package}.${symbol}` : symbol;
// Check if symbol exists in this file
if (descriptor.services.some(s => s.name === symbol) ||
descriptor.messages.some(m => m.name === symbol)) {
return fileName;
}
}
return null;
}
serializeDescriptor(descriptor) {
// Simplified serialization - in production use proper protobuf serialization
const serialized = JSON.stringify({
name: descriptor.name,
package: descriptor.package,
dependency: descriptor.dependencies,
messageType: descriptor.messages.map(msg => ({
name: msg.name,
field: msg.fields.map(field => ({
name: field.name,
number: field.number,
label: 'LABEL_OPTIONAL',
type: this.getProtobufType(field.type)
}))
})),
serviceType: descriptor.services.map(svc => ({
name: svc.name,
method: svc.methods.map(method => ({
name: method.name,
inputType: method.inputType,
outputType: method.outputType,
clientStreaming: method.clientStreaming,
serverStreaming: method.serverStreaming
}))
}))
});
return Buffer.from(serialized);
}
getProtobufType(type) {
const typeMap = {
'string': 'TYPE_STRING',
'int32': 'TYPE_INT32',
'int64': 'TYPE_INT64',
'bool': 'TYPE_BOOL',
'double': 'TYPE_DOUBLE',
'float': 'TYPE_FLOAT'
};
return typeMap[type] || 'TYPE_STRING';
}
}
// Reflection client for service discovery
class ReflectionClient {
constructor(address, credentials) {
this.address = address;
this.credentials = credentials;
this.cache = new Map();
}
async listServices() {
const cacheKey = 'listServices';
if (this.cache.has(cacheKey)) {
return this.cache.get(cacheKey);
}
const response = await this.makeReflectionRequest({
listServices: {}
});
const services = response.listServicesResponse.service.map(s => s.name);
this.cache.set(cacheKey, services);
return services;
}
async getServiceDescriptor(serviceName) {
const cacheKey = `service_${serviceName}`;
if (this.cache.has(cacheKey)) {
return this.cache.get(cacheKey);
}
const response = await this.makeReflectionRequest({
fileContainingSymbol: { symbol: serviceName }
});
if (!response.fileDescriptorResponse || !response.fileDescriptorResponse.fileDescriptorProto.length) {
throw new Error(`Service descriptor not found for ${serviceName}`);
}
const descriptor = this.deserializeDescriptor(response.fileDescriptorResponse.fileDescriptorProto[0]);
this.cache.set(cacheKey, descriptor);
return descriptor;
}
async getAllServiceDescriptors() {
const services = await this.listServices();
const descriptors = {};
for (const serviceName of services) {
try {
descriptors[serviceName] = await this.getServiceDescriptor(serviceName);
} catch (error) {
console.warn(`Failed to get descriptor for ${serviceName}:`, error.message);
}
}
return descriptors;
}
async makeReflectionRequest(request) {
return new Promise((resolve, reject) => {
const client = this.createReflectionClient();
const call = client.serverReflectionInfo();
call.write(request);
let response = null;
let error = null;
call.on('data', (data) => {
response = data;
});
call.on('end', () => {
if (error) {
reject(error);
} else if (response) {
resolve(response);
} else {
reject(new Error('No response received'));
}
});
call.on('error', (err) => {
error = err;
});
call.end();
});
}
createReflectionClient() {
// Simplified client creation
return {
serverReflectionInfo: () => ({
write: () => {},
on: () => {},
end: () => {}
})
};
}
deserializeDescriptor(buffer) {
// Simplified deserialization - in production use proper protobuf deserialization
return JSON.parse(buffer.toString());
}
clearCache() {
this.cache.clear();
}
}
// Service discovery utilities
class ServiceDiscovery {
constructor(reflectionClient) {
this.reflectionClient = reflectionClient;
this.serviceRegistry = new Map();
}
async discoverServices() {
console.log('Discovering services...');
const services = await this.reflectionClient.listServices();
console.log(`Found ${services.length} services:`, services);
for (const serviceName of services) {
try {
await this.discoverService(serviceName);
} catch (error) {
console.error(`Failed to discover ${serviceName}:`, error.message);
}
}
console.log('Service discovery completed');
return this.serviceRegistry;
}
async discoverService(serviceName) {
console.log(`Discovering service: ${serviceName}`);
const descriptor = await this.reflectionClient.getServiceDescriptor(serviceName);
const serviceInfo = {
name: serviceName,
package: descriptor.package,
methods: descriptor.serviceType[0].method.map(method => ({
name: method.name,
inputType: method.inputType,
outputType: method.outputType,
clientStreaming: method.clientStreaming,
serverStreaming: method.serverStreaming,
path: `/${descriptor.package ? descriptor.package + '.' : ''}${serviceName}/${method.name}`
})),
messages: descriptor.messageType.map(msg => ({
name: msg.name,
fields: msg.field.map(field => ({
name: field.name,
type: field.type,
number: field.number
}))
}))
};
this.serviceRegistry.set(serviceName, serviceInfo);
console.log(`Service ${serviceName} discovered with ${serviceInfo.methods.length} methods`);
return serviceInfo;
}
getService(serviceName) {
return this.serviceRegistry.get(serviceName);
}
getAllServices() {
return Array.from(this.serviceRegistry.values());
}
findMethod(methodName) {
for (const [serviceName, serviceInfo] of this.serviceRegistry) {
const method = serviceInfo.methods.find(m => m.name === methodName);
if (method) {
return {
service: serviceName,
method: method
};
}
}
return null;
}
generateClientCode(serviceName, language = 'javascript') {
const serviceInfo = this.serviceRegistry.get(serviceName);
if (!serviceInfo) {
throw new Error(`Service ${serviceName} not found`);
}
if (language === 'javascript') {
return this.generateJavascriptClient(serviceInfo);
} else if (language === 'typescript') {
return this.generateTypeScriptClient(serviceInfo);
} else {
throw new Error(`Language ${language} not supported`);
}
}
generateJavascriptClient(serviceInfo) {
let code = `// Auto-generated gRPC client for ${serviceInfo.name}\n`;
code += `const grpc = require('@grpc/grpc-js');\n\n`;
// Client class
code += `class ${serviceInfo.name}Client {\n`;
code += ` constructor(address, credentials) {\n`;
code += ` this.client = new grpc.Client(address, credentials);\n`;
code += ` }\n\n`;
// Generate methods
serviceInfo.methods.forEach(method => {
const methodName = method.name.charAt(0).toLowerCase() + method.name.slice(1);
if (method.clientStreaming && method.serverStreaming) {
// Bidirectional streaming
code += ` ${methodName}() {\n`;
code += ` return this.client.makeBidiStreamRequest(\n`;
code += ` '${method.path}'\n`;
code += ` );\n`;
code += ` }\n\n`;
} else if (method.clientStreaming) {
// Client streaming
code += ` ${methodName}(callback) {\n`;
code += ` return this.client.makeClientStreamRequest(\n`;
code += ` '${method.path}', callback\n`;
code += ` );\n`;
code += ` }\n\n`;
} else if (method.serverStreaming) {
// Server streaming
code += ` ${methodName}(request) {\n`;
code += ` return this.client.makeServerStreamRequest(\n`;
code += ` '${method.path}', request\n`;
code += ` );\n`;
code += ` }\n\n`;
} else {
// Unary
code += ` ${methodName}(request, callback) {\n`;
code += ` return this.client.makeUnaryRequest(\n`;
code += ` '${method.path}', request, callback\n`;
code += ` );\n`;
code += ` }\n\n`;
}
});
code += `}\n\n`;
code += `module.exports = ${serviceInfo.name}Client;\n`;
return code;
}
generateTypeScriptClient(serviceInfo) {
// Similar to JavaScript but with TypeScript types
return this.generateJavascriptClient(serviceInfo).replace('javascript', 'typescript');
}
}
module.exports = {
ServerReflectionService,
ReflectionClient,
ServiceDiscovery
};
💻 Health Checking gRPC
🟡 intermediate
Implémentation du protocole standard de health checking
// gRPC Health Checking Implementation
const grpc = require('@grpc/grpc-js');
// Health checking status enum
const HealthCheckResponseServingStatus = {
UNKNOWN: 0,
SERVING: 1,
NOT_SERVING: 2,
SERVICE_UNKNOWN: 3
};
// Health check implementation
class HealthCheckService {
constructor(options = {}) {
this.statuses = new Map();
this.defaultStatus = options.defaultStatus || HealthCheckResponseServingStatus.SERVING;
this.checkInterval = options.checkInterval || 30000;
this.detailedLogging = options.detailedLogging || false;
this.watchers = new Map();
// Set default service status
this.setStatus('', this.defaultStatus);
}
// Set health status for a service
setStatus(service, status) {
const oldStatus = this.statuses.get(service);
this.statuses.set(service, status);
if (this.detailedLogging) {
console.log(`Health status changed for '${service}': ${this.getStatusString(oldStatus)} -> ${this.getStatusString(status)}`);
}
// Notify all watchers for this service
this.notifyWatchers(service, status);
}
// Get health status for a service
getStatus(service) {
return this.statuses.get(service) || HealthCheckResponseServingStatus.SERVICE_UNKNOWN;
}
// Clear status for a service
clearStatus(service) {
this.statuses.delete(service);
this.notifyWatchers(service, HealthCheckResponseServingStatus.SERVICE_UNKNOWN);
}
// Get all service statuses
getAllStatuses() {
const result = {};
this.statuses.forEach((status, service) => {
result[service] = {
status: status,
statusString: this.getStatusString(status)
};
});
return result;
}
// gRPC health check method
check(call, callback) {
const service = call.request.service || '';
const status = this.getStatus(service);
if (this.detailedLogging) {
console.log(`Health check requested for service: '${service}', status: ${this.getStatusString(status)}`);
}
callback(null, {
status: status
});
}
// gRPC health watch method (server streaming)
watch(call) {
const service = call.request.service || '';
const watcherId = this.generateWatcherId();
if (this.detailedLogging) {
console.log(`Health watch started for service: '${service}', watcher: ${watcherId}`);
}
// Store watcher
if (!this.watchers.has(service)) {
this.watchers.set(service, new Map());
}
this.watchers.get(service).set(watcherId, call);
// Send current status immediately
const currentStatus = this.getStatus(service);
call.write({
status: currentStatus
});
// Handle watcher cleanup
const cleanup = () => {
if (this.watchers.has(service)) {
this.watchers.get(service).delete(watcherId);
if (this.watchers.get(service).size === 0) {
this.watchers.delete(service);
}
}
if (this.detailedLogging) {
console.log(`Health watch ended for service: '${service}', watcher: ${watcherId}`);
}
};
call.on('end', cleanup);
call.on('error', cleanup);
call.on('cancel', cleanup);
}
// Notify all watchers of a service status change
notifyWatchers(service, status) {
if (!this.watchers.has(service)) {
return;
}
const watchers = this.watchers.get(service);
const message = { status: status };
watchers.forEach((call, watcherId) => {
try {
call.write(message);
} catch (error) {
console.error(`Failed to notify watcher ${watcherId} for service ${service}:`, error.message);
watchers.delete(watcherId);
}
});
}
// Generate unique watcher ID
generateWatcherId() {
return 'watcher-' + Math.random().toString(36).substr(2, 9);
}
// Convert status to string
getStatusString(status) {
const statusStrings = {
[HealthCheckResponseServingStatus.UNKNOWN]: 'UNKNOWN',
[HealthCheckResponseServingStatus.SERVING]: 'SERVING',
[HealthCheckResponseServingStatus.NOT_SERVING]: 'NOT_SERVING',
[HealthCheckResponseServingStatus.SERVICE_UNKNOWN]: 'SERVICE_UNKNOWN'
};
return statusStrings[status] || 'UNKNOWN';
}
// Get watcher statistics
getWatcherStats() {
const stats = {
totalWatchers: 0,
serviceWatchers: {}
};
this.watchers.forEach((watchers, service) => {
stats.totalWatchers += watchers.size;
stats.serviceWatchers[service] = watchers.size;
});
return stats;
}
}
// Advanced health check with dependencies
class AdvancedHealthCheckService extends HealthCheckService {
constructor(options = {}) {
super(options);
this.dependencies = new Map();
this.checkers = new Map();
this.checkResults = new Map();
this.checkTimeout = options.checkTimeout || 5000;
this.failureThreshold = options.failureThreshold || 3;
this.recoveryThreshold = options.recoveryThreshold || 2;
}
// Add dependency for a service
addDependency(service, dependency, checker) {
if (!this.dependencies.has(service)) {
this.dependencies.set(service, new Set());
}
this.dependencies.get(service).add(dependency);
this.checkers.set(dependency, checker);
// Start monitoring the dependency
this.startDependencyMonitoring(dependency, checker);
}
// Start monitoring a dependency
async startDependencyMonitoring(dependency, checker) {
const checkDependency = async () => {
try {
const startTime = Date.now();
const result = await Promise.race([
checker(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Health check timeout')), this.checkTimeout)
)
]);
const responseTime = Date.now() - startTime;
this.updateCheckResult(dependency, {
healthy: true,
lastCheck: new Date(),
responseTime: responseTime,
message: result.message || 'OK',
details: result.details || {}
});
} catch (error) {
this.updateCheckResult(dependency, {
healthy: false,
lastCheck: new Date(),
error: error.message,
lastError: error
});
}
};
// Run check immediately
checkDependency();
// Schedule periodic checks
setInterval(checkDependency, this.checkInterval);
}
// Update check result for a dependency
updateCheckResult(dependency, result) {
const previousResult = this.checkResults.get(dependency) || {
consecutiveFailures: 0,
consecutiveSuccesses: 0
};
const newResult = {
...previousResult,
...result,
consecutiveFailures: result.healthy ? 0 : previousResult.consecutiveFailures + 1,
consecutiveSuccesses: result.healthy ? previousResult.consecutiveSuccesses + 1 : 0
};
this.checkResults.set(dependency, newResult);
// Determine if dependency should be considered unhealthy
const isUnhealthy = newResult.consecutiveFailures >= this.failureThreshold;
const isRecovered = newResult.consecutiveSuccesses >= this.recoveryThreshold;
// Update service status based on dependency health
this.updateServiceStatusBasedOnDependencies();
}
// Update service status based on all dependencies
updateServiceStatusBasedOnDependencies() {
for (const [service, deps] of this.dependencies) {
let allHealthy = true;
let anyUnhealthy = false;
for (const dependency of deps) {
const result = this.checkResults.get(dependency);
if (!result) {
allHealthy = false;
} else if (result.consecutiveFailures >= this.failureThreshold) {
allHealthy = false;
anyUnhealthy = true;
}
}
let newStatus;
if (allHealthy) {
newStatus = HealthCheckResponseServingStatus.SERVING;
} else if (anyUnhealthy) {
newStatus = HealthCheckResponseServingStatus.NOT_SERVING;
} else {
newStatus = HealthCheckResponseServingStatus.UNKNOWN;
}
const currentStatus = this.getStatus(service);
if (currentStatus !== newStatus) {
this.setStatus(service, newStatus);
}
}
}
// Get detailed health status
getDetailedStatus(service) {
const status = this.getStatus(service);
const dependencies = this.dependencies.get(service) || new Set();
const dependencyStatuses = {};
for (const dependency of dependencies) {
const result = this.checkResults.get(dependency);
dependencyStatuses[dependency] = {
healthy: result ? result.consecutiveFailures < this.failureThreshold : false,
lastCheck: result ? result.lastCheck : null,
responseTime: result ? result.responseTime : null,
error: result && result.error ? result.error : null,
consecutiveFailures: result ? result.consecutiveFailures : 0,
consecutiveSuccesses: result ? result.consecutiveSuccesses : 0
};
}
return {
service: service,
status: status,
statusString: this.getStatusString(status),
dependencies: dependencyStatuses,
allDependenciesHealthy: Object.values(dependencyStatuses).every(dep => dep.healthy),
lastUpdated: new Date()
};
}
// Get all detailed statuses
getAllDetailedStatuses() {
const result = {};
// Include services with dependencies
for (const service of this.dependencies.keys()) {
result[service] = this.getDetailedStatus(service);
}
// Include services without dependencies
for (const service of this.statuses.keys()) {
if (!result[service]) {
result[service] = {
service: service,
status: this.getStatus(service),
statusString: this.getStatusString(this.getStatus(service)),
dependencies: {},
allDependenciesHealthy: true,
lastUpdated: new Date()
};
}
}
return result;
}
}
// Health check client for monitoring services
class HealthCheckClient {
constructor(address, credentials = grpc.credentials.createInsecure()) {
this.address = address;
this.credentials = credentials;
this.client = this.createHealthClient();
}
createHealthClient() {
// Create health check client
// In production, load the actual health proto
return {
check: (request, callback) => {
// Simplified implementation
setTimeout(() => {
callback(null, { status: HealthCheckResponseServingStatus.SERVING });
}, 100);
},
watch: (request) => {
// Simplified streaming implementation
const call = {
write: () => {},
on: (event, handler) => {
if (event === 'data') {
// Simulate periodic health updates
setInterval(() => {
handler({ status: HealthCheckResponseServingStatus.SERVING });
}, 5000);
}
}
};
return call;
}
};
}
// Check health of a specific service
async checkHealth(service = '') {
return new Promise((resolve, reject) => {
const request = { service: service };
this.client.check(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve({
service: service,
status: response.status,
statusString: this.getStatusString(response.status),
timestamp: new Date()
});
}
});
});
}
// Watch health status changes
watchHealth(service = '', callback) {
const request = { service: service };
const call = this.client.watch(request);
call.on('data', (response) => {
callback(null, {
service: service,
status: response.status,
statusString: this.getStatusString(response.status),
timestamp: new Date()
});
});
call.on('error', (error) => {
callback(error);
});
return call;
}
// Check health with timeout
async checkHealthWithTimeout(service = '', timeoutMs = 5000) {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error(`Health check timeout after ${timeoutMs}ms`));
}, timeoutMs);
this.checkHealth(service)
.then(result => {
clearTimeout(timeout);
resolve(result);
})
.catch(error => {
clearTimeout(timeout);
reject(error);
});
});
}
// Check health with retries
async checkHealthWithRetry(service = '', maxRetries = 3, retryDelay = 1000) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await this.checkHealth(service);
} catch (error) {
lastError = error;
if (attempt < maxRetries) {
console.warn(`Health check attempt ${attempt} failed for ${service}, retrying in ${retryDelay}ms:`, error.message);
await this.sleep(retryDelay);
}
}
}
throw lastError;
}
// Batch health check for multiple services
async checkMultipleServices(services) {
const promises = services.map(service =>
this.checkHealth(service).catch(error => ({
service: service,
error: error.message,
status: HealthCheckResponseServingStatus.SERVICE_UNKNOWN,
statusString: 'SERVICE_UNKNOWN',
timestamp: new Date()
}))
);
const results = await Promise.allSettled(promises);
return results.map((result, index) => {
if (result.status === 'fulfilled') {
return result.value;
} else {
return {
service: services[index],
error: result.reason.message,
status: HealthCheckResponseServingStatus.SERVICE_UNKNOWN,
statusString: 'SERVICE_UNKNOWN',
timestamp: new Date()
};
}
});
}
getStatusString(status) {
const statusStrings = {
[HealthCheckResponseServingStatus.UNKNOWN]: 'UNKNOWN',
[HealthCheckResponseServingStatus.SERVING]: 'SERVING',
[HealthCheckResponseServingStatus.NOT_SERVING]: 'NOT_SERVING',
[HealthCheckResponseServingStatus.SERVICE_UNKNOWN]: 'SERVICE_UNKNOWN'
};
return statusStrings[status] || 'UNKNOWN';
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Health check utilities
class HealthCheckUtils {
// Create database health checker
static createDatabaseHealthChecker(dbConnection) {
return async () => {
try {
await dbConnection.raw('SELECT 1');
return { message: 'Database connection healthy' };
} catch (error) {
throw new Error(`Database connection failed: ${error.message}`);
}
};
}
// Create Redis health checker
static createRedisHealthChecker(redisClient) {
return async () => {
try {
await redisClient.ping();
return { message: 'Redis connection healthy' };
} catch (error) {
throw new Error(`Redis connection failed: ${error.message}`);
}
};
}
// Create HTTP endpoint health checker
static createHttpHealthChecker(url, expectedStatus = 200) {
return async () => {
try {
const response = await fetch(url, {
method: 'GET',
timeout: 5000
});
if (response.status === expectedStatus) {
return {
message: `HTTP endpoint healthy (status: ${response.status})`,
details: { statusCode: response.status }
};
} else {
throw new Error(`HTTP endpoint returned status ${response.status}, expected ${expectedStatus}`);
}
} catch (error) {
throw new Error(`HTTP endpoint check failed: ${error.message}`);
}
};
}
// Create custom health checker
static createCustomHealthChecker(checkFunction) {
return async () => {
try {
const result = await checkFunction();
return {
message: 'Custom health check passed',
details: result
};
} catch (error) {
throw new Error(`Custom health check failed: ${error.message}`);
}
};
}
// Create composite health checker (checks multiple dependencies)
static createCompositeHealthChecker(checkers) {
return async () => {
const results = {};
let allHealthy = true;
for (const [name, checker] of Object.entries(checkers)) {
try {
const result = await checker();
results[name] = {
healthy: true,
result: result
};
} catch (error) {
results[name] = {
healthy: false,
error: error.message
};
allHealthy = false;
}
}
return {
message: allHealthy ? 'All dependencies healthy' : 'Some dependencies unhealthy',
details: results,
allHealthy: allHealthy
};
};
}
}
module.exports = {
HealthCheckResponseServingStatus,
HealthCheckService,
AdvancedHealthCheckService,
HealthCheckClient,
HealthCheckUtils
};
💻 Streaming gRPC javascript
🔴 complex
Implémentation de communication streaming client, serveur et bidirectionnelle
// Advanced gRPC Streaming Implementation
const grpc = require('@grpc/grpc-js');
const { EventEmitter } = require('events');
// Custom streaming implementation with backpressure and flow control
class GrpcStreamManager extends EventEmitter {
constructor() {
super();
this.activeStreams = new Map();
this.streamMetrics = {
totalStreams: 0,
activeCount: 0,
messagesSent: 0,
messagesReceived: 0
};
}
// Create a managed stream with metrics
createManagedStream(stream, streamId, type) {
const managedStream = {
stream,
streamId,
type, // 'client', 'server', 'bidirectional'
startTime: Date.now(),
messageCount: 0,
byteCount: 0,
isActive: true
};
this.activeStreams.set(streamId, managedStream);
this.streamMetrics.totalStreams++;
this.streamMetrics.activeCount++;
// Add event listeners for metrics
const originalWrite = stream.write.bind(stream);
stream.write = (message) => {
managedStream.messageCount++;
managedStream.byteCount += JSON.stringify(message).length;
this.streamMetrics.messagesSent++;
this.emit('messageSent', {
streamId,
type,
message: managedStream.messageCount,
bytes: managedStream.byteCount
});
return originalWrite(message);
};
// Clean up when stream ends
stream.on('end', () => {
managedStream.isActive = false;
this.streamMetrics.activeCount--;
const duration = Date.now() - managedStream.startTime;
this.emit('streamEnded', {
streamId,
type,
duration,
messageCount: managedStream.messageCount,
byteCount: managedStream.byteCount
});
this.activeStreams.delete(streamId);
});
stream.on('error', (error) => {
managedStream.isActive = false;
this.streamMetrics.activeCount--;
this.emit('streamError', {
streamId,
type,
error: error.message,
messageCount: managedStream.messageCount
});
this.activeStreams.delete(streamId);
});
return stream;
}
getMetrics() {
return {
...this.streamMetrics,
activeStreams: this.activeStreams.size,
streams: Array.from(this.activeStreams.values())
};
}
}
// Server Streaming Example: Real-time Notifications
class NotificationService {
constructor(streamManager) {
this.streamManager = streamManager;
this.subscribers = new Map();
}
// Subscribe to notifications (server streaming)
subscribeToNotifications(call, metadata) {
const userId = metadata.get('user-id') || 'anonymous';
const streamId = `notification-${userId}-${Date.now()}`;
console.log(`User ${userId} subscribed to notifications`);
const managedCall = this.streamManager.createManagedStream(
call, streamId, 'server'
);
// Store subscription
this.subscribers.set(userId, managedCall);
// Send initial subscription confirmation
call.write({
id: 'subscription-confirmation',
type: 'SUBSCRIPTION',
timestamp: new Date().toISOString(),
userId: userId,
message: 'Successfully subscribed to notifications'
});
// Periodic notifications
const interval = setInterval(() => {
if (this.subscribers.has(userId)) {
call.write({
id: `notification-${Date.now()}`,
type: 'INFO',
timestamp: new Date().toISOString(),
userId: userId,
message: `Periodic update for user ${userId}`,
data: {
serverTime: new Date().toISOString(),
randomValue: Math.random()
}
});
}
}, 5000);
// Cleanup on stream end
call.on('end', () => {
clearInterval(interval);
this.subscribers.delete(userId);
console.log(`User ${userId} unsubscribed from notifications`);
});
call.on('error', (error) => {
clearInterval(interval);
this.subscribers.delete(userId);
console.error(`Notification stream error for user ${userId}:`, error.message);
});
}
// Broadcast notification to all subscribers
broadcastNotification(notification) {
const message = {
id: `broadcast-${Date.now()}`,
type: 'BROADCAST',
timestamp: new Date().toISOString(),
message: notification.message,
data: notification.data || {}
};
this.subscribers.forEach((subscription, userId) => {
if (subscription.stream.isActive) {
try {
subscription.stream.write({ ...message, userId });
} catch (error) {
console.error(`Failed to send broadcast to user ${userId}:`, error.message);
this.subscribers.delete(userId);
}
}
});
}
}
// Client Streaming Example: Data Upload with Progress
class DataUploadService {
constructor(streamManager) {
this.streamManager = streamManager;
this.uploads = new Map();
}
// Handle data upload stream (client streaming)
handleDataUpload(call, callback) {
const uploadId = `upload-${Date.now()}`;
const streamId = `upload-${uploadId}`;
console.log(`Starting data upload: ${uploadId}`);
const uploadData = {
id: uploadId,
startTime: Date.now(),
chunks: [],
totalSize: 0,
receivedSize: 0
};
this.uploads.set(uploadId, uploadData);
const managedCall = this.streamManager.createManagedStream(
call, streamId, 'client'
);
// Handle incoming chunks
call.on('data', (chunk) => {
uploadData.chunks.push(chunk);
uploadData.receivedSize += chunk.size || 0;
uploadData.totalSize = chunk.totalSize || uploadData.totalSize;
const progress = uploadData.totalSize > 0
? (uploadData.receivedSize / uploadData.totalSize) * 100
: 0;
console.log(`Upload ${uploadId}: ${progress.toFixed(2)}% complete`);
// Send progress update
call.write({
type: 'PROGRESS',
uploadId,
progress: progress,
receivedSize: uploadData.receivedSize,
totalSize: uploadData.totalSize,
timestamp: new Date().toISOString()
});
});
// Handle stream completion
call.on('end', () => {
const duration = Date.now() - uploadData.startTime;
const averageSpeed = uploadData.receivedSize / (duration / 1000); // bytes/sec
console.log(`Upload ${uploadId} completed. Duration: ${duration}ms, Speed: ${averageSpeed.toFixed(2)} bytes/sec`);
callback(null, {
success: true,
uploadId,
totalChunks: uploadData.chunks.length,
totalSize: uploadData.receivedSize,
duration: duration,
averageSpeed: averageSpeed
});
this.uploads.delete(uploadId);
});
// Handle stream errors
call.on('error', (error) => {
console.error(`Upload ${uploadId} failed:`, error.message);
this.uploads.delete(uploadId);
callback({
code: grpc.status.INTERNAL,
message: `Upload failed: ${error.message}`
});
});
}
getUploadStatus(uploadId) {
return this.uploads.get(uploadId);
}
}
// Bidirectional Streaming Example: Real-time Chat
class ChatService {
constructor(streamManager) {
this.streamManager = streamManager;
this.rooms = new Map();
this.users = new Map();
}
// Handle chat room (bidirectional streaming)
joinChatRoom(call) {
let currentRoom = null;
let currentUser = null;
const streamId = `chat-${Date.now()}`;
const managedCall = this.streamManager.createManagedStream(
call, streamId, 'bidirectional'
);
// Handle incoming messages
call.on('data', (message) => {
const timestamp = new Date().toISOString();
switch (message.type) {
case 'JOIN':
currentUser = {
id: message.userId,
name: message.userName || `User-${message.userId}`,
joinTime: timestamp
};
currentRoom = message.roomId || 'default';
// Join room
if (!this.rooms.has(currentRoom)) {
this.rooms.set(currentRoom, new Set());
}
this.rooms.get(currentRoom).add(managedCall);
this.users.set(currentUser.id, currentUser);
console.log(`User ${currentUser.name} joined room ${currentRoom}`);
// Send join confirmation
call.write({
type: 'SYSTEM',
roomId: currentRoom,
timestamp,
message: `Welcome to room ${currentRoom}, ${currentUser.name}!`,
userCount: this.rooms.get(currentRoom).size
});
// Notify other users
this.broadcastToRoom(currentRoom, {
type: 'SYSTEM',
roomId: currentRoom,
timestamp,
message: `${currentUser.name} joined the room`
}, managedCall);
break;
case 'MESSAGE':
if (currentRoom && currentUser) {
const chatMessage = {
id: `msg-${Date.now()}`,
type: 'MESSAGE',
roomId: currentRoom,
userId: currentUser.id,
userName: currentUser.name,
content: message.content,
timestamp
};
// Broadcast to all users in the room
this.broadcastToRoom(currentRoom, chatMessage);
}
break;
case 'LEAVE':
if (currentRoom && currentUser) {
this.leaveRoom(currentRoom, currentUser, managedCall);
}
break;
case 'TYPING':
if (currentRoom && currentUser) {
this.broadcastToRoom(currentRoom, {
type: 'TYPING',
roomId: currentRoom,
userId: currentUser.id,
userName: currentUser.name,
timestamp
}, managedCall);
}
break;
}
});
// Handle stream end
call.on('end', () => {
if (currentRoom && currentUser) {
this.leaveRoom(currentRoom, currentUser, managedCall);
}
});
call.on('error', (error) => {
console.error(`Chat stream error:`, error.message);
if (currentRoom && currentUser) {
this.leaveRoom(currentRoom, currentUser, managedCall);
}
});
}
// Broadcast message to room
broadcastToRoom(roomId, message, excludeCall = null) {
const room = this.rooms.get(roomId);
if (!room) return;
room.forEach(stream => {
if (stream !== excludeCall && stream.stream.isActive) {
try {
stream.stream.write(message);
} catch (error) {
console.error(`Failed to broadcast to room ${roomId}:`, error.message);
room.delete(stream);
}
}
});
}
// Leave room
leaveRoom(roomId, user, stream) {
const room = this.rooms.get(roomId);
if (room) {
room.delete(stream);
console.log(`User ${user.name} left room ${roomId}`);
// Notify other users
this.broadcastToRoom(roomId, {
type: 'SYSTEM',
roomId,
timestamp: new Date().toISOString(),
message: `${user.name} left the room`,
userCount: room.size
});
// Clean up empty rooms
if (room.size === 0) {
this.rooms.delete(roomId);
console.log(`Room ${roomId} deleted (empty)`);
}
}
this.users.delete(user.id);
}
getRoomStatus(roomId) {
const room = this.rooms.get(roomId);
return {
roomId,
userCount: room ? room.size : 0,
exists: !!room
};
}
getAllRooms() {
const rooms = {};
this.rooms.forEach((users, roomId) => {
rooms[roomId] = users.size;
});
return rooms;
}
}
// Streaming utility functions
class StreamingUtils {
// Apply backpressure to slow down streaming
static async applyBackpressure(stream, messages, delay = 100) {
for (const message of messages) {
if (stream.destroyed || stream.closed) {
break;
}
const ready = stream.write(message);
if (!ready) {
console.log('Backpressure applied, waiting for drain...');
await new Promise(resolve => {
stream.once('drain', resolve);
});
}
// Add delay between messages
if (delay > 0) {
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
// Create a retry mechanism for streaming operations
static async retryStreamingOperation(operation, maxRetries = 3, delay = 1000) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error;
console.warn(`Streaming operation attempt ${attempt} failed:`, error.message);
if (attempt < maxRetries) {
await new Promise(resolve => setTimeout(resolve, delay * attempt));
}
}
}
throw lastError;
}
// Compress large messages
static compressMessage(message) {
const messageStr = JSON.stringify(message);
if (messageStr.length > 1024 * 1024) { // 1MB threshold
// In production, use proper compression like gzip
return {
...message,
compressed: true,
originalSize: messageStr.length
};
}
return message;
}
// Batch multiple messages
static batchMessages(messages, batchSize = 10) {
const batches = [];
for (let i = 0; i < messages.length; i += batchSize) {
batches.push(messages.slice(i, i + batchSize));
}
return batches;
}
}
module.exports = {
GrpcStreamManager,
NotificationService,
DataUploadService,
ChatService,
StreamingUtils
};
💻 Intercepteurs gRPC javascript
🔴 complex
Implémentation d'intercepteurs d'authentification, logging et monitoring
// gRPC Interceptors Implementation
const grpc = require('@grpc/grpc-js');
const jwt = require('jsonwebtoken');
const crypto = require('crypto');
// Base interceptor class
class BaseInterceptor {
constructor(options = {}) {
this.options = options;
}
// Interceptor method to be implemented by subclasses
intercept(call, next) {
throw new Error('intercept method must be implemented');
}
}
// Authentication Interceptor
class AuthInterceptor extends BaseInterceptor {
constructor(options = {}) {
super(options);
this.jwtSecret = options.jwtSecret || 'default-secret';
this.excludePaths = options.excludePaths || [];
this.apiKeyHeader = options.apiKeyHeader || 'x-api-key';
this.apiKeys = new Set(options.apiKeys || []);
}
intercept(call, next) {
const method = call.getMethod();
const path = method.path;
// Skip authentication for excluded paths
if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
return next();
}
const metadata = call.metadata.getMap();
// Try JWT authentication
const authorization = metadata['authorization'];
if (authorization) {
const token = authorization[0].replace('Bearer ', '');
try {
const decoded = jwt.verify(token, this.jwtSecret);
call.call.user = {
id: decoded.userId,
email: decoded.email,
roles: decoded.roles || []
};
return next();
} catch (error) {
console.error('JWT verification failed:', error.message);
}
}
// Try API key authentication
const apiKey = metadata[this.apiKeyHeader];
if (apiKey && this.apiKeys.has(apiKey[0])) {
call.call.user = {
id: 'api-key-user',
roles: ['api']
};
return next();
}
// Authentication failed
throw {
code: grpc.status.UNAUTHENTICATED,
message: 'Authentication required'
};
}
}
// Logging Interceptor
class LoggingInterceptor extends BaseInterceptor {
constructor(options = {}) {
super(options);
this.logLevel = options.logLevel || 'info';
this.logBody = options.logBody || false;
this.excludePaths = options.excludePaths || [];
this.logger = options.logger || console;
}
intercept(call, next) {
const method = call.getMethod();
const path = method.path;
const startTime = Date.now();
// Skip logging for excluded paths
if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
return next();
}
// Log request
const requestId = crypto.randomUUID();
const logData = {
requestId,
method: path,
type: 'request',
timestamp: new Date().toISOString(),
metadata: this.sanitizeMetadata(call.metadata.getMap())
};
if (this.logBody && call.request) {
logData.body = this.sanitizeBody(call.request);
}
this.logger.log(this.logLevel, `gRPC Request: ${JSON.stringify(logData)}`);
// Intercept response
const originalCallback = call.call.sendMetadata.bind(call.call);
call.call.sendMetadata = function(response, callback) {
const duration = Date.now() - startTime;
const responseLogData = {
requestId,
method: path,
type: 'response',
duration: duration,
timestamp: new Date().toISOString()
};
if (call.interceptorResponse) {
responseLogData.status = call.interceptorResponse.code || grpc.status.OK;
responseLogData.message = call.interceptorResponse.message || 'Success';
}
logger.log(logLevel, `gRPC Response: ${JSON.stringify(responseLogData)}`);
return originalCallback(response, callback);
};
// Handle streaming calls
if (call.call.write && call.call.read) {
this.interceptStreamingCall(call, requestId, path, startTime);
}
return next().catch(error => {
const duration = Date.now() - startTime;
const errorLogData = {
requestId,
method: path,
type: 'error',
duration: duration,
timestamp: new Date().toISOString(),
error: {
code: error.code,
message: error.message,
details: error.details
}
};
this.logger.error(`gRPC Error: ${JSON.stringify(errorLogData)}`);
throw error;
});
}
interceptStreamingCall(call, requestId, path, startTime) {
const originalWrite = call.call.write.bind(call.call);
const originalRead = call.call.read.bind(call.call);
let messagesWritten = 0;
let messagesRead = 0;
call.call.write = function(message, callback) {
messagesWritten++;
if (this.logBody) {
this.logger.log(this.logLevel, `gRPC Stream Write [${requestId}]: ${JSON.stringify({
requestId,
method: path,
messageType: 'write',
messageCount: messagesWritten,
body: this.sanitizeBody(message)
})}`);
}
return originalWrite(message, callback);
}.bind(this);
call.call.read = function(callback) {
return originalRead((error, response) => {
if (response) {
messagesRead++;
if (this.logBody) {
this.logger.log(this.logLevel, `gRPC Stream Read [${requestId}]: ${JSON.stringify({
requestId,
method: path,
messageType: 'read',
messageCount: messagesRead,
body: this.sanitizeBody(response)
})}`);
}
}
callback(error, response);
});
}.bind(this);
}
sanitizeMetadata(metadata) {
const sanitized = {};
Object.keys(metadata).forEach(key => {
if (key.toLowerCase().includes('authorization') ||
key.toLowerCase().includes('token') ||
key.toLowerCase().includes('password') ||
key.toLowerCase().includes('secret')) {
sanitized[key] = '[REDACTED]';
} else {
sanitized[key] = metadata[key];
}
});
return sanitized;
}
sanitizeBody(body) {
if (!body || typeof body !== 'object') {
return body;
}
const sanitized = JSON.parse(JSON.stringify(body));
// Remove sensitive fields
const sensitiveKeys = ['password', 'token', 'secret', 'key', 'creditCard'];
const removeSensitiveKeys = (obj) => {
if (typeof obj !== 'object' || obj === null) {
return;
}
Object.keys(obj).forEach(key => {
if (sensitiveKeys.some(sensitive => key.toLowerCase().includes(sensitive.toLowerCase()))) {
obj[key] = '[REDACTED]';
} else if (typeof obj[key] === 'object') {
removeSensitiveKeys(obj[key]);
}
});
};
removeSensitiveKeys(sanitized);
return sanitized;
}
}
// Rate Limiting Interceptor
class RateLimitInterceptor extends BaseInterceptor {
constructor(options = {}) {
super(options);
this.windowMs = options.windowMs || 60000; // 1 minute
this.maxRequests = options.maxRequests || 100;
this.requests = new Map();
this.excludePaths = options.excludePaths || [];
}
intercept(call, next) {
const method = call.getMethod();
const path = method.path;
// Skip rate limiting for excluded paths
if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
return next();
}
const clientId = this.getClientId(call);
const now = Date.now();
const windowStart = now - this.windowMs;
// Clean old requests
if (!this.requests.has(clientId)) {
this.requests.set(clientId, []);
}
const clientRequests = this.requests.get(clientId);
// Remove requests outside the current window
const validRequests = clientRequests.filter(timestamp => timestamp > windowStart);
this.requests.set(clientId, validRequests);
// Check if rate limit exceeded
if (validRequests.length >= this.maxRequests) {
const headers = {
'X-RateLimit-Limit': this.maxRequests.toString(),
'X-RateLimit-Remaining': '0',
'X-RateLimit-Reset': new Date(now + this.windowMs).toISOString()
};
throw {
code: grpc.status.RESOURCE_EXHAUSTED,
message: `Rate limit exceeded. Maximum ${this.maxRequests} requests per ${this.windowMs}ms allowed.`,
metadata: headers
};
}
// Add current request timestamp
validRequests.push(now);
const remainingRequests = this.maxRequests - validRequests.length;
const headers = {
'X-RateLimit-Limit': this.maxRequests.toString(),
'X-RateLimit-Remaining': remainingRequests.toString(),
'X-RateLimit-Reset': new Date(now + this.windowMs).toISOString()
};
// Add headers to response
const originalCallback = call.call.sendMetadata.bind(call.call);
call.call.sendMetadata = function(response, callback) {
// Merge rate limit headers into response metadata
if (response) {
Object.keys(headers).forEach(key => {
response[key] = [headers[key]];
});
}
return originalCallback(response, callback);
};
return next();
}
getClientId(call) {
// Try to get client ID from user
if (call.call.user && call.call.user.id) {
return `user:${call.call.user.id}`;
}
// Try to get client ID from IP
const metadata = call.metadata.getMap();
const xForwardedFor = metadata['x-forwarded-for'];
const xRealIp = metadata['x-real-ip'];
const ip = (xForwardedFor && xForwardedFor[0]) ||
(xRealIp && xRealIp[0]) ||
'unknown';
return `ip:${ip}`;
}
}
// Metrics Interceptor
class MetricsInterceptor extends BaseInterceptor {
constructor(options = {}) {
super(options);
this.metrics = {
totalRequests: 0,
totalErrors: 0,
methodMetrics: new Map(),
responseTime: []
};
this.startTime = Date.now();
this.reportInterval = options.reportInterval || 60000;
if (options.autoReport !== false) {
this.startReporting();
}
}
intercept(call, next) {
const method = call.getMethod();
const path = method.path;
const startTime = Date.now();
this.metrics.totalRequests++;
// Track method-specific metrics
if (!this.metrics.methodMetrics.has(path)) {
this.metrics.methodMetrics.set(path, {
count: 0,
errors: 0,
totalResponseTime: 0
});
}
const methodMetrics = this.metrics.methodMetrics.get(path);
methodMetrics.count++;
return next().then(response => {
const duration = Date.now() - startTime;
// Update metrics
methodMetrics.totalResponseTime += duration;
this.metrics.responseTime.push(duration);
// Keep only last 1000 response times
if (this.metrics.responseTime.length > 1000) {
this.metrics.responseTime = this.metrics.responseTime.slice(-1000);
}
return response;
}).catch(error => {
const duration = Date.now() - startTime;
// Update error metrics
this.metrics.totalErrors++;
methodMetrics.errors++;
methodMetrics.totalResponseTime += duration;
throw error;
});
}
startReporting() {
setInterval(() => {
const report = this.generateReport();
console.log('gRPC Metrics Report:', JSON.stringify(report, null, 2));
}, this.reportInterval);
}
generateReport() {
const now = Date.now();
const uptime = now - this.startTime;
// Calculate average response time
const avgResponseTime = this.metrics.responseTime.length > 0
? this.metrics.responseTime.reduce((a, b) => a + b, 0) / this.metrics.responseTime.length
: 0;
// Calculate percentiles
const sortedResponseTimes = [...this.metrics.responseTime].sort((a, b) => a - b);
const p50 = sortedResponseTimes[Math.floor(sortedResponseTimes.length * 0.5)] || 0;
const p95 = sortedResponseTimes[Math.floor(sortedResponseTimes.length * 0.95)] || 0;
const p99 = sortedResponseTimes[Math.floor(sortedResponseTimes.length * 0.99)] || 0;
// Calculate method statistics
const methodStats = {};
this.metrics.methodMetrics.forEach((metrics, method) => {
methodStats[method] = {
...metrics,
avgResponseTime: metrics.count > 0 ? metrics.totalResponseTime / metrics.count : 0,
errorRate: metrics.count > 0 ? (metrics.errors / metrics.count) * 100 : 0
};
});
return {
uptime: uptime,
totalRequests: this.metrics.totalRequests,
totalErrors: this.metrics.totalErrors,
errorRate: this.metrics.totalRequests > 0
? (this.metrics.totalErrors / this.metrics.totalRequests) * 100
: 0,
avgResponseTime: Math.round(avgResponseTime),
p50ResponseTime: p50,
p95ResponseTime: p95,
p99ResponseTime: p99,
methodStats: methodStats,
timestamp: new Date().toISOString()
};
}
getMetrics() {
return this.generateReport();
}
}
// Circuit Breaker Interceptor
class CircuitBreakerInterceptor extends BaseInterceptor {
constructor(options = {}) {
super(options);
this.failureThreshold = options.failureThreshold || 5;
this.recoveryTimeout = options.recoveryTimeout || 60000;
this.monitoringPeriod = options.monitoringPeriod || 10000;
this.excludePaths = options.excludePaths || [];
this.circuitStates = new Map();
}
intercept(call, next) {
const method = call.getMethod();
const path = method.path;
// Skip circuit breaking for excluded paths
if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
return next();
}
const circuitState = this.getCircuitState(path);
const now = Date.now();
// Check if circuit is open
if (circuitState.state === 'OPEN') {
if (now - circuitState.lastFailureTime > this.recoveryTimeout) {
// Try to close circuit (half-open state)
circuitState.state = 'HALF_OPEN';
circuitState.requestCount = 0;
} else {
// Circuit is still open
throw {
code: grpc.status.UNAVAILABLE,
message: 'Service temporarily unavailable (circuit breaker open)'
};
}
}
// Execute request and update circuit state
return next().then(response => {
// Success - reset circuit
this.recordSuccess(path);
return response;
}).catch(error => {
// Failure - update circuit state
this.recordFailure(path);
throw error;
});
}
getCircuitState(path) {
if (!this.circuitStates.has(path)) {
this.circuitStates.set(path, {
state: 'CLOSED', // CLOSED, OPEN, HALF_OPEN
failureCount: 0,
requestCount: 0,
lastFailureTime: 0
});
}
return this.circuitStates.get(path);
}
recordSuccess(path) {
const circuitState = this.getCircuitState(path);
if (circuitState.state === 'HALF_OPEN') {
// Close circuit after successful request in half-open state
circuitState.state = 'CLOSED';
circuitState.failureCount = 0;
} else {
// Reset failure count in closed state
circuitState.failureCount = 0;
}
}
recordFailure(path) {
const circuitState = this.getCircuitState(path);
const now = Date.now();
circuitState.failureCount++;
circuitState.lastFailureTime = now;
if (circuitState.state === 'HALF_OPEN') {
// Open circuit again if failure in half-open state
circuitState.state = 'OPEN';
} else if (circuitState.failureCount >= this.failureThreshold) {
// Open circuit if failure threshold exceeded
circuitState.state = 'OPEN';
}
}
getCircuitStates() {
const states = {};
this.circuitStates.forEach((state, path) => {
states[path] = { ...state };
});
return states;
}
}
// Interceptor factory
class InterceptorFactory {
static createAuth(options) {
return new AuthInterceptor(options);
}
static createLogging(options) {
return new LoggingInterceptor(options);
}
static createRateLimit(options) {
return new RateLimitInterceptor(options);
}
static createMetrics(options) {
return new MetricsInterceptor(options);
}
static createCircuitBreaker(options) {
return new CircuitBreakerInterceptor(options);
}
// Create chained interceptor
static chain(...interceptors) {
return {
intercept: (call, next) => {
let pipeline = next;
// Apply interceptors in reverse order (last interceptor runs first)
for (let i = interceptors.length - 1; i >= 0; i--) {
const interceptor = interceptors[i];
const currentPipeline = pipeline;
pipeline = () => interceptor.intercept(call, currentPipeline);
}
return pipeline();
}
};
}
}
module.exports = {
BaseInterceptor,
AuthInterceptor,
LoggingInterceptor,
RateLimitInterceptor,
MetricsInterceptor,
CircuitBreakerInterceptor,
InterceptorFactory
};
💻 Gestion des Erreurs gRPC javascript
🔴 complex
Gestion des statuts d'erreur et exceptions gRPC
// gRPC Error Handling Implementation
const grpc = require('@grpc/grpc-js');
// Custom gRPC error class
class GrpcError extends Error {
constructor(code, message, details = null, metadata = {}) {
super(message);
this.name = 'GrpcError';
this.code = code;
this.details = details;
this.metadata = metadata;
this.timestamp = new Date().toISOString();
this.requestId = this.generateRequestId();
}
generateRequestId() {
return 'req-' + Math.random().toString(36).substr(2, 9);
}
toGrpcError() {
return {
code: this.code,
message: this.message,
details: this.details || this.message,
metadata: this.metadata
};
}
}
// Validation error
class ValidationError extends GrpcError {
constructor(field, value, constraint) {
super(
grpc.status.INVALID_ARGUMENT,
`Validation failed for field ${field}`,
`Field '${field}' with value '${value}' violates constraint: ${constraint}`,
{
field: field,
value: String(value),
constraint: constraint,
errorType: 'VALIDATION_ERROR'
}
);
this.field = field;
this.value = value;
this.constraint = constraint;
}
}
// Business logic error
class BusinessError extends GrpcError {
constructor(message, businessCode, details = null) {
super(
grpc.status.FAILED_PRECONDITION,
message,
details,
{
businessCode: businessCode,
errorType: 'BUSINESS_ERROR'
}
);
this.businessCode = businessCode;
}
}
// Resource not found error
class NotFoundError extends GrpcError {
constructor(resourceType, resourceId) {
super(
grpc.status.NOT_FOUND,
`${resourceType} not found`,
`${resourceType} with ID ${resourceId} does not exist`,
{
resourceType: resourceType,
resourceId: String(resourceId),
errorType: 'NOT_FOUND_ERROR'
}
);
this.resourceType = resourceType;
this.resourceId = resourceId;
}
}
// Permission denied error
class PermissionError extends GrpcError {
constructor(action, resource, userId) {
super(
grpc.status.PERMISSION_DENIED,
`Permission denied for ${action}`,
`User ${userId} does not have permission to perform ${action} on ${resource}`,
{
action: action,
resource: resource,
userId: String(userId),
errorType: 'PERMISSION_ERROR'
}
);
this.action = action;
this.resource = resource;
this.userId = userId;
}
}
// Resource conflict error
class ConflictError extends GrpcError {
constructor(resourceType, conflictReason) {
super(
grpc.status.ALREADY_EXISTS,
`Resource conflict`,
`${resourceType} conflict: ${conflictReason}`,
{
resourceType: resourceType,
conflictReason: conflictReason,
errorType: 'CONFLICT_ERROR'
}
);
this.resourceType = resourceType;
this.conflictReason = conflictReason;
}
}
// Rate limit error
class RateLimitError extends GrpcError {
constructor(limit, windowMs, retryAfter) {
super(
grpc.status.RESOURCE_EXHAUSTED,
'Rate limit exceeded',
`Rate limit of ${limit} requests per ${windowMs}ms exceeded`,
{
limit: String(limit),
windowMs: String(windowMs),
retryAfter: String(retryAfter),
errorType: 'RATE_LIMIT_ERROR'
}
);
this.limit = limit;
this.windowMs = windowMs;
this.retryAfter = retryAfter;
}
}
// Service unavailable error
class ServiceUnavailableError extends GrpcError {
constructor(serviceName, retryAfter = null) {
super(
grpc.status.UNAVAILABLE,
`Service ${serviceName} is currently unavailable`,
`The ${serviceName} service is temporarily unavailable. Please try again later.`,
{
serviceName: serviceName,
errorType: 'SERVICE_UNAVAILABLE_ERROR'
}
);
this.serviceName = serviceName;
if (retryAfter) {
this.metadata.retryAfter = String(retryAfter);
}
}
}
// Timeout error
class TimeoutError extends GrpcError {
constructor(operation, timeoutMs) {
super(
grpc.status.DEADLINE_EXCEEDED,
`Operation ${operation} timed out`,
`Operation ${operation} did not complete within ${timeoutMs}ms`,
{
operation: operation,
timeoutMs: String(timeoutMs),
errorType: 'TIMEOUT_ERROR'
}
);
this.operation = operation;
this.timeoutMs = timeoutMs;
}
}
// Error handler class
class ErrorHandler {
constructor(options = {}) {
this.includeStackTrace = options.includeStackTrace || false;
this.logErrors = options.logErrors || true;
this.errorLogger = options.errorLogger || console.error;
this.customHandlers = new Map();
}
// Register custom error handler
registerHandler(errorType, handler) {
this.customHandlers.set(errorType, handler);
}
// Handle and convert errors
handleError(error, context = {}) {
const enhancedError = this.enhanceError(error, context);
if (this.logErrors) {
this.logError(enhancedError, context);
}
// Try custom handlers first
if (enhancedError.metadata && enhancedError.metadata.errorType) {
const customHandler = this.customHandlers.get(enhancedError.metadata.errorType);
if (customHandler) {
return customHandler(enhancedError, context);
}
}
// Default handling
return enhancedError.toGrpcError();
}
enhanceError(error, context) {
// If it's already a GrpcError, just enhance context
if (error instanceof GrpcError) {
error.metadata = {
...error.metadata,
context: context
};
return error;
}
// Convert common JavaScript errors to gRPC errors
if (error.name === 'ValidationError' || error.message.includes('validation')) {
return new GrpcError(
grpc.status.INVALID_ARGUMENT,
'Validation failed',
error.message,
{ originalError: error.name, context }
);
}
if (error.name === 'CastError' || error.message.includes('Cast to')) {
return new GrpcError(
grpc.status.INVALID_ARGUMENT,
'Invalid input format',
error.message,
{ originalError: error.name, context }
);
}
if (error.name === 'MongoError' && error.code === 11000) {
return new GrpcError(
grpc.status.ALREADY_EXISTS,
'Resource already exists',
'Duplicate key violation',
{ originalError: error.name, mongoCode: error.code, context }
);
}
if (error.name === 'TokenExpiredError') {
return new GrpcError(
grpc.status.UNAUTHENTICATED,
'Token expired',
'Authentication token has expired',
{ originalError: error.name, context }
);
}
if (error.name === 'JsonWebTokenError') {
return new GrpcError(
grpc.status.UNAUTHENTICATED,
'Invalid token',
'Authentication token is invalid',
{ originalError: error.name, context }
);
}
// Database connection errors
if (error.message.includes('ECONNREFUSED') || error.message.includes('connection')) {
return new ServiceUnavailableError('Database');
}
// Timeout errors
if (error.message.includes('timeout') || error.code === 'TIMEOUT') {
return new TimeoutError(context.operation || 'unknown', context.timeout || 30000);
}
// Default internal error
return new GrpcError(
grpc.status.INTERNAL,
'Internal server error',
this.includeStackTrace ? error.stack : error.message,
{
originalError: error.name,
originalMessage: error.message,
context,
stackTrace: this.includeStackTrace ? error.stack : undefined
}
);
}
logError(error, context) {
const logData = {
requestId: error.requestId,
timestamp: error.timestamp,
code: error.code,
message: error.message,
details: error.details,
metadata: error.metadata,
context: context
};
if (this.includeStackTrace && error.stack) {
logData.stackTrace = error.stack;
}
this.errorLogger('gRPC Error:', JSON.stringify(logData, null, 2));
}
}
// Retry mechanism for failed operations
class RetryHandler {
constructor(options = {}) {
this.maxRetries = options.maxRetries || 3;
this.baseDelay = options.baseDelay || 1000;
this.maxDelay = options.maxDelay || 30000;
this.backoffMultiplier = options.backoffMultiplier || 2;
this.retryableErrors = options.retryableErrors || new Set([
grpc.status.UNAVAILABLE,
grpc.status.DEADLINE_EXCEEDED,
grpc.status.RESOURCE_EXHAUSTED
]);
}
async executeWithRetry(operation, context = {}) {
let lastError;
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error;
// Check if error is retryable
if (!this.isRetryableError(error) || attempt === this.maxRetries) {
throw error;
}
// Calculate delay for next attempt
const delay = this.calculateDelay(attempt);
console.warn(`Attempt ${attempt} failed, retrying in ${delay}ms:`, error.message);
// Wait before retry
await this.sleep(delay);
}
}
throw lastError;
}
isRetryableError(error) {
const errorCode = error.code || error.status;
return this.retryableErrors.has(errorCode);
}
calculateDelay(attempt) {
const delay = this.baseDelay * Math.pow(this.backoffMultiplier, attempt - 1);
const jitter = delay * 0.1 * Math.random(); // Add 10% jitter
return Math.min(delay + jitter, this.maxDelay);
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Circuit breaker pattern for error handling
class CircuitBreaker {
constructor(options = {}) {
this.failureThreshold = options.failureThreshold || 5;
this.recoveryTimeout = options.recoveryTimeout || 60000;
this.monitoringPeriod = options.monitoringPeriod || 10000;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.failureCount = 0;
this.successCount = 0;
this.lastFailureTime = 0;
this.requestCount = 0;
}
async execute(operation) {
if (this.state === 'OPEN') {
if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
this.state = 'HALF_OPEN';
this.successCount = 0;
} else {
throw new ServiceUnavailableError('Circuit breaker is open', Math.ceil((this.recoveryTimeout - (Date.now() - this.lastFailureTime)) / 1000));
}
}
try {
const result = await operation();
this.recordSuccess();
return result;
} catch (error) {
this.recordFailure();
throw error;
}
}
recordSuccess() {
this.failureCount = 0;
if (this.state === 'HALF_OPEN') {
this.successCount++;
if (this.successCount >= 3) { // Require 3 consecutive successes to close circuit
this.state = 'CLOSED';
console.log('Circuit breaker closed after successful recovery');
}
}
}
recordFailure() {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.failureThreshold) {
this.state = 'OPEN';
console.log(`Circuit breaker opened after ${this.failureCount} failures`);
}
}
getState() {
return {
state: this.state,
failureCount: this.failureCount,
successCount: this.successCount,
lastFailureTime: this.lastFailureTime
};
}
}
// Error recovery strategies
class ErrorRecovery {
constructor() {
this.recoveryStrategies = new Map();
this.setupDefaultStrategies();
}
setupDefaultStrategies() {
// Validation error recovery - provide helpful suggestions
this.recoveryStrategies.set('VALIDATION_ERROR', (error, context) => {
return {
handled: true,
suggestion: `Please check the ${error.field} field. Expected: ${error.constraint}`,
correctedValue: this.suggestCorrection(error.value, error.constraint)
};
});
// Rate limit error recovery - suggest retry with backoff
this.recoveryStrategies.set('RATE_LIMIT_ERROR', (error, context) => {
return {
handled: true,
suggestion: `Rate limit exceeded. Retry after ${error.retryAfter} seconds`,
retryAfter: parseInt(error.retryAfter),
backoffStrategy: 'exponential'
};
});
// Service unavailable recovery - suggest alternative endpoints
this.recoveryStrategies.set('SERVICE_UNAVAILABLE_ERROR', (error, context) => {
return {
handled: true,
suggestion: `Service ${error.serviceName} is unavailable. Please try again later`,
alternativeEndpoints: this.getAlternativeEndpoints(error.serviceName),
fallbackAvailable: true
};
});
}
suggestCorrection(value, constraint) {
// Simple correction suggestions
if (constraint.includes('email') && !value.includes('@')) {
return `${value}@example.com`;
}
if (constraint.includes('positive') && isNaN(value)) {
return '0';
}
return null;
}
getAlternativeEndpoints(serviceName) {
const alternatives = {
'user-service': ['user-service-backup', 'user-service-v2'],
'order-service': ['order-service-cluster', 'legacy-order-service'],
'payment-service': ['payment-service-primary', 'payment-service-secondary']
};
return alternatives[serviceName] || [];
}
registerStrategy(errorType, strategy) {
this.recoveryStrategies.set(errorType, strategy);
}
attemptRecovery(error, context) {
if (error.metadata && error.metadata.errorType) {
const strategy = this.recoveryStrategies.get(error.metadata.errorType);
if (strategy) {
return strategy(error, context);
}
}
return {
handled: false,
suggestion: 'No recovery strategy available for this error type'
};
}
}
// Utility functions for error handling
const ErrorUtils = {
// Convert gRPC status code to string
getStatusString: (code) => {
const statusStrings = {
[grpc.status.OK]: 'OK',
[grpc.status.CANCELLED]: 'CANCELLED',
[grpc.status.UNKNOWN]: 'UNKNOWN',
[grpc.status.INVALID_ARGUMENT]: 'INVALID_ARGUMENT',
[grpc.status.DEADLINE_EXCEEDED]: 'DEADLINE_EXCEEDED',
[grpc.status.NOT_FOUND]: 'NOT_FOUND',
[grpc.status.ALREADY_EXISTS]: 'ALREADY_EXISTS',
[grpc.status.PERMISSION_DENIED]: 'PERMISSION_DENIED',
[grpc.status.UNAUTHENTICATED]: 'UNAUTHENTICATED',
[grpc.status.RESOURCE_EXHAUSTED]: 'RESOURCE_EXHAUSTED',
[grpc.status.FAILED_PRECONDITION]: 'FAILED_PRECONDITION',
[grpc.status.ABORTED]: 'ABORTED',
[grpc.status.OUT_OF_RANGE]: 'OUT_OF_RANGE',
[grpc.status.UNIMPLEMENTED]: 'UNIMPLEMENTED',
[grpc.status.INTERNAL]: 'INTERNAL',
[grpc.status.UNAVAILABLE]: 'UNAVAILABLE',
[grpc.status.DATA_LOSS]: 'DATA_LOSS'
};
return statusStrings[code] || `UNKNOWN_STATUS_${code}`;
},
// Check if error is retryable
isRetryableError: (error) => {
const retryableCodes = new Set([
grpc.status.UNAVAILABLE,
grpc.status.DEADLINE_EXCEEDED,
grpc.status.RESOURCE_EXHAUSTED,
grpc.status.ABORTED
]);
return retryableCodes.has(error.code || error.status);
},
// Extract error information for logging
extractErrorInfo: (error, context = {}) => {
return {
code: error.code || error.status,
message: error.message,
details: error.details,
requestId: error.requestId,
timestamp: error.timestamp || new Date().toISOString(),
context: context,
metadata: error.metadata || {},
stackTrace: error.stack
};
},
// Create error response for client
createErrorResponse: (error, includeDetails = false) => {
const response = {
success: false,
error: {
code: error.code,
message: error.message
}
};
if (includeDetails) {
response.error.details = error.details;
response.error.metadata = error.metadata;
response.error.requestId = error.requestId;
response.error.timestamp = error.timestamp;
}
return response;
}
};
module.exports = {
GrpcError,
ValidationError,
BusinessError,
NotFoundError,
PermissionError,
ConflictError,
RateLimitError,
ServiceUnavailableError,
TimeoutError,
ErrorHandler,
RetryHandler,
CircuitBreaker,
ErrorRecovery,
ErrorUtils
};