🎯 Ejemplos recomendados
Balanced sample collections from various categories for you to explore
Ejemplos de gRPC
Ejemplos del protocolo de comunicación gRPC (Google Remote Procedure Call) con Protocol Buffers y definiciones de servicios
💻 Definición de Protocol Buffers protobuf
🟢 simple
Definiciones de servicio y estructuras de mensajes básicos de Protocol Buffers
// Protocol Buffers v3 syntax
syntax = "proto3";
// Package declaration for namespace
package userservice;
// Import statement for common types
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
// Option for Go specific settings
option go_package = "github.com/example/userservice";
// Simple message definition
message User {
string id = 1; // Unique identifier
string name = 2; // User name
string email = 3; // User email
int32 age = 4; // User age
bool active = 5; // Account status
google.protobuf.Timestamp created_at = 6; // Creation timestamp
repeated string roles = 7; // User roles list
map<string, string> metadata = 8; // Additional metadata
}
// Request and response messages
message CreateUserRequest {
string name = 1;
string email = 2;
int32 age = 3;
repeated string roles = 4;
}
message CreateUserResponse {
User user = 1;
string message = 2;
}
message GetUserRequest {
string user_id = 1;
}
message GetUserResponse {
User user = 1;
}
message ListUsersRequest {
int32 page = 1;
int32 limit = 2;
string filter = 3;
}
message ListUsersResponse {
repeated User users = 1;
int32 total = 2;
}
message UpdateUserRequest {
string user_id = 1;
User user = 2;
}
message UpdateUserResponse {
User user = 1;
string message = 2;
}
message DeleteUserRequest {
string user_id = 1;
}
message DeleteUserResponse {
bool success = 1;
string message = 2;
}
// Enum definition
message User {
enum Status {
UNKNOWN = 0;
ACTIVE = 1;
INACTIVE = 2;
SUSPENDED = 3;
}
Status status = 9;
}
// Service definition
service UserService {
// Unary RPC
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse);
// Server streaming RPC
rpc ListUsers(ListUsersRequest) returns (stream ListUsersResponse);
// Client streaming RPC
rpc BatchCreateUsers(stream CreateUserRequest) returns (CreateUserResponse);
// Bidirectional streaming RPC
rpc StreamUsers(stream GetUserRequest) returns (stream GetUserResponse);
}
// Health check service
service HealthCheck {
rpc Check(google.protobuf.Empty) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}
ServingStatus status = 1;
}
// Authentication service
service AuthService {
rpc Login(LoginRequest) returns (LoginResponse);
rpc ValidateToken(TokenRequest) returns (TokenResponse);
rpc RefreshToken(TokenRequest) returns (TokenResponse);
}
message LoginRequest {
string username = 1;
string password = 2;
}
message LoginResponse {
string access_token = 1;
string refresh_token = 2;
int64 expires_in = 3;
}
message TokenRequest {
string token = 1;
}
message TokenResponse {
bool valid = 1;
string user_id = 2;
repeated string roles = 3;
}
💻 Implementación del Servidor gRPC javascript
🟡 intermediate
Implementación de servidor gRPC usando Node.js
// gRPC Server Implementation in Node.js
const grpc = require('@grpc/grpc-js');
const fs = require('fs');
const path = require('path');
// Load Protocol Buffer definitions
const PROTO_PATH = path.join(__dirname, 'user.proto');
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const userProto = grpc.loadPackageDefinition(packageDefinition).userservice;
// In-memory user database
const usersDatabase = new Map();
let userIdCounter = 1;
// Service implementation handlers
class UserServiceImpl {
// Unary RPC: Create User
createUser(call, callback) {
const { name, email, age, roles } = call.request;
// Validate input
if (!name || !email) {
return callback({
code: grpc.status.INVALID_ARGUMENT,
message: 'Name and email are required'
});
}
// Create new user
const user = {
id: (userIdCounter++).toString(),
name,
email,
age: age || 0,
active: true,
created_at: new Date().toISOString(),
roles: roles || ['user'],
metadata: {}
};
usersDatabase.set(user.id, user);
callback(null, {
user,
message: 'User created successfully'
});
}
// Unary RPC: Get User
getUser(call, callback) {
const { user_id } = call.request;
const user = usersDatabase.get(user_id);
if (!user) {
return callback({
code: grpc.status.NOT_FOUND,
message: 'User not found'
});
}
callback(null, { user });
}
// Unary RPC: Update User
updateUser(call, callback) {
const { user_id, user: updatedData } = call.request;
const existingUser = usersDatabase.get(user_id);
if (!existingUser) {
return callback({
code: grpc.status.NOT_FOUND,
message: 'User not found'
});
}
// Update user data
const updatedUser = {
...existingUser,
...updatedData,
id: existingUser.id // Preserve ID
};
usersDatabase.set(user_id, updatedUser);
callback(null, {
user: updatedUser,
message: 'User updated successfully'
});
}
// Unary RPC: Delete User
deleteUser(call, callback) {
const { user_id } = call.request;
const user = usersDatabase.get(user_id);
if (!user) {
return callback({
code: grpc.status.NOT_FOUND,
message: 'User not found'
});
}
usersDatabase.delete(user_id);
callback(null, {
success: true,
message: 'User deleted successfully'
});
}
// Server Streaming RPC: List Users
listUsers(call) {
const { page = 1, limit = 10, filter } = call.request;
const users = Array.from(usersDatabase.values());
// Apply filter if provided
let filteredUsers = users;
if (filter) {
filteredUsers = users.filter(user =>
user.name.toLowerCase().includes(filter.toLowerCase()) ||
user.email.toLowerCase().includes(filter.toLowerCase())
);
}
// Pagination
const startIndex = (page - 1) * limit;
const endIndex = startIndex + limit;
const paginatedUsers = filteredUsers.slice(startIndex, endIndex);
// Send response
call.write({
users: paginatedUsers,
total: filteredUsers.length
});
call.end();
}
// Client Streaming RPC: Batch Create Users
async batchCreateUsers(call, callback) {
const createdUsers = [];
call.on('data', (request) => {
try {
const { name, email, age, roles } = request;
if (!name || !email) {
return call.emit('error', {
code: grpc.status.INVALID_ARGUMENT,
message: 'Name and email are required'
});
}
const user = {
id: (userIdCounter++).toString(),
name,
email,
age: age || 0,
active: true,
created_at: new Date().toISOString(),
roles: roles || ['user'],
metadata: {}
};
usersDatabase.set(user.id, user);
createdUsers.push(user);
} catch (error) {
call.emit('error', {
code: grpc.status.INTERNAL,
message: error.message
});
}
});
call.on('end', () => {
callback(null, {
user: createdUsers[createdUsers.length - 1] || null,
message: `Batch created ${createdUsers.length} users successfully`
});
});
}
// Bidirectional Streaming RPC: Stream Users
streamUsers(call) {
call.on('data', (request) => {
const { user_id } = request;
const user = usersDatabase.get(user_id);
if (user) {
call.write({ user });
} else {
call.emit('error', {
code: grpc.status.NOT_FOUND,
message: `User ${user_id} not found`
});
}
});
call.on('end', () => {
call.end();
});
}
}
// Authentication service implementation
class AuthServiceImpl {
login(call, callback) {
const { username, password } = call.request;
// Simple authentication logic (in production, use proper hashing and database)
if (username === 'admin' && password === 'password') {
const accessToken = 'mock-access-token-' + Date.now();
const refreshToken = 'mock-refresh-token-' + Date.now();
callback(null, {
access_token: accessToken,
refresh_token: refreshToken,
expires_in: 3600
});
} else {
callback({
code: grpc.status.UNAUTHENTICATED,
message: 'Invalid credentials'
});
}
}
validateToken(call, callback) {
const { token } = call.request;
// Simple token validation (in production, use proper JWT validation)
if (token && token.startsWith('mock-access-token-')) {
callback(null, {
valid: true,
user_id: 'admin',
roles: ['admin', 'user']
});
} else {
callback({
code: grpc.status.UNAUTHENTICATED,
message: 'Invalid token'
});
}
}
refreshToken(call, callback) {
const { token } = call.request;
if (token && token.startsWith('mock-refresh-token-')) {
const newAccessToken = 'mock-access-token-' + Date.now();
callback(null, {
valid: true,
user_id: 'admin',
roles: ['admin', 'user']
});
} else {
callback({
code: grpc.status.UNAUTHENTICATED,
message: 'Invalid refresh token'
});
}
}
}
// Health check service implementation
class HealthCheckImpl {
check(call, callback) {
callback(null, {
status: 1 // SERVING
});
}
watch(call) {
const interval = setInterval(() => {
call.write({
status: 1 // SERVING
});
}, 1000);
call.on('end', () => {
clearInterval(interval);
call.end();
});
}
}
// Create gRPC server
function createServer() {
const server = new grpc.Server();
// Add services
server.addService(userProto.UserService.service, new UserServiceImpl());
server.addService(userProto.AuthService.service, new AuthServiceImpl());
server.addService(userProto.HealthCheck.service, new HealthCheckImpl());
return server;
}
// Server configuration
const server = createServer();
const PORT = process.env.GRPC_PORT || 50051;
const HOST = process.env.GRPC_HOST || '0.0.0.0';
// SSL/TLS configuration (optional)
const credentials = grpc.ServerCredentials.createInsecure();
// For SSL:
// const credentials = grpc.ServerCredentials.createSsl(
// fs.readFileSync(path.join(__dirname, 'ca.crt')),
// [{
// cert_chain: fs.readFileSync(path.join(__dirname, 'server.crt')),
// private_key: fs.readFileSync(path.join(__dirname, 'server.key'))
// }],
// false
// );
// Start server
server.bindAsync(`${HOST}:${PORT}`, credentials, (err, port) => {
if (err) {
console.error('Failed to start server:', err);
return;
}
console.log(`gRPC server listening on ${HOST}:${PORT}`);
server.start();
});
// Graceful shutdown
process.on('SIGINT', () => {
console.log('Shutting down gRPC server...');
server.tryShutdown((err) => {
if (err) {
console.error('Error during shutdown:', err);
process.exit(1);
}
console.log('Server stopped successfully');
process.exit(0);
});
});
// Force shutdown after timeout
setTimeout(() => {
console.log('Forcing server shutdown...');
server.forceShutdown();
process.exit(0);
}, 10000);
module.exports = { createServer, UserServiceImpl, AuthServiceImpl, HealthCheckImpl };
💻 Implementación del Cliente gRPC javascript
🟡 intermediate
Implementación de cliente gRPC usando Node.js
// gRPC Client Implementation in Node.js
const grpc = require('@grpc/grpc-js');
const path = require('path');
// Load Protocol Buffer definitions
const PROTO_PATH = path.join(__dirname, 'user.proto');
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const userProto = grpc.loadPackageDefinition(packageDefinition).userservice;
// Client configuration
const GRPC_HOST = process.env.GRPC_HOST || 'localhost';
const GRPC_PORT = process.env.GRPC_PORT || 50051;
const SERVER_ADDRESS = `${GRPC_HOST}:${GRPC_PORT}`;
// SSL/TLS configuration (optional)
const credentials = grpc.credentials.createInsecure();
// For SSL:
// const credentials = grpc.credentials.createSsl(
// fs.readFileSync(path.join(__dirname, 'ca.crt')),
// fs.readFileSync(path.join(__dirname, 'client.key')),
// fs.readFileSync(path.join(__dirname, 'client.crt'))
// );
// Create client instances
const userClient = new userProto.UserService(
SERVER_ADDRESS,
credentials
);
const authClient = new userProto.AuthService(
SERVER_ADDRESS,
credentials
);
const healthClient = new userProto.HealthCheck(
SERVER_ADDRESS,
credentials
);
// Client utility class
class GrpcClient {
constructor() {
this.userClient = userClient;
this.authClient = authClient;
this.healthClient = healthClient;
}
// Health check method
async checkHealth() {
return new Promise((resolve, reject) => {
this.healthClient.check({}, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
// Authentication methods
async login(username, password) {
return new Promise((resolve, reject) => {
const request = { username, password };
this.authClient.login(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
async validateToken(token) {
return new Promise((resolve, reject) => {
const request = { token };
this.authClient.validateToken(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
// User service methods
async createUser(userData) {
return new Promise((resolve, reject) => {
const request = {
name: userData.name,
email: userData.email,
age: userData.age || 0,
roles: userData.roles || ['user']
};
this.userClient.createUser(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
async getUser(userId) {
return new Promise((resolve, reject) => {
const request = { user_id: userId };
this.userClient.getUser(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
async updateUser(userId, userData) {
return new Promise((resolve, reject) => {
const request = {
user_id: userId,
user: userData
};
this.userClient.updateUser(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
async deleteUser(userId) {
return new Promise((resolve, reject) => {
const request = { user_id: userId };
this.userClient.deleteUser(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
});
});
}
// Server streaming method
async listUsers(page = 1, limit = 10, filter = '') {
return new Promise((resolve, reject) => {
const request = { page, limit, filter };
const call = this.userClient.listUsers(request);
const users = [];
let total = 0;
call.on('data', (response) => {
users.push(...response.users);
total = response.total;
});
call.on('end', () => {
resolve({
users,
total,
page,
limit
});
});
call.on('error', (error) => {
reject(error);
});
});
}
// Client streaming method
async batchCreateUsers(usersData) {
return new Promise((resolve, reject) => {
const call = this.userClient.batchCreateUsers();
// Send all user data
usersData.forEach(userData => {
const request = {
name: userData.name,
email: userData.email,
age: userData.age || 0,
roles: userData.roles || ['user']
};
call.write(request);
});
// End the stream
call.end();
// Handle response
call.on('data', (response) => {
resolve(response);
});
call.on('error', (error) => {
reject(error);
});
});
}
// Bidirectional streaming method
async streamUsers(userIds) {
return new Promise((resolve, reject) => {
const call = this.userClient.streamUsers();
const users = [];
call.on('data', (response) => {
users.push(response.user);
});
call.on('end', () => {
resolve(users);
});
call.on('error', (error) => {
reject(error);
});
// Send user IDs to stream
userIds.forEach(userId => {
call.write({ user_id: userId });
});
// End the stream
call.end();
});
}
}
// Example usage
async function demonstrateClient() {
const client = new GrpcClient();
try {
// Health check
console.log('Checking server health...');
const health = await client.checkHealth();
console.log('Health status:', health.status === 1 ? 'SERVING' : 'NOT_SERVING');
// Authentication
console.log('\nAuthenticating...');
const authResponse = await client.login('admin', 'password');
console.log('Login successful:', authResponse);
// Validate token
const tokenValidation = await client.validateToken(authResponse.access_token);
console.log('Token valid:', tokenValidation.valid);
// Create user
console.log('\nCreating user...');
const userResponse = await client.createUser({
name: 'John Doe',
email: '[email protected]',
age: 30,
roles: ['user', 'developer']
});
console.log('Created user:', userResponse.user);
const userId = userResponse.user.id;
// Get user
console.log('\nGetting user...');
const getUserResponse = await client.getUser(userId);
console.log('Retrieved user:', getUserResponse.user);
// Update user
console.log('\nUpdating user...');
const updateResponse = await client.updateUser(userId, {
name: 'John Smith',
age: 31
});
console.log('Updated user:', updateResponse.user);
// List users
console.log('\nListing users...');
const listResponse = await client.listUsers(1, 10, '');
console.log(`Found ${listResponse.total} users:`, listResponse.users);
// Batch create users
console.log('\nBatch creating users...');
const batchResponse = await client.batchCreateUsers([
{ name: 'Alice Johnson', email: '[email protected]', age: 25 },
{ name: 'Bob Wilson', email: '[email protected]', age: 28 },
{ name: 'Charlie Brown', email: '[email protected]', age: 32 }
]);
console.log('Batch creation response:', batchResponse.message);
// Stream users
console.log('\nStreaming users...');
const streamResponse = await client.streamUsers([userId, '999', '888']);
console.log('Streamed users:', streamResponse);
// Delete user
console.log('\nDeleting user...');
const deleteResponse = await client.deleteUser(userId);
console.log('Delete response:', deleteResponse);
} catch (error) {
console.error('Error:', error);
if (error.code) {
console.error('gRPC Error Code:', error.code);
console.error('gRPC Error Message:', error.message);
}
}
}
// Error handling utility
function handleGrpcError(error) {
if (error.code === undefined) {
return 'Network error or connection failed';
}
const errorMessages = {
[grpc.status.OK]: 'OK',
[grpc.status.CANCELLED]: 'Operation cancelled',
[grpc.status.UNKNOWN]: 'Unknown error',
[grpc.status.INVALID_ARGUMENT]: 'Invalid argument',
[grpc.status.DEADLINE_EXCEEDED]: 'Deadline exceeded',
[grpc.status.NOT_FOUND]: 'Not found',
[grpc.status.ALREADY_EXISTS]: 'Already exists',
[grpc.status.PERMISSION_DENIED]: 'Permission denied',
[grpc.status.UNAUTHENTICATED]: 'Unauthenticated',
[grpc.status.RESOURCE_EXHAUSTED]: 'Resource exhausted',
[grpc.status.FAILED_PRECONDITION]: 'Failed precondition',
[grpc.status.ABORTED]: 'Aborted',
[grpc.status.OUT_OF_RANGE]: 'Out of range',
[grpc.status.UNIMPLEMENTED]: 'Unimplemented',
[grpc.status.INTERNAL]: 'Internal error',
[grpc.status.UNAVAILABLE]: 'Unavailable',
[grpc.status.DATA_LOSS]: 'Data loss',
[grpc.status.DO_NOT_USE]: 'Do not use'
};
return errorMessages[error.code] || `Unknown gRPC error: ${error.code}`;
}
// Export client class and utility functions
module.exports = {
GrpcClient,
handleGrpcError,
demonstrateClient
};
// Run demonstration if this file is executed directly
if (require.main === module) {
demonstrateClient().catch(console.error);
}
💻 Reflexión de Servicios gRPC
🟡 intermediate
Uso de reflexión de servicios para consultas dinámicas
// gRPC Server Reflection Implementation
const grpc = require('@grpc/grpc-js');
const fs = require('fs');
const path = require('path');
// Server reflection service implementation
class ServerReflectionService {
constructor(server, protoFiles = []) {
this.server = server;
this.services = new Map();
this.fileDescriptors = new Map();
this.symbolTable = new Map();
// Load proto files
this.loadProtoFiles(protoFiles);
// Register reflection service
this.registerReflectionService();
}
loadProtoFiles(protoFiles) {
protoFiles.forEach(filePath => {
try {
const protoContent = fs.readFileSync(filePath, 'utf8');
const fileName = path.basename(filePath);
// Parse proto file (simplified - in production use proper parser)
const descriptor = this.parseProtoFile(protoContent, fileName);
this.fileDescriptors.set(fileName, descriptor);
// Extract service information
this.extractServices(descriptor, fileName);
console.log(`Loaded proto file: ${fileName}`);
} catch (error) {
console.error(`Failed to load proto file ${filePath}:`, error.message);
}
});
}
parseProtoFile(content, fileName) {
// Simplified proto file parsing
// In production, use @grpc/proto-loader or protobufjs
const descriptor = {
name: fileName,
package: '',
messages: [],
services: [],
enums: [],
dependencies: []
};
const lines = content.split('\n');
let currentService = null;
let currentMessage = null;
lines.forEach(line => {
line = line.trim();
if (line.startsWith('package ')) {
descriptor.package = line.replace('package ', '').replace(';', '');
} else if (line.startsWith('service ')) {
const serviceName = line.match(/service\s+(\w+)/);
if (serviceName) {
currentService = {
name: serviceName[1],
methods: []
};
descriptor.services.push(currentService);
}
} else if (line.startsWith('rpc ') && currentService) {
const methodMatch = line.match(/rpc\s+(\w+)\s*\(([^)]+)\)\s*returns\s*\(([^)]+)\)/);
if (methodMatch) {
currentService.methods.push({
name: methodMatch[1],
inputType: methodMatch[2].trim(),
outputType: methodMatch[3].trim(),
clientStreaming: line.includes('stream ') && line.indexOf('stream ') < line.indexOf('rpc ') + 4,
serverStreaming: line.includes('stream ') && line.indexOf('stream ') > line.indexOf('returns ')
});
}
} else if (line.startsWith('message ')) {
const messageName = line.match(/message\s+(\w+)/);
if (messageName) {
currentMessage = {
name: messageName[1],
fields: []
};
descriptor.messages.push(currentMessage);
}
} else if (currentMessage && line.includes('=')) {
const fieldMatch = line.match(/(\w+)\s+(\w+)\s*=\s+(\d+)/);
if (fieldMatch) {
currentMessage.fields.push({
name: fieldMatch[2],
type: fieldMatch[1],
number: parseInt(fieldMatch[3])
});
}
}
});
return descriptor;
}
extractServices(descriptor, fileName) {
descriptor.services.forEach(service => {
const fullName = descriptor.package ? `${descriptor.package}.${service.name}` : service.name;
this.services.set(fullName, {
name: service.name,
fullName: fullName,
file: fileName,
package: descriptor.package,
methods: service.methods.map(method => ({
name: method.name,
inputType: method.inputType,
outputType: method.outputType,
clientStreaming: method.clientStreaming,
serverStreaming: method.serverStreaming
}))
});
// Add to symbol table
this.symbolTable.set(fullName, 'SERVICE');
service.methods.forEach(method => {
const methodFullName = `${fullName}.${method.name}`;
this.symbolTable.set(methodFullName, 'METHOD');
});
});
descriptor.messages.forEach(message => {
const fullName = descriptor.package ? `${descriptor.package}.${message.name}` : message.name;
this.symbolTable.set(fullName, 'MESSAGE');
message.fields.forEach(field => {
const fieldFullName = `${fullName}.${field.name}`;
this.symbolTable.set(fieldFullName, 'FIELD');
});
});
}
registerReflectionService() {
const reflectionProto = this.createReflectionProto();
this.server.addService(reflectionProto.ServerReflection.service, {
serverReflectionInfo: this.handleServerReflectionInfo.bind(this)
});
}
createReflectionProto() {
// Simplified reflection proto definition
return {
ServerReflection: {
service: {
serverReflectionInfo: {
path: '/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo',
requestStream: true,
responseStream: true
}
}
}
};
}
async handleServerReflectionInfo(call) {
call.on('data', (request) => {
try {
const response = this.processReflectionRequest(request);
call.write(response);
} catch (error) {
call.write({
errorResponse: {
errorCode: grpc.status.INTERNAL,
errorMessage: error.message
}
});
}
});
call.on('end', () => {
call.end();
});
}
processReflectionRequest(request) {
if (request.listServices) {
return this.handleListServices(request.listServices);
} else if (request.fileByFilename) {
return this.handleFileByFilename(request.fileByFilename);
} else if (request.fileContainingSymbol) {
return this.handleFileContainingSymbol(request.fileContainingSymbol);
} else if (request.allExtensionNumbersOfType) {
return this.handleAllExtensionNumbers(request.allExtensionNumbersOfType);
} else if (request.listExtensionNumbers) {
return this.handleListExtensionNumbers(request.listExtensionNumbers);
} else {
throw new Error('Unknown reflection request type');
}
}
handleListServices(request) {
const services = [];
// Get services from the server
if (this.server && this.server.services) {
Object.keys(this.server.services).forEach(servicePath => {
const serviceName = servicePath.split('/').pop();
if (!serviceName.includes('ServerReflection')) {
services.push({ name: serviceName });
}
});
}
return {
listServicesResponse: {
service: services
}
};
}
handleFileByFilename(request) {
const fileName = request.filename;
const descriptor = this.fileDescriptors.get(fileName);
if (!descriptor) {
return {
errorResponse: {
errorCode: grpc.status.NOT_FOUND,
errorMessage: `File not found: ${fileName}`
}
};
}
const fileDescriptorProto = this.serializeDescriptor(descriptor);
return {
fileDescriptorResponse: {
fileDescriptorProto: [fileDescriptorProto]
}
};
}
handleFileContainingSymbol(request) {
const symbol = request.symbol;
const fileName = this.findFileForSymbol(symbol);
if (!fileName) {
return {
errorResponse: {
errorCode: grpc.status.NOT_FOUND,
errorMessage: `Symbol not found: ${symbol}`
}
};
}
const descriptor = this.fileDescriptors.get(fileName);
const fileDescriptorProto = this.serializeDescriptor(descriptor);
return {
fileDescriptorResponse: {
fileDescriptorProto: [fileDescriptorProto]
}
};
}
handleAllExtensionNumbers(request) {
// Simplified implementation
return {
allExtensionNumbersResponse: {
baseTypeName: request.extending_type,
extensionNumber: []
}
};
}
handleListExtensionNumbers(request) {
// Simplified implementation
return {
extensionNumberResponse: {
baseTypeName: request.containing_type,
extensionNumber: []
}
};
}
findFileForSymbol(symbol) {
for (const [fileName, descriptor] of this.fileDescriptors) {
const fullName = descriptor.package ? `${descriptor.package}.${symbol}` : symbol;
// Check if symbol exists in this file
if (descriptor.services.some(s => s.name === symbol) ||
descriptor.messages.some(m => m.name === symbol)) {
return fileName;
}
}
return null;
}
serializeDescriptor(descriptor) {
// Simplified serialization - in production use proper protobuf serialization
const serialized = JSON.stringify({
name: descriptor.name,
package: descriptor.package,
dependency: descriptor.dependencies,
messageType: descriptor.messages.map(msg => ({
name: msg.name,
field: msg.fields.map(field => ({
name: field.name,
number: field.number,
label: 'LABEL_OPTIONAL',
type: this.getProtobufType(field.type)
}))
})),
serviceType: descriptor.services.map(svc => ({
name: svc.name,
method: svc.methods.map(method => ({
name: method.name,
inputType: method.inputType,
outputType: method.outputType,
clientStreaming: method.clientStreaming,
serverStreaming: method.serverStreaming
}))
}))
});
return Buffer.from(serialized);
}
getProtobufType(type) {
const typeMap = {
'string': 'TYPE_STRING',
'int32': 'TYPE_INT32',
'int64': 'TYPE_INT64',
'bool': 'TYPE_BOOL',
'double': 'TYPE_DOUBLE',
'float': 'TYPE_FLOAT'
};
return typeMap[type] || 'TYPE_STRING';
}
}
// Reflection client for service discovery
class ReflectionClient {
constructor(address, credentials) {
this.address = address;
this.credentials = credentials;
this.cache = new Map();
}
async listServices() {
const cacheKey = 'listServices';
if (this.cache.has(cacheKey)) {
return this.cache.get(cacheKey);
}
const response = await this.makeReflectionRequest({
listServices: {}
});
const services = response.listServicesResponse.service.map(s => s.name);
this.cache.set(cacheKey, services);
return services;
}
async getServiceDescriptor(serviceName) {
const cacheKey = `service_${serviceName}`;
if (this.cache.has(cacheKey)) {
return this.cache.get(cacheKey);
}
const response = await this.makeReflectionRequest({
fileContainingSymbol: { symbol: serviceName }
});
if (!response.fileDescriptorResponse || !response.fileDescriptorResponse.fileDescriptorProto.length) {
throw new Error(`Service descriptor not found for ${serviceName}`);
}
const descriptor = this.deserializeDescriptor(response.fileDescriptorResponse.fileDescriptorProto[0]);
this.cache.set(cacheKey, descriptor);
return descriptor;
}
async getAllServiceDescriptors() {
const services = await this.listServices();
const descriptors = {};
for (const serviceName of services) {
try {
descriptors[serviceName] = await this.getServiceDescriptor(serviceName);
} catch (error) {
console.warn(`Failed to get descriptor for ${serviceName}:`, error.message);
}
}
return descriptors;
}
async makeReflectionRequest(request) {
return new Promise((resolve, reject) => {
const client = this.createReflectionClient();
const call = client.serverReflectionInfo();
call.write(request);
let response = null;
let error = null;
call.on('data', (data) => {
response = data;
});
call.on('end', () => {
if (error) {
reject(error);
} else if (response) {
resolve(response);
} else {
reject(new Error('No response received'));
}
});
call.on('error', (err) => {
error = err;
});
call.end();
});
}
createReflectionClient() {
// Simplified client creation
return {
serverReflectionInfo: () => ({
write: () => {},
on: () => {},
end: () => {}
})
};
}
deserializeDescriptor(buffer) {
// Simplified deserialization - in production use proper protobuf deserialization
return JSON.parse(buffer.toString());
}
clearCache() {
this.cache.clear();
}
}
// Service discovery utilities
class ServiceDiscovery {
constructor(reflectionClient) {
this.reflectionClient = reflectionClient;
this.serviceRegistry = new Map();
}
async discoverServices() {
console.log('Discovering services...');
const services = await this.reflectionClient.listServices();
console.log(`Found ${services.length} services:`, services);
for (const serviceName of services) {
try {
await this.discoverService(serviceName);
} catch (error) {
console.error(`Failed to discover ${serviceName}:`, error.message);
}
}
console.log('Service discovery completed');
return this.serviceRegistry;
}
async discoverService(serviceName) {
console.log(`Discovering service: ${serviceName}`);
const descriptor = await this.reflectionClient.getServiceDescriptor(serviceName);
const serviceInfo = {
name: serviceName,
package: descriptor.package,
methods: descriptor.serviceType[0].method.map(method => ({
name: method.name,
inputType: method.inputType,
outputType: method.outputType,
clientStreaming: method.clientStreaming,
serverStreaming: method.serverStreaming,
path: `/${descriptor.package ? descriptor.package + '.' : ''}${serviceName}/${method.name}`
})),
messages: descriptor.messageType.map(msg => ({
name: msg.name,
fields: msg.field.map(field => ({
name: field.name,
type: field.type,
number: field.number
}))
}))
};
this.serviceRegistry.set(serviceName, serviceInfo);
console.log(`Service ${serviceName} discovered with ${serviceInfo.methods.length} methods`);
return serviceInfo;
}
getService(serviceName) {
return this.serviceRegistry.get(serviceName);
}
getAllServices() {
return Array.from(this.serviceRegistry.values());
}
findMethod(methodName) {
for (const [serviceName, serviceInfo] of this.serviceRegistry) {
const method = serviceInfo.methods.find(m => m.name === methodName);
if (method) {
return {
service: serviceName,
method: method
};
}
}
return null;
}
generateClientCode(serviceName, language = 'javascript') {
const serviceInfo = this.serviceRegistry.get(serviceName);
if (!serviceInfo) {
throw new Error(`Service ${serviceName} not found`);
}
if (language === 'javascript') {
return this.generateJavascriptClient(serviceInfo);
} else if (language === 'typescript') {
return this.generateTypeScriptClient(serviceInfo);
} else {
throw new Error(`Language ${language} not supported`);
}
}
generateJavascriptClient(serviceInfo) {
let code = `// Auto-generated gRPC client for ${serviceInfo.name}\n`;
code += `const grpc = require('@grpc/grpc-js');\n\n`;
// Client class
code += `class ${serviceInfo.name}Client {\n`;
code += ` constructor(address, credentials) {\n`;
code += ` this.client = new grpc.Client(address, credentials);\n`;
code += ` }\n\n`;
// Generate methods
serviceInfo.methods.forEach(method => {
const methodName = method.name.charAt(0).toLowerCase() + method.name.slice(1);
if (method.clientStreaming && method.serverStreaming) {
// Bidirectional streaming
code += ` ${methodName}() {\n`;
code += ` return this.client.makeBidiStreamRequest(\n`;
code += ` '${method.path}'\n`;
code += ` );\n`;
code += ` }\n\n`;
} else if (method.clientStreaming) {
// Client streaming
code += ` ${methodName}(callback) {\n`;
code += ` return this.client.makeClientStreamRequest(\n`;
code += ` '${method.path}', callback\n`;
code += ` );\n`;
code += ` }\n\n`;
} else if (method.serverStreaming) {
// Server streaming
code += ` ${methodName}(request) {\n`;
code += ` return this.client.makeServerStreamRequest(\n`;
code += ` '${method.path}', request\n`;
code += ` );\n`;
code += ` }\n\n`;
} else {
// Unary
code += ` ${methodName}(request, callback) {\n`;
code += ` return this.client.makeUnaryRequest(\n`;
code += ` '${method.path}', request, callback\n`;
code += ` );\n`;
code += ` }\n\n`;
}
});
code += `}\n\n`;
code += `module.exports = ${serviceInfo.name}Client;\n`;
return code;
}
generateTypeScriptClient(serviceInfo) {
// Similar to JavaScript but with TypeScript types
return this.generateJavascriptClient(serviceInfo).replace('javascript', 'typescript');
}
}
module.exports = {
ServerReflectionService,
ReflectionClient,
ServiceDiscovery
};
💻 Verificación de Salud gRPC
🟡 intermediate
Implementación del protocolo estándar de verificación de salud
// gRPC Health Checking Implementation
const grpc = require('@grpc/grpc-js');
// Health checking status enum
const HealthCheckResponseServingStatus = {
UNKNOWN: 0,
SERVING: 1,
NOT_SERVING: 2,
SERVICE_UNKNOWN: 3
};
// Health check implementation
class HealthCheckService {
constructor(options = {}) {
this.statuses = new Map();
this.defaultStatus = options.defaultStatus || HealthCheckResponseServingStatus.SERVING;
this.checkInterval = options.checkInterval || 30000;
this.detailedLogging = options.detailedLogging || false;
this.watchers = new Map();
// Set default service status
this.setStatus('', this.defaultStatus);
}
// Set health status for a service
setStatus(service, status) {
const oldStatus = this.statuses.get(service);
this.statuses.set(service, status);
if (this.detailedLogging) {
console.log(`Health status changed for '${service}': ${this.getStatusString(oldStatus)} -> ${this.getStatusString(status)}`);
}
// Notify all watchers for this service
this.notifyWatchers(service, status);
}
// Get health status for a service
getStatus(service) {
return this.statuses.get(service) || HealthCheckResponseServingStatus.SERVICE_UNKNOWN;
}
// Clear status for a service
clearStatus(service) {
this.statuses.delete(service);
this.notifyWatchers(service, HealthCheckResponseServingStatus.SERVICE_UNKNOWN);
}
// Get all service statuses
getAllStatuses() {
const result = {};
this.statuses.forEach((status, service) => {
result[service] = {
status: status,
statusString: this.getStatusString(status)
};
});
return result;
}
// gRPC health check method
check(call, callback) {
const service = call.request.service || '';
const status = this.getStatus(service);
if (this.detailedLogging) {
console.log(`Health check requested for service: '${service}', status: ${this.getStatusString(status)}`);
}
callback(null, {
status: status
});
}
// gRPC health watch method (server streaming)
watch(call) {
const service = call.request.service || '';
const watcherId = this.generateWatcherId();
if (this.detailedLogging) {
console.log(`Health watch started for service: '${service}', watcher: ${watcherId}`);
}
// Store watcher
if (!this.watchers.has(service)) {
this.watchers.set(service, new Map());
}
this.watchers.get(service).set(watcherId, call);
// Send current status immediately
const currentStatus = this.getStatus(service);
call.write({
status: currentStatus
});
// Handle watcher cleanup
const cleanup = () => {
if (this.watchers.has(service)) {
this.watchers.get(service).delete(watcherId);
if (this.watchers.get(service).size === 0) {
this.watchers.delete(service);
}
}
if (this.detailedLogging) {
console.log(`Health watch ended for service: '${service}', watcher: ${watcherId}`);
}
};
call.on('end', cleanup);
call.on('error', cleanup);
call.on('cancel', cleanup);
}
// Notify all watchers of a service status change
notifyWatchers(service, status) {
if (!this.watchers.has(service)) {
return;
}
const watchers = this.watchers.get(service);
const message = { status: status };
watchers.forEach((call, watcherId) => {
try {
call.write(message);
} catch (error) {
console.error(`Failed to notify watcher ${watcherId} for service ${service}:`, error.message);
watchers.delete(watcherId);
}
});
}
// Generate unique watcher ID
generateWatcherId() {
return 'watcher-' + Math.random().toString(36).substr(2, 9);
}
// Convert status to string
getStatusString(status) {
const statusStrings = {
[HealthCheckResponseServingStatus.UNKNOWN]: 'UNKNOWN',
[HealthCheckResponseServingStatus.SERVING]: 'SERVING',
[HealthCheckResponseServingStatus.NOT_SERVING]: 'NOT_SERVING',
[HealthCheckResponseServingStatus.SERVICE_UNKNOWN]: 'SERVICE_UNKNOWN'
};
return statusStrings[status] || 'UNKNOWN';
}
// Get watcher statistics
getWatcherStats() {
const stats = {
totalWatchers: 0,
serviceWatchers: {}
};
this.watchers.forEach((watchers, service) => {
stats.totalWatchers += watchers.size;
stats.serviceWatchers[service] = watchers.size;
});
return stats;
}
}
// Advanced health check with dependencies
class AdvancedHealthCheckService extends HealthCheckService {
constructor(options = {}) {
super(options);
this.dependencies = new Map();
this.checkers = new Map();
this.checkResults = new Map();
this.checkTimeout = options.checkTimeout || 5000;
this.failureThreshold = options.failureThreshold || 3;
this.recoveryThreshold = options.recoveryThreshold || 2;
}
// Add dependency for a service
addDependency(service, dependency, checker) {
if (!this.dependencies.has(service)) {
this.dependencies.set(service, new Set());
}
this.dependencies.get(service).add(dependency);
this.checkers.set(dependency, checker);
// Start monitoring the dependency
this.startDependencyMonitoring(dependency, checker);
}
// Start monitoring a dependency
async startDependencyMonitoring(dependency, checker) {
const checkDependency = async () => {
try {
const startTime = Date.now();
const result = await Promise.race([
checker(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Health check timeout')), this.checkTimeout)
)
]);
const responseTime = Date.now() - startTime;
this.updateCheckResult(dependency, {
healthy: true,
lastCheck: new Date(),
responseTime: responseTime,
message: result.message || 'OK',
details: result.details || {}
});
} catch (error) {
this.updateCheckResult(dependency, {
healthy: false,
lastCheck: new Date(),
error: error.message,
lastError: error
});
}
};
// Run check immediately
checkDependency();
// Schedule periodic checks
setInterval(checkDependency, this.checkInterval);
}
// Update check result for a dependency
updateCheckResult(dependency, result) {
const previousResult = this.checkResults.get(dependency) || {
consecutiveFailures: 0,
consecutiveSuccesses: 0
};
const newResult = {
...previousResult,
...result,
consecutiveFailures: result.healthy ? 0 : previousResult.consecutiveFailures + 1,
consecutiveSuccesses: result.healthy ? previousResult.consecutiveSuccesses + 1 : 0
};
this.checkResults.set(dependency, newResult);
// Determine if dependency should be considered unhealthy
const isUnhealthy = newResult.consecutiveFailures >= this.failureThreshold;
const isRecovered = newResult.consecutiveSuccesses >= this.recoveryThreshold;
// Update service status based on dependency health
this.updateServiceStatusBasedOnDependencies();
}
// Update service status based on all dependencies
updateServiceStatusBasedOnDependencies() {
for (const [service, deps] of this.dependencies) {
let allHealthy = true;
let anyUnhealthy = false;
for (const dependency of deps) {
const result = this.checkResults.get(dependency);
if (!result) {
allHealthy = false;
} else if (result.consecutiveFailures >= this.failureThreshold) {
allHealthy = false;
anyUnhealthy = true;
}
}
let newStatus;
if (allHealthy) {
newStatus = HealthCheckResponseServingStatus.SERVING;
} else if (anyUnhealthy) {
newStatus = HealthCheckResponseServingStatus.NOT_SERVING;
} else {
newStatus = HealthCheckResponseServingStatus.UNKNOWN;
}
const currentStatus = this.getStatus(service);
if (currentStatus !== newStatus) {
this.setStatus(service, newStatus);
}
}
}
// Get detailed health status
getDetailedStatus(service) {
const status = this.getStatus(service);
const dependencies = this.dependencies.get(service) || new Set();
const dependencyStatuses = {};
for (const dependency of dependencies) {
const result = this.checkResults.get(dependency);
dependencyStatuses[dependency] = {
healthy: result ? result.consecutiveFailures < this.failureThreshold : false,
lastCheck: result ? result.lastCheck : null,
responseTime: result ? result.responseTime : null,
error: result && result.error ? result.error : null,
consecutiveFailures: result ? result.consecutiveFailures : 0,
consecutiveSuccesses: result ? result.consecutiveSuccesses : 0
};
}
return {
service: service,
status: status,
statusString: this.getStatusString(status),
dependencies: dependencyStatuses,
allDependenciesHealthy: Object.values(dependencyStatuses).every(dep => dep.healthy),
lastUpdated: new Date()
};
}
// Get all detailed statuses
getAllDetailedStatuses() {
const result = {};
// Include services with dependencies
for (const service of this.dependencies.keys()) {
result[service] = this.getDetailedStatus(service);
}
// Include services without dependencies
for (const service of this.statuses.keys()) {
if (!result[service]) {
result[service] = {
service: service,
status: this.getStatus(service),
statusString: this.getStatusString(this.getStatus(service)),
dependencies: {},
allDependenciesHealthy: true,
lastUpdated: new Date()
};
}
}
return result;
}
}
// Health check client for monitoring services
class HealthCheckClient {
constructor(address, credentials = grpc.credentials.createInsecure()) {
this.address = address;
this.credentials = credentials;
this.client = this.createHealthClient();
}
createHealthClient() {
// Create health check client
// In production, load the actual health proto
return {
check: (request, callback) => {
// Simplified implementation
setTimeout(() => {
callback(null, { status: HealthCheckResponseServingStatus.SERVING });
}, 100);
},
watch: (request) => {
// Simplified streaming implementation
const call = {
write: () => {},
on: (event, handler) => {
if (event === 'data') {
// Simulate periodic health updates
setInterval(() => {
handler({ status: HealthCheckResponseServingStatus.SERVING });
}, 5000);
}
}
};
return call;
}
};
}
// Check health of a specific service
async checkHealth(service = '') {
return new Promise((resolve, reject) => {
const request = { service: service };
this.client.check(request, (error, response) => {
if (error) {
reject(error);
} else {
resolve({
service: service,
status: response.status,
statusString: this.getStatusString(response.status),
timestamp: new Date()
});
}
});
});
}
// Watch health status changes
watchHealth(service = '', callback) {
const request = { service: service };
const call = this.client.watch(request);
call.on('data', (response) => {
callback(null, {
service: service,
status: response.status,
statusString: this.getStatusString(response.status),
timestamp: new Date()
});
});
call.on('error', (error) => {
callback(error);
});
return call;
}
// Check health with timeout
async checkHealthWithTimeout(service = '', timeoutMs = 5000) {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error(`Health check timeout after ${timeoutMs}ms`));
}, timeoutMs);
this.checkHealth(service)
.then(result => {
clearTimeout(timeout);
resolve(result);
})
.catch(error => {
clearTimeout(timeout);
reject(error);
});
});
}
// Check health with retries
async checkHealthWithRetry(service = '', maxRetries = 3, retryDelay = 1000) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await this.checkHealth(service);
} catch (error) {
lastError = error;
if (attempt < maxRetries) {
console.warn(`Health check attempt ${attempt} failed for ${service}, retrying in ${retryDelay}ms:`, error.message);
await this.sleep(retryDelay);
}
}
}
throw lastError;
}
// Batch health check for multiple services
async checkMultipleServices(services) {
const promises = services.map(service =>
this.checkHealth(service).catch(error => ({
service: service,
error: error.message,
status: HealthCheckResponseServingStatus.SERVICE_UNKNOWN,
statusString: 'SERVICE_UNKNOWN',
timestamp: new Date()
}))
);
const results = await Promise.allSettled(promises);
return results.map((result, index) => {
if (result.status === 'fulfilled') {
return result.value;
} else {
return {
service: services[index],
error: result.reason.message,
status: HealthCheckResponseServingStatus.SERVICE_UNKNOWN,
statusString: 'SERVICE_UNKNOWN',
timestamp: new Date()
};
}
});
}
getStatusString(status) {
const statusStrings = {
[HealthCheckResponseServingStatus.UNKNOWN]: 'UNKNOWN',
[HealthCheckResponseServingStatus.SERVING]: 'SERVING',
[HealthCheckResponseServingStatus.NOT_SERVING]: 'NOT_SERVING',
[HealthCheckResponseServingStatus.SERVICE_UNKNOWN]: 'SERVICE_UNKNOWN'
};
return statusStrings[status] || 'UNKNOWN';
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Health check utilities
class HealthCheckUtils {
// Create database health checker
static createDatabaseHealthChecker(dbConnection) {
return async () => {
try {
await dbConnection.raw('SELECT 1');
return { message: 'Database connection healthy' };
} catch (error) {
throw new Error(`Database connection failed: ${error.message}`);
}
};
}
// Create Redis health checker
static createRedisHealthChecker(redisClient) {
return async () => {
try {
await redisClient.ping();
return { message: 'Redis connection healthy' };
} catch (error) {
throw new Error(`Redis connection failed: ${error.message}`);
}
};
}
// Create HTTP endpoint health checker
static createHttpHealthChecker(url, expectedStatus = 200) {
return async () => {
try {
const response = await fetch(url, {
method: 'GET',
timeout: 5000
});
if (response.status === expectedStatus) {
return {
message: `HTTP endpoint healthy (status: ${response.status})`,
details: { statusCode: response.status }
};
} else {
throw new Error(`HTTP endpoint returned status ${response.status}, expected ${expectedStatus}`);
}
} catch (error) {
throw new Error(`HTTP endpoint check failed: ${error.message}`);
}
};
}
// Create custom health checker
static createCustomHealthChecker(checkFunction) {
return async () => {
try {
const result = await checkFunction();
return {
message: 'Custom health check passed',
details: result
};
} catch (error) {
throw new Error(`Custom health check failed: ${error.message}`);
}
};
}
// Create composite health checker (checks multiple dependencies)
static createCompositeHealthChecker(checkers) {
return async () => {
const results = {};
let allHealthy = true;
for (const [name, checker] of Object.entries(checkers)) {
try {
const result = await checker();
results[name] = {
healthy: true,
result: result
};
} catch (error) {
results[name] = {
healthy: false,
error: error.message
};
allHealthy = false;
}
}
return {
message: allHealthy ? 'All dependencies healthy' : 'Some dependencies unhealthy',
details: results,
allHealthy: allHealthy
};
};
}
}
module.exports = {
HealthCheckResponseServingStatus,
HealthCheckService,
AdvancedHealthCheckService,
HealthCheckClient,
HealthCheckUtils
};
💻 Streaming gRPC javascript
🔴 complex
Implementación de comunicación de streaming cliente, servidor y bidireccional
// Advanced gRPC Streaming Implementation
const grpc = require('@grpc/grpc-js');
const { EventEmitter } = require('events');
// Custom streaming implementation with backpressure and flow control
class GrpcStreamManager extends EventEmitter {
constructor() {
super();
this.activeStreams = new Map();
this.streamMetrics = {
totalStreams: 0,
activeCount: 0,
messagesSent: 0,
messagesReceived: 0
};
}
// Create a managed stream with metrics
createManagedStream(stream, streamId, type) {
const managedStream = {
stream,
streamId,
type, // 'client', 'server', 'bidirectional'
startTime: Date.now(),
messageCount: 0,
byteCount: 0,
isActive: true
};
this.activeStreams.set(streamId, managedStream);
this.streamMetrics.totalStreams++;
this.streamMetrics.activeCount++;
// Add event listeners for metrics
const originalWrite = stream.write.bind(stream);
stream.write = (message) => {
managedStream.messageCount++;
managedStream.byteCount += JSON.stringify(message).length;
this.streamMetrics.messagesSent++;
this.emit('messageSent', {
streamId,
type,
message: managedStream.messageCount,
bytes: managedStream.byteCount
});
return originalWrite(message);
};
// Clean up when stream ends
stream.on('end', () => {
managedStream.isActive = false;
this.streamMetrics.activeCount--;
const duration = Date.now() - managedStream.startTime;
this.emit('streamEnded', {
streamId,
type,
duration,
messageCount: managedStream.messageCount,
byteCount: managedStream.byteCount
});
this.activeStreams.delete(streamId);
});
stream.on('error', (error) => {
managedStream.isActive = false;
this.streamMetrics.activeCount--;
this.emit('streamError', {
streamId,
type,
error: error.message,
messageCount: managedStream.messageCount
});
this.activeStreams.delete(streamId);
});
return stream;
}
getMetrics() {
return {
...this.streamMetrics,
activeStreams: this.activeStreams.size,
streams: Array.from(this.activeStreams.values())
};
}
}
// Server Streaming Example: Real-time Notifications
class NotificationService {
constructor(streamManager) {
this.streamManager = streamManager;
this.subscribers = new Map();
}
// Subscribe to notifications (server streaming)
subscribeToNotifications(call, metadata) {
const userId = metadata.get('user-id') || 'anonymous';
const streamId = `notification-${userId}-${Date.now()}`;
console.log(`User ${userId} subscribed to notifications`);
const managedCall = this.streamManager.createManagedStream(
call, streamId, 'server'
);
// Store subscription
this.subscribers.set(userId, managedCall);
// Send initial subscription confirmation
call.write({
id: 'subscription-confirmation',
type: 'SUBSCRIPTION',
timestamp: new Date().toISOString(),
userId: userId,
message: 'Successfully subscribed to notifications'
});
// Periodic notifications
const interval = setInterval(() => {
if (this.subscribers.has(userId)) {
call.write({
id: `notification-${Date.now()}`,
type: 'INFO',
timestamp: new Date().toISOString(),
userId: userId,
message: `Periodic update for user ${userId}`,
data: {
serverTime: new Date().toISOString(),
randomValue: Math.random()
}
});
}
}, 5000);
// Cleanup on stream end
call.on('end', () => {
clearInterval(interval);
this.subscribers.delete(userId);
console.log(`User ${userId} unsubscribed from notifications`);
});
call.on('error', (error) => {
clearInterval(interval);
this.subscribers.delete(userId);
console.error(`Notification stream error for user ${userId}:`, error.message);
});
}
// Broadcast notification to all subscribers
broadcastNotification(notification) {
const message = {
id: `broadcast-${Date.now()}`,
type: 'BROADCAST',
timestamp: new Date().toISOString(),
message: notification.message,
data: notification.data || {}
};
this.subscribers.forEach((subscription, userId) => {
if (subscription.stream.isActive) {
try {
subscription.stream.write({ ...message, userId });
} catch (error) {
console.error(`Failed to send broadcast to user ${userId}:`, error.message);
this.subscribers.delete(userId);
}
}
});
}
}
// Client Streaming Example: Data Upload with Progress
class DataUploadService {
constructor(streamManager) {
this.streamManager = streamManager;
this.uploads = new Map();
}
// Handle data upload stream (client streaming)
handleDataUpload(call, callback) {
const uploadId = `upload-${Date.now()}`;
const streamId = `upload-${uploadId}`;
console.log(`Starting data upload: ${uploadId}`);
const uploadData = {
id: uploadId,
startTime: Date.now(),
chunks: [],
totalSize: 0,
receivedSize: 0
};
this.uploads.set(uploadId, uploadData);
const managedCall = this.streamManager.createManagedStream(
call, streamId, 'client'
);
// Handle incoming chunks
call.on('data', (chunk) => {
uploadData.chunks.push(chunk);
uploadData.receivedSize += chunk.size || 0;
uploadData.totalSize = chunk.totalSize || uploadData.totalSize;
const progress = uploadData.totalSize > 0
? (uploadData.receivedSize / uploadData.totalSize) * 100
: 0;
console.log(`Upload ${uploadId}: ${progress.toFixed(2)}% complete`);
// Send progress update
call.write({
type: 'PROGRESS',
uploadId,
progress: progress,
receivedSize: uploadData.receivedSize,
totalSize: uploadData.totalSize,
timestamp: new Date().toISOString()
});
});
// Handle stream completion
call.on('end', () => {
const duration = Date.now() - uploadData.startTime;
const averageSpeed = uploadData.receivedSize / (duration / 1000); // bytes/sec
console.log(`Upload ${uploadId} completed. Duration: ${duration}ms, Speed: ${averageSpeed.toFixed(2)} bytes/sec`);
callback(null, {
success: true,
uploadId,
totalChunks: uploadData.chunks.length,
totalSize: uploadData.receivedSize,
duration: duration,
averageSpeed: averageSpeed
});
this.uploads.delete(uploadId);
});
// Handle stream errors
call.on('error', (error) => {
console.error(`Upload ${uploadId} failed:`, error.message);
this.uploads.delete(uploadId);
callback({
code: grpc.status.INTERNAL,
message: `Upload failed: ${error.message}`
});
});
}
getUploadStatus(uploadId) {
return this.uploads.get(uploadId);
}
}
// Bidirectional Streaming Example: Real-time Chat
class ChatService {
constructor(streamManager) {
this.streamManager = streamManager;
this.rooms = new Map();
this.users = new Map();
}
// Handle chat room (bidirectional streaming)
joinChatRoom(call) {
let currentRoom = null;
let currentUser = null;
const streamId = `chat-${Date.now()}`;
const managedCall = this.streamManager.createManagedStream(
call, streamId, 'bidirectional'
);
// Handle incoming messages
call.on('data', (message) => {
const timestamp = new Date().toISOString();
switch (message.type) {
case 'JOIN':
currentUser = {
id: message.userId,
name: message.userName || `User-${message.userId}`,
joinTime: timestamp
};
currentRoom = message.roomId || 'default';
// Join room
if (!this.rooms.has(currentRoom)) {
this.rooms.set(currentRoom, new Set());
}
this.rooms.get(currentRoom).add(managedCall);
this.users.set(currentUser.id, currentUser);
console.log(`User ${currentUser.name} joined room ${currentRoom}`);
// Send join confirmation
call.write({
type: 'SYSTEM',
roomId: currentRoom,
timestamp,
message: `Welcome to room ${currentRoom}, ${currentUser.name}!`,
userCount: this.rooms.get(currentRoom).size
});
// Notify other users
this.broadcastToRoom(currentRoom, {
type: 'SYSTEM',
roomId: currentRoom,
timestamp,
message: `${currentUser.name} joined the room`
}, managedCall);
break;
case 'MESSAGE':
if (currentRoom && currentUser) {
const chatMessage = {
id: `msg-${Date.now()}`,
type: 'MESSAGE',
roomId: currentRoom,
userId: currentUser.id,
userName: currentUser.name,
content: message.content,
timestamp
};
// Broadcast to all users in the room
this.broadcastToRoom(currentRoom, chatMessage);
}
break;
case 'LEAVE':
if (currentRoom && currentUser) {
this.leaveRoom(currentRoom, currentUser, managedCall);
}
break;
case 'TYPING':
if (currentRoom && currentUser) {
this.broadcastToRoom(currentRoom, {
type: 'TYPING',
roomId: currentRoom,
userId: currentUser.id,
userName: currentUser.name,
timestamp
}, managedCall);
}
break;
}
});
// Handle stream end
call.on('end', () => {
if (currentRoom && currentUser) {
this.leaveRoom(currentRoom, currentUser, managedCall);
}
});
call.on('error', (error) => {
console.error(`Chat stream error:`, error.message);
if (currentRoom && currentUser) {
this.leaveRoom(currentRoom, currentUser, managedCall);
}
});
}
// Broadcast message to room
broadcastToRoom(roomId, message, excludeCall = null) {
const room = this.rooms.get(roomId);
if (!room) return;
room.forEach(stream => {
if (stream !== excludeCall && stream.stream.isActive) {
try {
stream.stream.write(message);
} catch (error) {
console.error(`Failed to broadcast to room ${roomId}:`, error.message);
room.delete(stream);
}
}
});
}
// Leave room
leaveRoom(roomId, user, stream) {
const room = this.rooms.get(roomId);
if (room) {
room.delete(stream);
console.log(`User ${user.name} left room ${roomId}`);
// Notify other users
this.broadcastToRoom(roomId, {
type: 'SYSTEM',
roomId,
timestamp: new Date().toISOString(),
message: `${user.name} left the room`,
userCount: room.size
});
// Clean up empty rooms
if (room.size === 0) {
this.rooms.delete(roomId);
console.log(`Room ${roomId} deleted (empty)`);
}
}
this.users.delete(user.id);
}
getRoomStatus(roomId) {
const room = this.rooms.get(roomId);
return {
roomId,
userCount: room ? room.size : 0,
exists: !!room
};
}
getAllRooms() {
const rooms = {};
this.rooms.forEach((users, roomId) => {
rooms[roomId] = users.size;
});
return rooms;
}
}
// Streaming utility functions
class StreamingUtils {
// Apply backpressure to slow down streaming
static async applyBackpressure(stream, messages, delay = 100) {
for (const message of messages) {
if (stream.destroyed || stream.closed) {
break;
}
const ready = stream.write(message);
if (!ready) {
console.log('Backpressure applied, waiting for drain...');
await new Promise(resolve => {
stream.once('drain', resolve);
});
}
// Add delay between messages
if (delay > 0) {
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
// Create a retry mechanism for streaming operations
static async retryStreamingOperation(operation, maxRetries = 3, delay = 1000) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error;
console.warn(`Streaming operation attempt ${attempt} failed:`, error.message);
if (attempt < maxRetries) {
await new Promise(resolve => setTimeout(resolve, delay * attempt));
}
}
}
throw lastError;
}
// Compress large messages
static compressMessage(message) {
const messageStr = JSON.stringify(message);
if (messageStr.length > 1024 * 1024) { // 1MB threshold
// In production, use proper compression like gzip
return {
...message,
compressed: true,
originalSize: messageStr.length
};
}
return message;
}
// Batch multiple messages
static batchMessages(messages, batchSize = 10) {
const batches = [];
for (let i = 0; i < messages.length; i += batchSize) {
batches.push(messages.slice(i, i + batchSize));
}
return batches;
}
}
module.exports = {
GrpcStreamManager,
NotificationService,
DataUploadService,
ChatService,
StreamingUtils
};
💻 Interceptores gRPC javascript
🔴 complex
Implementación de interceptores de autenticación, logging y monitoreo
// gRPC Interceptors Implementation
const grpc = require('@grpc/grpc-js');
const jwt = require('jsonwebtoken');
const crypto = require('crypto');
// Base interceptor class
class BaseInterceptor {
constructor(options = {}) {
this.options = options;
}
// Interceptor method to be implemented by subclasses
intercept(call, next) {
throw new Error('intercept method must be implemented');
}
}
// Authentication Interceptor
class AuthInterceptor extends BaseInterceptor {
constructor(options = {}) {
super(options);
this.jwtSecret = options.jwtSecret || 'default-secret';
this.excludePaths = options.excludePaths || [];
this.apiKeyHeader = options.apiKeyHeader || 'x-api-key';
this.apiKeys = new Set(options.apiKeys || []);
}
intercept(call, next) {
const method = call.getMethod();
const path = method.path;
// Skip authentication for excluded paths
if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
return next();
}
const metadata = call.metadata.getMap();
// Try JWT authentication
const authorization = metadata['authorization'];
if (authorization) {
const token = authorization[0].replace('Bearer ', '');
try {
const decoded = jwt.verify(token, this.jwtSecret);
call.call.user = {
id: decoded.userId,
email: decoded.email,
roles: decoded.roles || []
};
return next();
} catch (error) {
console.error('JWT verification failed:', error.message);
}
}
// Try API key authentication
const apiKey = metadata[this.apiKeyHeader];
if (apiKey && this.apiKeys.has(apiKey[0])) {
call.call.user = {
id: 'api-key-user',
roles: ['api']
};
return next();
}
// Authentication failed
throw {
code: grpc.status.UNAUTHENTICATED,
message: 'Authentication required'
};
}
}
// Logging Interceptor
class LoggingInterceptor extends BaseInterceptor {
constructor(options = {}) {
super(options);
this.logLevel = options.logLevel || 'info';
this.logBody = options.logBody || false;
this.excludePaths = options.excludePaths || [];
this.logger = options.logger || console;
}
intercept(call, next) {
const method = call.getMethod();
const path = method.path;
const startTime = Date.now();
// Skip logging for excluded paths
if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
return next();
}
// Log request
const requestId = crypto.randomUUID();
const logData = {
requestId,
method: path,
type: 'request',
timestamp: new Date().toISOString(),
metadata: this.sanitizeMetadata(call.metadata.getMap())
};
if (this.logBody && call.request) {
logData.body = this.sanitizeBody(call.request);
}
this.logger.log(this.logLevel, `gRPC Request: ${JSON.stringify(logData)}`);
// Intercept response
const originalCallback = call.call.sendMetadata.bind(call.call);
call.call.sendMetadata = function(response, callback) {
const duration = Date.now() - startTime;
const responseLogData = {
requestId,
method: path,
type: 'response',
duration: duration,
timestamp: new Date().toISOString()
};
if (call.interceptorResponse) {
responseLogData.status = call.interceptorResponse.code || grpc.status.OK;
responseLogData.message = call.interceptorResponse.message || 'Success';
}
logger.log(logLevel, `gRPC Response: ${JSON.stringify(responseLogData)}`);
return originalCallback(response, callback);
};
// Handle streaming calls
if (call.call.write && call.call.read) {
this.interceptStreamingCall(call, requestId, path, startTime);
}
return next().catch(error => {
const duration = Date.now() - startTime;
const errorLogData = {
requestId,
method: path,
type: 'error',
duration: duration,
timestamp: new Date().toISOString(),
error: {
code: error.code,
message: error.message,
details: error.details
}
};
this.logger.error(`gRPC Error: ${JSON.stringify(errorLogData)}`);
throw error;
});
}
interceptStreamingCall(call, requestId, path, startTime) {
const originalWrite = call.call.write.bind(call.call);
const originalRead = call.call.read.bind(call.call);
let messagesWritten = 0;
let messagesRead = 0;
call.call.write = function(message, callback) {
messagesWritten++;
if (this.logBody) {
this.logger.log(this.logLevel, `gRPC Stream Write [${requestId}]: ${JSON.stringify({
requestId,
method: path,
messageType: 'write',
messageCount: messagesWritten,
body: this.sanitizeBody(message)
})}`);
}
return originalWrite(message, callback);
}.bind(this);
call.call.read = function(callback) {
return originalRead((error, response) => {
if (response) {
messagesRead++;
if (this.logBody) {
this.logger.log(this.logLevel, `gRPC Stream Read [${requestId}]: ${JSON.stringify({
requestId,
method: path,
messageType: 'read',
messageCount: messagesRead,
body: this.sanitizeBody(response)
})}`);
}
}
callback(error, response);
});
}.bind(this);
}
sanitizeMetadata(metadata) {
const sanitized = {};
Object.keys(metadata).forEach(key => {
if (key.toLowerCase().includes('authorization') ||
key.toLowerCase().includes('token') ||
key.toLowerCase().includes('password') ||
key.toLowerCase().includes('secret')) {
sanitized[key] = '[REDACTED]';
} else {
sanitized[key] = metadata[key];
}
});
return sanitized;
}
sanitizeBody(body) {
if (!body || typeof body !== 'object') {
return body;
}
const sanitized = JSON.parse(JSON.stringify(body));
// Remove sensitive fields
const sensitiveKeys = ['password', 'token', 'secret', 'key', 'creditCard'];
const removeSensitiveKeys = (obj) => {
if (typeof obj !== 'object' || obj === null) {
return;
}
Object.keys(obj).forEach(key => {
if (sensitiveKeys.some(sensitive => key.toLowerCase().includes(sensitive.toLowerCase()))) {
obj[key] = '[REDACTED]';
} else if (typeof obj[key] === 'object') {
removeSensitiveKeys(obj[key]);
}
});
};
removeSensitiveKeys(sanitized);
return sanitized;
}
}
// Rate Limiting Interceptor
class RateLimitInterceptor extends BaseInterceptor {
constructor(options = {}) {
super(options);
this.windowMs = options.windowMs || 60000; // 1 minute
this.maxRequests = options.maxRequests || 100;
this.requests = new Map();
this.excludePaths = options.excludePaths || [];
}
intercept(call, next) {
const method = call.getMethod();
const path = method.path;
// Skip rate limiting for excluded paths
if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
return next();
}
const clientId = this.getClientId(call);
const now = Date.now();
const windowStart = now - this.windowMs;
// Clean old requests
if (!this.requests.has(clientId)) {
this.requests.set(clientId, []);
}
const clientRequests = this.requests.get(clientId);
// Remove requests outside the current window
const validRequests = clientRequests.filter(timestamp => timestamp > windowStart);
this.requests.set(clientId, validRequests);
// Check if rate limit exceeded
if (validRequests.length >= this.maxRequests) {
const headers = {
'X-RateLimit-Limit': this.maxRequests.toString(),
'X-RateLimit-Remaining': '0',
'X-RateLimit-Reset': new Date(now + this.windowMs).toISOString()
};
throw {
code: grpc.status.RESOURCE_EXHAUSTED,
message: `Rate limit exceeded. Maximum ${this.maxRequests} requests per ${this.windowMs}ms allowed.`,
metadata: headers
};
}
// Add current request timestamp
validRequests.push(now);
const remainingRequests = this.maxRequests - validRequests.length;
const headers = {
'X-RateLimit-Limit': this.maxRequests.toString(),
'X-RateLimit-Remaining': remainingRequests.toString(),
'X-RateLimit-Reset': new Date(now + this.windowMs).toISOString()
};
// Add headers to response
const originalCallback = call.call.sendMetadata.bind(call.call);
call.call.sendMetadata = function(response, callback) {
// Merge rate limit headers into response metadata
if (response) {
Object.keys(headers).forEach(key => {
response[key] = [headers[key]];
});
}
return originalCallback(response, callback);
};
return next();
}
getClientId(call) {
// Try to get client ID from user
if (call.call.user && call.call.user.id) {
return `user:${call.call.user.id}`;
}
// Try to get client ID from IP
const metadata = call.metadata.getMap();
const xForwardedFor = metadata['x-forwarded-for'];
const xRealIp = metadata['x-real-ip'];
const ip = (xForwardedFor && xForwardedFor[0]) ||
(xRealIp && xRealIp[0]) ||
'unknown';
return `ip:${ip}`;
}
}
// Metrics Interceptor
class MetricsInterceptor extends BaseInterceptor {
constructor(options = {}) {
super(options);
this.metrics = {
totalRequests: 0,
totalErrors: 0,
methodMetrics: new Map(),
responseTime: []
};
this.startTime = Date.now();
this.reportInterval = options.reportInterval || 60000;
if (options.autoReport !== false) {
this.startReporting();
}
}
intercept(call, next) {
const method = call.getMethod();
const path = method.path;
const startTime = Date.now();
this.metrics.totalRequests++;
// Track method-specific metrics
if (!this.metrics.methodMetrics.has(path)) {
this.metrics.methodMetrics.set(path, {
count: 0,
errors: 0,
totalResponseTime: 0
});
}
const methodMetrics = this.metrics.methodMetrics.get(path);
methodMetrics.count++;
return next().then(response => {
const duration = Date.now() - startTime;
// Update metrics
methodMetrics.totalResponseTime += duration;
this.metrics.responseTime.push(duration);
// Keep only last 1000 response times
if (this.metrics.responseTime.length > 1000) {
this.metrics.responseTime = this.metrics.responseTime.slice(-1000);
}
return response;
}).catch(error => {
const duration = Date.now() - startTime;
// Update error metrics
this.metrics.totalErrors++;
methodMetrics.errors++;
methodMetrics.totalResponseTime += duration;
throw error;
});
}
startReporting() {
setInterval(() => {
const report = this.generateReport();
console.log('gRPC Metrics Report:', JSON.stringify(report, null, 2));
}, this.reportInterval);
}
generateReport() {
const now = Date.now();
const uptime = now - this.startTime;
// Calculate average response time
const avgResponseTime = this.metrics.responseTime.length > 0
? this.metrics.responseTime.reduce((a, b) => a + b, 0) / this.metrics.responseTime.length
: 0;
// Calculate percentiles
const sortedResponseTimes = [...this.metrics.responseTime].sort((a, b) => a - b);
const p50 = sortedResponseTimes[Math.floor(sortedResponseTimes.length * 0.5)] || 0;
const p95 = sortedResponseTimes[Math.floor(sortedResponseTimes.length * 0.95)] || 0;
const p99 = sortedResponseTimes[Math.floor(sortedResponseTimes.length * 0.99)] || 0;
// Calculate method statistics
const methodStats = {};
this.metrics.methodMetrics.forEach((metrics, method) => {
methodStats[method] = {
...metrics,
avgResponseTime: metrics.count > 0 ? metrics.totalResponseTime / metrics.count : 0,
errorRate: metrics.count > 0 ? (metrics.errors / metrics.count) * 100 : 0
};
});
return {
uptime: uptime,
totalRequests: this.metrics.totalRequests,
totalErrors: this.metrics.totalErrors,
errorRate: this.metrics.totalRequests > 0
? (this.metrics.totalErrors / this.metrics.totalRequests) * 100
: 0,
avgResponseTime: Math.round(avgResponseTime),
p50ResponseTime: p50,
p95ResponseTime: p95,
p99ResponseTime: p99,
methodStats: methodStats,
timestamp: new Date().toISOString()
};
}
getMetrics() {
return this.generateReport();
}
}
// Circuit Breaker Interceptor
class CircuitBreakerInterceptor extends BaseInterceptor {
constructor(options = {}) {
super(options);
this.failureThreshold = options.failureThreshold || 5;
this.recoveryTimeout = options.recoveryTimeout || 60000;
this.monitoringPeriod = options.monitoringPeriod || 10000;
this.excludePaths = options.excludePaths || [];
this.circuitStates = new Map();
}
intercept(call, next) {
const method = call.getMethod();
const path = method.path;
// Skip circuit breaking for excluded paths
if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
return next();
}
const circuitState = this.getCircuitState(path);
const now = Date.now();
// Check if circuit is open
if (circuitState.state === 'OPEN') {
if (now - circuitState.lastFailureTime > this.recoveryTimeout) {
// Try to close circuit (half-open state)
circuitState.state = 'HALF_OPEN';
circuitState.requestCount = 0;
} else {
// Circuit is still open
throw {
code: grpc.status.UNAVAILABLE,
message: 'Service temporarily unavailable (circuit breaker open)'
};
}
}
// Execute request and update circuit state
return next().then(response => {
// Success - reset circuit
this.recordSuccess(path);
return response;
}).catch(error => {
// Failure - update circuit state
this.recordFailure(path);
throw error;
});
}
getCircuitState(path) {
if (!this.circuitStates.has(path)) {
this.circuitStates.set(path, {
state: 'CLOSED', // CLOSED, OPEN, HALF_OPEN
failureCount: 0,
requestCount: 0,
lastFailureTime: 0
});
}
return this.circuitStates.get(path);
}
recordSuccess(path) {
const circuitState = this.getCircuitState(path);
if (circuitState.state === 'HALF_OPEN') {
// Close circuit after successful request in half-open state
circuitState.state = 'CLOSED';
circuitState.failureCount = 0;
} else {
// Reset failure count in closed state
circuitState.failureCount = 0;
}
}
recordFailure(path) {
const circuitState = this.getCircuitState(path);
const now = Date.now();
circuitState.failureCount++;
circuitState.lastFailureTime = now;
if (circuitState.state === 'HALF_OPEN') {
// Open circuit again if failure in half-open state
circuitState.state = 'OPEN';
} else if (circuitState.failureCount >= this.failureThreshold) {
// Open circuit if failure threshold exceeded
circuitState.state = 'OPEN';
}
}
getCircuitStates() {
const states = {};
this.circuitStates.forEach((state, path) => {
states[path] = { ...state };
});
return states;
}
}
// Interceptor factory
class InterceptorFactory {
static createAuth(options) {
return new AuthInterceptor(options);
}
static createLogging(options) {
return new LoggingInterceptor(options);
}
static createRateLimit(options) {
return new RateLimitInterceptor(options);
}
static createMetrics(options) {
return new MetricsInterceptor(options);
}
static createCircuitBreaker(options) {
return new CircuitBreakerInterceptor(options);
}
// Create chained interceptor
static chain(...interceptors) {
return {
intercept: (call, next) => {
let pipeline = next;
// Apply interceptors in reverse order (last interceptor runs first)
for (let i = interceptors.length - 1; i >= 0; i--) {
const interceptor = interceptors[i];
const currentPipeline = pipeline;
pipeline = () => interceptor.intercept(call, currentPipeline);
}
return pipeline();
}
};
}
}
module.exports = {
BaseInterceptor,
AuthInterceptor,
LoggingInterceptor,
RateLimitInterceptor,
MetricsInterceptor,
CircuitBreakerInterceptor,
InterceptorFactory
};
💻 Manejo de Errores gRPC javascript
🔴 complex
Manejo de estados de error y excepciones gRPC
// gRPC Error Handling Implementation
const grpc = require('@grpc/grpc-js');
// Custom gRPC error class
class GrpcError extends Error {
constructor(code, message, details = null, metadata = {}) {
super(message);
this.name = 'GrpcError';
this.code = code;
this.details = details;
this.metadata = metadata;
this.timestamp = new Date().toISOString();
this.requestId = this.generateRequestId();
}
generateRequestId() {
return 'req-' + Math.random().toString(36).substr(2, 9);
}
toGrpcError() {
return {
code: this.code,
message: this.message,
details: this.details || this.message,
metadata: this.metadata
};
}
}
// Validation error
class ValidationError extends GrpcError {
constructor(field, value, constraint) {
super(
grpc.status.INVALID_ARGUMENT,
`Validation failed for field ${field}`,
`Field '${field}' with value '${value}' violates constraint: ${constraint}`,
{
field: field,
value: String(value),
constraint: constraint,
errorType: 'VALIDATION_ERROR'
}
);
this.field = field;
this.value = value;
this.constraint = constraint;
}
}
// Business logic error
class BusinessError extends GrpcError {
constructor(message, businessCode, details = null) {
super(
grpc.status.FAILED_PRECONDITION,
message,
details,
{
businessCode: businessCode,
errorType: 'BUSINESS_ERROR'
}
);
this.businessCode = businessCode;
}
}
// Resource not found error
class NotFoundError extends GrpcError {
constructor(resourceType, resourceId) {
super(
grpc.status.NOT_FOUND,
`${resourceType} not found`,
`${resourceType} with ID ${resourceId} does not exist`,
{
resourceType: resourceType,
resourceId: String(resourceId),
errorType: 'NOT_FOUND_ERROR'
}
);
this.resourceType = resourceType;
this.resourceId = resourceId;
}
}
// Permission denied error
class PermissionError extends GrpcError {
constructor(action, resource, userId) {
super(
grpc.status.PERMISSION_DENIED,
`Permission denied for ${action}`,
`User ${userId} does not have permission to perform ${action} on ${resource}`,
{
action: action,
resource: resource,
userId: String(userId),
errorType: 'PERMISSION_ERROR'
}
);
this.action = action;
this.resource = resource;
this.userId = userId;
}
}
// Resource conflict error
class ConflictError extends GrpcError {
constructor(resourceType, conflictReason) {
super(
grpc.status.ALREADY_EXISTS,
`Resource conflict`,
`${resourceType} conflict: ${conflictReason}`,
{
resourceType: resourceType,
conflictReason: conflictReason,
errorType: 'CONFLICT_ERROR'
}
);
this.resourceType = resourceType;
this.conflictReason = conflictReason;
}
}
// Rate limit error
class RateLimitError extends GrpcError {
constructor(limit, windowMs, retryAfter) {
super(
grpc.status.RESOURCE_EXHAUSTED,
'Rate limit exceeded',
`Rate limit of ${limit} requests per ${windowMs}ms exceeded`,
{
limit: String(limit),
windowMs: String(windowMs),
retryAfter: String(retryAfter),
errorType: 'RATE_LIMIT_ERROR'
}
);
this.limit = limit;
this.windowMs = windowMs;
this.retryAfter = retryAfter;
}
}
// Service unavailable error
class ServiceUnavailableError extends GrpcError {
constructor(serviceName, retryAfter = null) {
super(
grpc.status.UNAVAILABLE,
`Service ${serviceName} is currently unavailable`,
`The ${serviceName} service is temporarily unavailable. Please try again later.`,
{
serviceName: serviceName,
errorType: 'SERVICE_UNAVAILABLE_ERROR'
}
);
this.serviceName = serviceName;
if (retryAfter) {
this.metadata.retryAfter = String(retryAfter);
}
}
}
// Timeout error
class TimeoutError extends GrpcError {
constructor(operation, timeoutMs) {
super(
grpc.status.DEADLINE_EXCEEDED,
`Operation ${operation} timed out`,
`Operation ${operation} did not complete within ${timeoutMs}ms`,
{
operation: operation,
timeoutMs: String(timeoutMs),
errorType: 'TIMEOUT_ERROR'
}
);
this.operation = operation;
this.timeoutMs = timeoutMs;
}
}
// Error handler class
class ErrorHandler {
constructor(options = {}) {
this.includeStackTrace = options.includeStackTrace || false;
this.logErrors = options.logErrors || true;
this.errorLogger = options.errorLogger || console.error;
this.customHandlers = new Map();
}
// Register custom error handler
registerHandler(errorType, handler) {
this.customHandlers.set(errorType, handler);
}
// Handle and convert errors
handleError(error, context = {}) {
const enhancedError = this.enhanceError(error, context);
if (this.logErrors) {
this.logError(enhancedError, context);
}
// Try custom handlers first
if (enhancedError.metadata && enhancedError.metadata.errorType) {
const customHandler = this.customHandlers.get(enhancedError.metadata.errorType);
if (customHandler) {
return customHandler(enhancedError, context);
}
}
// Default handling
return enhancedError.toGrpcError();
}
enhanceError(error, context) {
// If it's already a GrpcError, just enhance context
if (error instanceof GrpcError) {
error.metadata = {
...error.metadata,
context: context
};
return error;
}
// Convert common JavaScript errors to gRPC errors
if (error.name === 'ValidationError' || error.message.includes('validation')) {
return new GrpcError(
grpc.status.INVALID_ARGUMENT,
'Validation failed',
error.message,
{ originalError: error.name, context }
);
}
if (error.name === 'CastError' || error.message.includes('Cast to')) {
return new GrpcError(
grpc.status.INVALID_ARGUMENT,
'Invalid input format',
error.message,
{ originalError: error.name, context }
);
}
if (error.name === 'MongoError' && error.code === 11000) {
return new GrpcError(
grpc.status.ALREADY_EXISTS,
'Resource already exists',
'Duplicate key violation',
{ originalError: error.name, mongoCode: error.code, context }
);
}
if (error.name === 'TokenExpiredError') {
return new GrpcError(
grpc.status.UNAUTHENTICATED,
'Token expired',
'Authentication token has expired',
{ originalError: error.name, context }
);
}
if (error.name === 'JsonWebTokenError') {
return new GrpcError(
grpc.status.UNAUTHENTICATED,
'Invalid token',
'Authentication token is invalid',
{ originalError: error.name, context }
);
}
// Database connection errors
if (error.message.includes('ECONNREFUSED') || error.message.includes('connection')) {
return new ServiceUnavailableError('Database');
}
// Timeout errors
if (error.message.includes('timeout') || error.code === 'TIMEOUT') {
return new TimeoutError(context.operation || 'unknown', context.timeout || 30000);
}
// Default internal error
return new GrpcError(
grpc.status.INTERNAL,
'Internal server error',
this.includeStackTrace ? error.stack : error.message,
{
originalError: error.name,
originalMessage: error.message,
context,
stackTrace: this.includeStackTrace ? error.stack : undefined
}
);
}
logError(error, context) {
const logData = {
requestId: error.requestId,
timestamp: error.timestamp,
code: error.code,
message: error.message,
details: error.details,
metadata: error.metadata,
context: context
};
if (this.includeStackTrace && error.stack) {
logData.stackTrace = error.stack;
}
this.errorLogger('gRPC Error:', JSON.stringify(logData, null, 2));
}
}
// Retry mechanism for failed operations
class RetryHandler {
constructor(options = {}) {
this.maxRetries = options.maxRetries || 3;
this.baseDelay = options.baseDelay || 1000;
this.maxDelay = options.maxDelay || 30000;
this.backoffMultiplier = options.backoffMultiplier || 2;
this.retryableErrors = options.retryableErrors || new Set([
grpc.status.UNAVAILABLE,
grpc.status.DEADLINE_EXCEEDED,
grpc.status.RESOURCE_EXHAUSTED
]);
}
async executeWithRetry(operation, context = {}) {
let lastError;
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error;
// Check if error is retryable
if (!this.isRetryableError(error) || attempt === this.maxRetries) {
throw error;
}
// Calculate delay for next attempt
const delay = this.calculateDelay(attempt);
console.warn(`Attempt ${attempt} failed, retrying in ${delay}ms:`, error.message);
// Wait before retry
await this.sleep(delay);
}
}
throw lastError;
}
isRetryableError(error) {
const errorCode = error.code || error.status;
return this.retryableErrors.has(errorCode);
}
calculateDelay(attempt) {
const delay = this.baseDelay * Math.pow(this.backoffMultiplier, attempt - 1);
const jitter = delay * 0.1 * Math.random(); // Add 10% jitter
return Math.min(delay + jitter, this.maxDelay);
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Circuit breaker pattern for error handling
class CircuitBreaker {
constructor(options = {}) {
this.failureThreshold = options.failureThreshold || 5;
this.recoveryTimeout = options.recoveryTimeout || 60000;
this.monitoringPeriod = options.monitoringPeriod || 10000;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.failureCount = 0;
this.successCount = 0;
this.lastFailureTime = 0;
this.requestCount = 0;
}
async execute(operation) {
if (this.state === 'OPEN') {
if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
this.state = 'HALF_OPEN';
this.successCount = 0;
} else {
throw new ServiceUnavailableError('Circuit breaker is open', Math.ceil((this.recoveryTimeout - (Date.now() - this.lastFailureTime)) / 1000));
}
}
try {
const result = await operation();
this.recordSuccess();
return result;
} catch (error) {
this.recordFailure();
throw error;
}
}
recordSuccess() {
this.failureCount = 0;
if (this.state === 'HALF_OPEN') {
this.successCount++;
if (this.successCount >= 3) { // Require 3 consecutive successes to close circuit
this.state = 'CLOSED';
console.log('Circuit breaker closed after successful recovery');
}
}
}
recordFailure() {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.failureThreshold) {
this.state = 'OPEN';
console.log(`Circuit breaker opened after ${this.failureCount} failures`);
}
}
getState() {
return {
state: this.state,
failureCount: this.failureCount,
successCount: this.successCount,
lastFailureTime: this.lastFailureTime
};
}
}
// Error recovery strategies
class ErrorRecovery {
constructor() {
this.recoveryStrategies = new Map();
this.setupDefaultStrategies();
}
setupDefaultStrategies() {
// Validation error recovery - provide helpful suggestions
this.recoveryStrategies.set('VALIDATION_ERROR', (error, context) => {
return {
handled: true,
suggestion: `Please check the ${error.field} field. Expected: ${error.constraint}`,
correctedValue: this.suggestCorrection(error.value, error.constraint)
};
});
// Rate limit error recovery - suggest retry with backoff
this.recoveryStrategies.set('RATE_LIMIT_ERROR', (error, context) => {
return {
handled: true,
suggestion: `Rate limit exceeded. Retry after ${error.retryAfter} seconds`,
retryAfter: parseInt(error.retryAfter),
backoffStrategy: 'exponential'
};
});
// Service unavailable recovery - suggest alternative endpoints
this.recoveryStrategies.set('SERVICE_UNAVAILABLE_ERROR', (error, context) => {
return {
handled: true,
suggestion: `Service ${error.serviceName} is unavailable. Please try again later`,
alternativeEndpoints: this.getAlternativeEndpoints(error.serviceName),
fallbackAvailable: true
};
});
}
suggestCorrection(value, constraint) {
// Simple correction suggestions
if (constraint.includes('email') && !value.includes('@')) {
return `${value}@example.com`;
}
if (constraint.includes('positive') && isNaN(value)) {
return '0';
}
return null;
}
getAlternativeEndpoints(serviceName) {
const alternatives = {
'user-service': ['user-service-backup', 'user-service-v2'],
'order-service': ['order-service-cluster', 'legacy-order-service'],
'payment-service': ['payment-service-primary', 'payment-service-secondary']
};
return alternatives[serviceName] || [];
}
registerStrategy(errorType, strategy) {
this.recoveryStrategies.set(errorType, strategy);
}
attemptRecovery(error, context) {
if (error.metadata && error.metadata.errorType) {
const strategy = this.recoveryStrategies.get(error.metadata.errorType);
if (strategy) {
return strategy(error, context);
}
}
return {
handled: false,
suggestion: 'No recovery strategy available for this error type'
};
}
}
// Utility functions for error handling
const ErrorUtils = {
// Convert gRPC status code to string
getStatusString: (code) => {
const statusStrings = {
[grpc.status.OK]: 'OK',
[grpc.status.CANCELLED]: 'CANCELLED',
[grpc.status.UNKNOWN]: 'UNKNOWN',
[grpc.status.INVALID_ARGUMENT]: 'INVALID_ARGUMENT',
[grpc.status.DEADLINE_EXCEEDED]: 'DEADLINE_EXCEEDED',
[grpc.status.NOT_FOUND]: 'NOT_FOUND',
[grpc.status.ALREADY_EXISTS]: 'ALREADY_EXISTS',
[grpc.status.PERMISSION_DENIED]: 'PERMISSION_DENIED',
[grpc.status.UNAUTHENTICATED]: 'UNAUTHENTICATED',
[grpc.status.RESOURCE_EXHAUSTED]: 'RESOURCE_EXHAUSTED',
[grpc.status.FAILED_PRECONDITION]: 'FAILED_PRECONDITION',
[grpc.status.ABORTED]: 'ABORTED',
[grpc.status.OUT_OF_RANGE]: 'OUT_OF_RANGE',
[grpc.status.UNIMPLEMENTED]: 'UNIMPLEMENTED',
[grpc.status.INTERNAL]: 'INTERNAL',
[grpc.status.UNAVAILABLE]: 'UNAVAILABLE',
[grpc.status.DATA_LOSS]: 'DATA_LOSS'
};
return statusStrings[code] || `UNKNOWN_STATUS_${code}`;
},
// Check if error is retryable
isRetryableError: (error) => {
const retryableCodes = new Set([
grpc.status.UNAVAILABLE,
grpc.status.DEADLINE_EXCEEDED,
grpc.status.RESOURCE_EXHAUSTED,
grpc.status.ABORTED
]);
return retryableCodes.has(error.code || error.status);
},
// Extract error information for logging
extractErrorInfo: (error, context = {}) => {
return {
code: error.code || error.status,
message: error.message,
details: error.details,
requestId: error.requestId,
timestamp: error.timestamp || new Date().toISOString(),
context: context,
metadata: error.metadata || {},
stackTrace: error.stack
};
},
// Create error response for client
createErrorResponse: (error, includeDetails = false) => {
const response = {
success: false,
error: {
code: error.code,
message: error.message
}
};
if (includeDetails) {
response.error.details = error.details;
response.error.metadata = error.metadata;
response.error.requestId = error.requestId;
response.error.timestamp = error.timestamp;
}
return response;
}
};
module.exports = {
GrpcError,
ValidationError,
BusinessError,
NotFoundError,
PermissionError,
ConflictError,
RateLimitError,
ServiceUnavailableError,
TimeoutError,
ErrorHandler,
RetryHandler,
CircuitBreaker,
ErrorRecovery,
ErrorUtils
};