gRPC Samples

gRPC (Google Remote Procedure Call) communication protocol examples with Protocol Buffers and service definitions

Key Facts

Category
Communication Protocols
Items
8
Format Families
audio

Sample Overview

gRPC (Google Remote Procedure Call) communication protocol examples with Protocol Buffers and service definitions. This sample set belongs to Communication Protocols and can be used to test related workflows inside Elysia Tools.

💻 Protocol Buffers Definition protobuf

🟢 simple

Basic Protocol Buffers service definitions and message structures

// Protocol Buffers v3 syntax
syntax = "proto3";

// Package declaration for namespace
package userservice;

// Import statement for common types
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// Option for Go specific settings
option go_package = "github.com/example/userservice";

// User is the account record carried by the UserService request/response
// messages in this file. Field numbers identify fields on the wire.
message User {
  string id = 1;                    // Unique identifier
  string name = 2;                  // User name
  string email = 3;                 // User email
  int32 age = 4;                    // User age
  bool active = 5;                  // Account status
  google.protobuf.Timestamp created_at = 6;  // Creation timestamp (a Timestamp message, imported above)
  repeated string roles = 7;        // User roles list
  map<string, string> metadata = 8; // Additional metadata
  UserStatus status = 9;            // User lifecycle status (UserStatus enum, declared later in this file)
}

// Request and response messages for the UserService RPCs.

// Input for CreateUser; also the element type of the BatchCreateUsers
// client stream.
message CreateUserRequest {
  string name = 1;
  string email = 2;
  int32 age = 3;
  repeated string roles = 4;
}

// Result of CreateUser / BatchCreateUsers: one user plus a status message.
message CreateUserResponse {
  User user = 1;
  string message = 2;
}

// Looks up a single user by id (GetUser, and each element of StreamUsers).
message GetUserRequest {
  string user_id = 1;
}

// Carries the user found for a GetUserRequest.
message GetUserResponse {
  User user = 1;
}

// Paging and filtering parameters for ListUsers. The sample Node.js server
// in this document treats `page` as 1-based and matches `filter` against
// name and email.
message ListUsersRequest {
  int32 page = 1;
  int32 limit = 2;
  string filter = 3;
}

// One page of users plus the total number of users that matched.
message ListUsersResponse {
  repeated User users = 1;
  int32 total = 2;
}

// Identifies the user to update and carries the new field values.
message UpdateUserRequest {
  string user_id = 1;
  User user = 2;
}

// The user after the update plus a status message.
message UpdateUserResponse {
  User user = 1;
  string message = 2;
}

// Identifies the user to delete.
message DeleteUserRequest {
  string user_id = 1;
}

// Outcome of a delete: success flag plus a status message.
message DeleteUserResponse {
  bool success = 1;
  string message = 2;
}

// Lifecycle status of a user (used by User.status).
// The zero value is the proto3 default for fields that were never set.
enum UserStatus {
  USER_STATUS_UNKNOWN = 0;
  USER_STATUS_ACTIVE = 1;
  USER_STATUS_INACTIVE = 2;
  USER_STATUS_SUSPENDED = 3;
}

// UserService: CRUD operations on users, showing all four gRPC call shapes
// (unary, server streaming, client streaming, bidirectional streaming).
service UserService {
  // Unary RPCs: one request message in, one response message out.
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
  rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse);

  // Server streaming RPC: one request, a stream of responses.
  rpc ListUsers(ListUsersRequest) returns (stream ListUsersResponse);

  // Client streaming RPC: a stream of requests, a single response.
  rpc BatchCreateUsers(stream CreateUserRequest) returns (CreateUserResponse);

  // Bidirectional streaming RPC: both sides stream messages.
  rpc StreamUsers(stream GetUserRequest) returns (stream GetUserResponse);
}

// Health check service: Check is a one-shot unary probe that takes an empty
// request; Watch returns a stream of status messages.
service HealthCheck {
  rpc Check(google.protobuf.Empty) returns (HealthCheckResponse);
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

// Names the service whose health the caller wants to watch.
message HealthCheckRequest {
  string service = 1;
}

// Reports a serving status. The enum is nested, so its value names are
// scoped to HealthCheckResponse.
message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}

// Authentication service: credential login plus token validation/refresh.
// Note that ValidateToken and RefreshToken share the same request and
// response types.
service AuthService {
  rpc Login(LoginRequest) returns (LoginResponse);
  rpc ValidateToken(TokenRequest) returns (TokenResponse);
  rpc RefreshToken(TokenRequest) returns (TokenResponse);
}

// Credentials presented to Login.
message LoginRequest {
  string username = 1;
  string password = 2;
}

// Tokens issued by Login. The sample Node.js server in this document
// returns 3600 for expires_in.
message LoginResponse {
  string access_token = 1;
  string refresh_token = 2;
  int64 expires_in = 3;
}

// A single token to validate or refresh.
message TokenRequest {
  string token = 1;
}

// Validation result. This message has no field for a newly issued token,
// so RefreshToken cannot return one with this schema.
message TokenResponse {
  bool valid = 1;
  string user_id = 2;
  repeated string roles = 3;
}

💻 gRPC Server Implementation javascript

🟡 intermediate

Node.js implementation of a gRPC server with service handlers

// gRPC Server Implementation in Node.js
const grpc = require('@grpc/grpc-js');
const fs = require('fs');
const path = require('path');

// Load Protocol Buffer definitions
// user.proto is resolved relative to this script's directory.
const PROTO_PATH = path.join(__dirname, 'user.proto');
const protoLoader = require('@grpc/proto-loader');
// Loader options (see the @grpc/proto-loader documentation):
//   keepCase - keep field names as written in the .proto; the handlers
//              below read snake_case names such as `user_id`
//   longs    - represent 64-bit integer fields as strings
//   enums    - represent enum values by name (as strings)
//   defaults - populate unset fields with their default values
//   oneofs   - expose virtual oneof properties
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});

// `userservice` matches the `package` declared in the proto file.
const userProto = grpc.loadPackageDefinition(packageDefinition).userservice;

// In-memory user database: a Map keyed by user id. Contents are lost when
// the process exits.
const usersDatabase = new Map();
// Next id to hand out; incremented by the create handlers.
let userIdCounter = 1;

// Service implementation handlers

/**
 * Convert a JavaScript Date into the `{ seconds, nanos }` object shape of
 * google.protobuf.Timestamp. `User.created_at` is a Timestamp message, so
 * an ISO string cannot be serialized into it.
 *
 * @param {Date} date - Instant to convert.
 * @returns {{seconds: number, nanos: number}} Timestamp-shaped object.
 */
function toProtoTimestamp(date) {
  const millis = date.getTime();
  const seconds = Math.floor(millis / 1000);
  return { seconds, nanos: (millis - seconds * 1000) * 1e6 };
}

/**
 * Build a new user record from a CreateUserRequest-shaped object and
 * allocate the next id from the module-level `userIdCounter`.
 *
 * @param {{name: string, email: string, age?: number, roles?: string[]}} request
 * @returns {object} User record ready to store and return.
 */
function buildUserRecord({ name, email, age, roles }) {
  return {
    id: (userIdCounter++).toString(),
    name,
    email,
    age: age || 0,
    active: true,
    created_at: toProtoTimestamp(new Date()),
    // The loader runs with `defaults: true`, so an unset repeated field
    // arrives as [] (truthy); check the length to apply the default role.
    roles: Array.isArray(roles) && roles.length > 0 ? roles : ['user'],
    metadata: {}
  };
}

/**
 * Handlers for the UserService RPCs, backed by the module-level
 * `usersDatabase` Map.
 */
class UserServiceImpl {
  /**
   * Unary RPC: create a user.
   * Replies INVALID_ARGUMENT when name or email is missing.
   */
  createUser(call, callback) {
    const { name, email } = call.request;

    // Validate input
    if (!name || !email) {
      return callback({
        code: grpc.status.INVALID_ARGUMENT,
        message: 'Name and email are required'
      });
    }

    const user = buildUserRecord(call.request);
    usersDatabase.set(user.id, user);

    callback(null, {
      user,
      message: 'User created successfully'
    });
  }

  /**
   * Unary RPC: fetch a user by id. Replies NOT_FOUND for unknown ids.
   */
  getUser(call, callback) {
    const { user_id } = call.request;
    const user = usersDatabase.get(user_id);

    if (!user) {
      return callback({
        code: grpc.status.NOT_FOUND,
        message: 'User not found'
      });
    }

    callback(null, { user });
  }

  /**
   * Unary RPC: merge the supplied user fields over the stored record.
   * Replies NOT_FOUND for unknown ids.
   *
   * NOTE(review): with `defaults: true` every field of `user` arrives
   * populated, so fields the client did not send overwrite stored values
   * with proto3 defaults. A field mask would be needed for true partial
   * updates - confirm the intended semantics.
   */
  updateUser(call, callback) {
    const { user_id, user: updatedData } = call.request;
    const existingUser = usersDatabase.get(user_id);

    if (!existingUser) {
      return callback({
        code: grpc.status.NOT_FOUND,
        message: 'User not found'
      });
    }

    const updatedUser = {
      ...existingUser,
      ...updatedData,
      id: existingUser.id // Preserve ID
    };

    usersDatabase.set(user_id, updatedUser);

    callback(null, {
      user: updatedUser,
      message: 'User updated successfully'
    });
  }

  /**
   * Unary RPC: delete a user by id. Replies NOT_FOUND for unknown ids.
   */
  deleteUser(call, callback) {
    const { user_id } = call.request;

    if (!usersDatabase.has(user_id)) {
      return callback({
        code: grpc.status.NOT_FOUND,
        message: 'User not found'
      });
    }

    usersDatabase.delete(user_id);

    callback(null, {
      success: true,
      message: 'User deleted successfully'
    });
  }

  /**
   * Server streaming RPC: write one page of (optionally filtered) users,
   * then end the stream.
   */
  listUsers(call) {
    const { page, limit, filter } = call.request;

    // Unset int32 fields arrive as 0 rather than undefined, so destructuring
    // defaults never apply; treat any non-positive value as "use default".
    const pageNumber = page > 0 ? page : 1;
    const pageSize = limit > 0 ? limit : 10;

    const users = Array.from(usersDatabase.values());

    // Apply the case-insensitive name/email filter if provided
    const needle = filter ? filter.toLowerCase() : '';
    const filteredUsers = needle
      ? users.filter(user =>
          (user.name || '').toLowerCase().includes(needle) ||
          (user.email || '').toLowerCase().includes(needle)
        )
      : users;

    // Pagination
    const startIndex = (pageNumber - 1) * pageSize;
    const paginatedUsers = filteredUsers.slice(startIndex, startIndex + pageSize);

    call.write({
      users: paginatedUsers,
      total: filteredUsers.length
    });

    call.end();
  }

  /**
   * Client streaming RPC: create one user per incoming message and reply
   * once. The first invalid message (or unexpected exception) fails the
   * call through `callback`; later messages are ignored and the 'end'
   * handler does not reply a second time. Users created before the failure
   * stay in the store.
   */
  async batchCreateUsers(call, callback) {
    const createdUsers = [];
    let replied = false;

    // Errors must go through `callback`: emitting 'error' on the request
    // stream does not deliver a status to the client and throws when no
    // listener is attached.
    const fail = (code, message) => {
      if (replied) {
        return;
      }
      replied = true;
      callback({ code, message });
    };

    call.on('data', (request) => {
      if (replied) {
        return;
      }

      try {
        if (!request.name || !request.email) {
          fail(grpc.status.INVALID_ARGUMENT, 'Name and email are required');
          return;
        }

        const user = buildUserRecord(request);
        usersDatabase.set(user.id, user);
        createdUsers.push(user);
      } catch (error) {
        fail(grpc.status.INTERNAL, error.message);
      }
    });

    call.on('end', () => {
      if (replied) {
        return;
      }
      replied = true;
      callback(null, {
        user: createdUsers[createdUsers.length - 1] || null,
        message: `Batch created ${createdUsers.length} users successfully`
      });
    });
  }

  /**
   * Bidirectional streaming RPC: answer each requested id with the matching
   * user. An unknown id emits a NOT_FOUND error on the call.
   */
  streamUsers(call) {
    call.on('data', (request) => {
      const { user_id } = request;
      const user = usersDatabase.get(user_id);

      if (user) {
        call.write({ user });
      } else {
        call.emit('error', {
          code: grpc.status.NOT_FOUND,
          message: `User ${user_id} not found`
        });
      }
    });

    call.on('end', () => {
      call.end();
    });
  }
}

// Authentication service implementation
/**
 * Mock handlers for the AuthService RPCs. Credentials and tokens are
 * hard-coded demo values; in production use proper password hashing, a
 * user store, and real (e.g. JWT) token validation.
 */
class AuthServiceImpl {
  /**
   * Unary RPC: exchange the demo credentials for mock tokens.
   * Replies UNAUTHENTICATED for anything else.
   */
  login(call, callback) {
    const { username, password } = call.request;

    if (username !== 'admin' || password !== 'password') {
      callback({
        code: grpc.status.UNAUTHENTICATED,
        message: 'Invalid credentials'
      });
      return;
    }

    callback(null, {
      access_token: `mock-access-token-${Date.now()}`,
      refresh_token: `mock-refresh-token-${Date.now()}`,
      expires_in: 3600
    });
  }

  /**
   * Unary RPC: accept any token carrying the mock access-token prefix.
   */
  validateToken(call, callback) {
    const { token } = call.request;

    if (typeof token !== 'string' || !token.startsWith('mock-access-token-')) {
      callback({
        code: grpc.status.UNAUTHENTICATED,
        message: 'Invalid token'
      });
      return;
    }

    callback(null, {
      valid: true,
      user_id: 'admin',
      roles: ['admin', 'user']
    });
  }

  /**
   * Unary RPC: accept any token carrying the mock refresh-token prefix.
   *
   * The reply only confirms validity: the response object has no field for
   * a newly issued token, so none is generated here (the previous version
   * computed one and discarded it).
   */
  refreshToken(call, callback) {
    const { token } = call.request;

    if (typeof token !== 'string' || !token.startsWith('mock-refresh-token-')) {
      callback({
        code: grpc.status.UNAUTHENTICATED,
        message: 'Invalid refresh token'
      });
      return;
    }

    callback(null, {
      valid: true,
      user_id: 'admin',
      roles: ['admin', 'user']
    });
  }
}

// Health check service implementation
/**
 * Handlers for the HealthCheck RPCs. Both always report status 1, which is
 * SERVING in HealthCheckResponse.ServingStatus.
 */
class HealthCheckImpl {
  /**
   * Unary RPC: reply once with the SERVING status.
   */
  check(call, callback) {
    callback(null, {
      status: 1 // SERVING
    });
  }

  /**
   * Server streaming RPC: write the SERVING status once per second until
   * the call finishes.
   *
   * The timer is stopped on 'cancelled' and 'close' as well as 'end'.
   * Stopping only on 'end' leaves the interval running (and writing to a
   * dead call) after the client cancels or disconnects.
   */
  watch(call) {
    const interval = setInterval(() => {
      call.write({
        status: 1 // SERVING
      });
    }, 1000);

    let stopped = false;
    // Idempotent: several of the events below can fire for the same call.
    const stop = () => {
      if (stopped) {
        return;
      }
      stopped = true;
      clearInterval(interval);
    };

    call.on('cancelled', stop);
    call.on('close', stop);

    call.on('end', () => {
      stop();
      call.end();
    });
  }
}

// Create gRPC server
/**
 * Build a grpc.Server with the user, auth and health-check services
 * registered, in that order. The server is returned unbound; the caller
 * binds and starts it.
 *
 * @returns {grpc.Server} Configured server instance.
 */
function createServer() {
  const grpcServer = new grpc.Server();

  // [service name in the loaded proto package, handler class]
  const registrations = [
    ['UserService', UserServiceImpl],
    ['AuthService', AuthServiceImpl],
    ['HealthCheck', HealthCheckImpl]
  ];

  for (const [serviceName, Handlers] of registrations) {
    grpcServer.addService(userProto[serviceName].service, new Handlers());
  }

  return grpcServer;
}

// Server configuration
const server = createServer();
const PORT = process.env.GRPC_PORT || 50051;
const HOST = process.env.GRPC_HOST || '0.0.0.0';

// How long a graceful shutdown may take before it is forced.
const FORCE_SHUTDOWN_TIMEOUT_MS = 10000;

// SSL/TLS configuration (optional)
// Insecure (plaintext) credentials are used by default; swap in the SSL
// block below to enable TLS.
const credentials = grpc.ServerCredentials.createInsecure();
// For SSL:
// const credentials = grpc.ServerCredentials.createSsl(
//   fs.readFileSync(path.join(__dirname, 'ca.crt')),
//   [{
//     cert_chain: fs.readFileSync(path.join(__dirname, 'server.crt')),
//     private_key: fs.readFileSync(path.join(__dirname, 'server.key'))
//   }],
//   false
// );

// Start server
server.bindAsync(`${HOST}:${PORT}`, credentials, (err, port) => {
  if (err) {
    console.error('Failed to start server:', err);
    return;
  }

  // Log the port reported by bindAsync rather than the requested one.
  console.log(`gRPC server listening on ${HOST}:${port}`);
  // NOTE(review): start() is required by older @grpc/grpc-js releases and
  // is deprecated in newer ones - confirm against the installed version.
  server.start();
});

// Graceful shutdown
process.on('SIGINT', () => {
  console.log('Shutting down gRPC server...');

  // Force shutdown if the graceful one does not finish in time. The timer
  // is armed here, when shutdown begins: arming it at module load killed
  // the server FORCE_SHUTDOWN_TIMEOUT_MS after every start.
  const forceTimer = setTimeout(() => {
    console.log('Forcing server shutdown...');
    server.forceShutdown();
    process.exit(0);
  }, FORCE_SHUTDOWN_TIMEOUT_MS);

  server.tryShutdown((err) => {
    clearTimeout(forceTimer);
    if (err) {
      console.error('Error during shutdown:', err);
      process.exit(1);
    }
    console.log('Server stopped successfully');
    process.exit(0);
  });
});

module.exports = { createServer, UserServiceImpl, AuthServiceImpl, HealthCheckImpl };

💻 gRPC Client Implementation javascript

🟡 intermediate

Node.js implementation of a gRPC client with service calls

// gRPC Client Implementation in Node.js
const grpc = require('@grpc/grpc-js');
const path = require('path');

// Load Protocol Buffer definitions
// user.proto is resolved relative to this script's directory. The loader
// options are the same ones the server sample uses: field names keep
// their .proto casing, 64-bit integers and enum values are represented
// as strings, and unset fields are populated with defaults (see the
// @grpc/proto-loader documentation).
const PROTO_PATH = path.join(__dirname, 'user.proto');
const protoLoader = require('@grpc/proto-loader');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});

// `userservice` matches the `package` declared in the proto file.
const userProto = grpc.loadPackageDefinition(packageDefinition).userservice;

// Client configuration
// Host and port come from the environment, falling back to localhost:50051.
const GRPC_HOST = process.env.GRPC_HOST || 'localhost';
const GRPC_PORT = process.env.GRPC_PORT || 50051;
const SERVER_ADDRESS = `${GRPC_HOST}:${GRPC_PORT}`;

// SSL/TLS configuration (optional)
// Insecure (plaintext) channel credentials are used by default.
const credentials = grpc.credentials.createInsecure();
// For SSL (this sample does not require('fs'); add that import before
// enabling the block below):
// const credentials = grpc.credentials.createSsl(
//   fs.readFileSync(path.join(__dirname, 'ca.crt')),
//   fs.readFileSync(path.join(__dirname, 'client.key')),
//   fs.readFileSync(path.join(__dirname, 'client.crt'))
// );

// Create client instances
// One stub per service, all pointing at the same address with the same
// credentials.
const userClient = new userProto.UserService(
  SERVER_ADDRESS,
  credentials
);

const authClient = new userProto.AuthService(
  SERVER_ADDRESS,
  credentials
);

const healthClient = new userProto.HealthCheck(
  SERVER_ADDRESS,
  credentials
);

// Client utility class

/**
 * Adapt a callback-style unary stub method to a Promise.
 *
 * @param {object} client - gRPC service stub.
 * @param {string} methodName - Name of the unary method on the stub.
 * @param {object} request - Request message.
 * @returns {Promise<object>} Resolves with the response, rejects with the
 *   error passed to the stub's callback.
 */
function invokeUnary(client, methodName, request) {
  return new Promise((resolve, reject) => {
    // Call through the stub so `this` stays bound to it.
    client[methodName](request, (error, response) => {
      if (error) {
        reject(error);
      } else {
        resolve(response);
      }
    });
  });
}

/**
 * Promise-based wrapper around the module-level user, auth and health
 * stubs. Every method rejects with the error reported by the stub.
 */
class GrpcClient {
  constructor() {
    this.userClient = userClient;
    this.authClient = authClient;
    this.healthClient = healthClient;
  }

  // Health check method
  /** @returns {Promise<object>} The health check response. */
  async checkHealth() {
    return invokeUnary(this.healthClient, 'check', {});
  }

  // Authentication methods
  /** @returns {Promise<object>} The login response. */
  async login(username, password) {
    return invokeUnary(this.authClient, 'login', { username, password });
  }

  /** @returns {Promise<object>} The token validation response. */
  async validateToken(token) {
    return invokeUnary(this.authClient, 'validateToken', { token });
  }

  // User service methods
  /**
   * Create a user; `age` defaults to 0 and `roles` to ['user'].
   * @returns {Promise<object>} The create response.
   */
  async createUser(userData) {
    return invokeUnary(this.userClient, 'createUser', {
      name: userData.name,
      email: userData.email,
      age: userData.age || 0,
      roles: userData.roles || ['user']
    });
  }

  /** @returns {Promise<object>} The get-user response. */
  async getUser(userId) {
    return invokeUnary(this.userClient, 'getUser', { user_id: userId });
  }

  /** @returns {Promise<object>} The update response. */
  async updateUser(userId, userData) {
    return invokeUnary(this.userClient, 'updateUser', {
      user_id: userId,
      user: userData
    });
  }

  /** @returns {Promise<object>} The delete response. */
  async deleteUser(userId) {
    return invokeUnary(this.userClient, 'deleteUser', { user_id: userId });
  }

  // Server streaming method
  /**
   * Collect every streamed page into one result.
   * @returns {Promise<{users: object[], total: number, page: number, limit: number}>}
   */
  async listUsers(page = 1, limit = 10, filter = '') {
    return new Promise((resolve, reject) => {
      const call = this.userClient.listUsers({ page, limit, filter });

      const users = [];
      let total = 0;

      call.on('data', (response) => {
        users.push(...response.users);
        total = response.total;
      });

      call.on('end', () => {
        resolve({ users, total, page, limit });
      });

      call.on('error', reject);
    });
  }

  // Client streaming method
  /**
   * Stream one create request per entry, then resolve with the single
   * response.
   *
   * For a client-streaming call the response arrives through the callback
   * given to the stub method, not through a 'data' event, and
   * @grpc/grpc-js rejects the call when no callback is supplied.
   *
   * @param {Array<{name: string, email: string, age?: number, roles?: string[]}>} usersData
   * @returns {Promise<object>} The batch create response.
   */
  async batchCreateUsers(usersData) {
    return new Promise((resolve, reject) => {
      const call = this.userClient.batchCreateUsers((error, response) => {
        if (error) {
          reject(error);
        } else {
          resolve(response);
        }
      });

      // Also surface stream-level errors; a second settle is a no-op.
      call.on('error', reject);

      // Send all user data
      for (const userData of usersData) {
        call.write({
          name: userData.name,
          email: userData.email,
          age: userData.age || 0,
          roles: userData.roles || ['user']
        });
      }

      // End the stream
      call.end();
    });
  }

  // Bidirectional streaming method
  /**
   * Send every id and collect the users streamed back.
   * @returns {Promise<object[]>} Users received before the stream ended.
   */
  async streamUsers(userIds) {
    return new Promise((resolve, reject) => {
      const call = this.userClient.streamUsers();

      const users = [];

      call.on('data', (response) => {
        users.push(response.user);
      });

      call.on('end', () => {
        resolve(users);
      });

      call.on('error', reject);

      // Send user IDs to stream
      for (const userId of userIds) {
        call.write({ user_id: userId });
      }

      // End the stream
      call.end();
    });
  }
}

// Example usage
/**
 * Walk through every GrpcClient call against a running server, logging
 * each result. Any failure is logged (with the gRPC code and message when
 * present) and ends the walkthrough; the function itself does not throw.
 *
 * @returns {Promise<void>}
 */
async function demonstrateClient() {
  const client = new GrpcClient();

  try {
    // Health check
    console.log('Checking server health...');
    const health = await client.checkHealth();
    // The proto is loaded with `enums: String`, so the status arrives as
    // the enum name; the numeric value is accepted too for other loaders.
    const isServing = health.status === 'SERVING' || health.status === 1;
    console.log('Health status:', isServing ? 'SERVING' : 'NOT_SERVING');

    // Authentication
    console.log('\nAuthenticating...');
    const authResponse = await client.login('admin', 'password');
    console.log('Login successful:', authResponse);

    // Validate token
    const tokenValidation = await client.validateToken(authResponse.access_token);
    console.log('Token valid:', tokenValidation.valid);

    // Create user
    console.log('\nCreating user...');
    const userResponse = await client.createUser({
      name: 'John Doe',
      email: 'john.doe@example.com',
      age: 30,
      roles: ['user', 'developer']
    });
    console.log('Created user:', userResponse.user);

    const userId = userResponse.user.id;

    // Get user
    console.log('\nGetting user...');
    const getUserResponse = await client.getUser(userId);
    console.log('Retrieved user:', getUserResponse.user);

    // Update user
    console.log('\nUpdating user...');
    const updateResponse = await client.updateUser(userId, {
      name: 'John Smith',
      age: 31
    });
    console.log('Updated user:', updateResponse.user);

    // List users
    console.log('\nListing users...');
    const listResponse = await client.listUsers(1, 10, '');
    console.log(`Found ${listResponse.total} users:`, listResponse.users);

    // Batch create users
    console.log('\nBatch creating users...');
    const batchResponse = await client.batchCreateUsers([
      { name: 'Alice Johnson', email: 'alice.johnson@example.com', age: 25 },
      { name: 'Bob Wilson', email: 'bob.wilson@example.com', age: 28 },
      { name: 'Charlie Brown', email: 'charlie.brown@example.com', age: 32 }
    ]);
    console.log('Batch creation response:', batchResponse.message);

    // Stream users
    // NOTE(review): '999' and '888' are presumably unknown ids; if the
    // server reports them as errors this step rejects and the delete
    // below is skipped - confirm that is the intended demonstration.
    console.log('\nStreaming users...');
    const streamResponse = await client.streamUsers([userId, '999', '888']);
    console.log('Streamed users:', streamResponse);

    // Delete user
    console.log('\nDeleting user...');
    const deleteResponse = await client.deleteUser(userId);
    console.log('Delete response:', deleteResponse);

  } catch (error) {
    console.error('Error:', error);
    if (error.code) {
      console.error('gRPC Error Code:', error.code);
      console.error('gRPC Error Message:', error.message);
    }
  }
}

// Error handling utility
/**
 * Translate a gRPC error into a short human-readable description.
 *
 * @param {{code?: number}|null|undefined} error - Error from a gRPC call.
 * @returns {string} Description of the status code, a generic network
 *   message when there is no error object or no code, or an
 *   "Unknown gRPC error" message for codes outside the table.
 */
function handleGrpcError(error) {
  // `== null` covers both null and undefined, for the error and its code.
  if (error == null || error.code == null) {
    return 'Network error or connection failed';
  }

  const { status } = grpc;
  // The former `DO_NOT_USE` entry is gone: it was keyed on a status name
  // that does not exist in the module and could never match.
  const errorMessages = {
    [status.OK]: 'OK',
    [status.CANCELLED]: 'Operation cancelled',
    [status.UNKNOWN]: 'Unknown error',
    [status.INVALID_ARGUMENT]: 'Invalid argument',
    [status.DEADLINE_EXCEEDED]: 'Deadline exceeded',
    [status.NOT_FOUND]: 'Not found',
    [status.ALREADY_EXISTS]: 'Already exists',
    [status.PERMISSION_DENIED]: 'Permission denied',
    [status.UNAUTHENTICATED]: 'Unauthenticated',
    [status.RESOURCE_EXHAUSTED]: 'Resource exhausted',
    [status.FAILED_PRECONDITION]: 'Failed precondition',
    [status.ABORTED]: 'Aborted',
    [status.OUT_OF_RANGE]: 'Out of range',
    [status.UNIMPLEMENTED]: 'Unimplemented',
    [status.INTERNAL]: 'Internal error',
    [status.UNAVAILABLE]: 'Unavailable',
    [status.DATA_LOSS]: 'Data loss'
  };

  // Own-property check so codes such as 'toString' do not resolve to
  // inherited Object.prototype members.
  const isKnown = Object.prototype.hasOwnProperty.call(errorMessages, error.code);
  return isKnown ? errorMessages[error.code] : `Unknown gRPC error: ${error.code}`;
}

// Export client class and utility functions
module.exports = {
  GrpcClient,
  handleGrpcError,
  demonstrateClient
};

// Run demonstration if this file is executed directly
if (require.main === module) {
  demonstrateClient().catch(console.error);
}

💻 gRPC Service Reflection

🟡 intermediate

Implementing gRPC server reflection for dynamic service discovery and tooling integration

// gRPC Server Reflection Implementation
const grpc = require('@grpc/grpc-js');
const fs = require('fs');
const path = require('path');

// Server reflection service implementation
/**
 * Simplified server reflection: parses .proto files with a line-based
 * scanner, indexes their services/messages, and answers reflection
 * requests from that index. Descriptors are serialized as JSON rather than
 * real FileDescriptorProto bytes, so this is a teaching aid, not a
 * drop-in replacement for a production reflection service.
 */
class ServerReflectionService {
  /**
   * @param {object} server - Server exposing `addService(definition, impl)`.
   * @param {string[]} [protoFiles=[]] - Paths of .proto files to index.
   */
  constructor(server, protoFiles = []) {
    this.server = server;
    this.services = new Map();        // fully-qualified service name -> info
    this.fileDescriptors = new Map(); // file base name -> parsed descriptor
    this.symbolTable = new Map();     // fully-qualified symbol -> kind

    // Load proto files
    this.loadProtoFiles(protoFiles);

    // Register reflection service
    this.registerReflectionService();
  }

  /**
   * Read, parse and index each proto file. A file that cannot be loaded is
   * logged and skipped so one bad path does not prevent the others.
   */
  loadProtoFiles(protoFiles) {
    protoFiles.forEach(filePath => {
      try {
        const protoContent = fs.readFileSync(filePath, 'utf8');
        const fileName = path.basename(filePath);

        // Parse proto file (simplified - in production use proper parser)
        const descriptor = this.parseProtoFile(protoContent, fileName);
        this.fileDescriptors.set(fileName, descriptor);

        // Extract service information
        this.extractServices(descriptor, fileName);

        console.log(`Loaded proto file: ${fileName}`);
      } catch (error) {
        console.error(`Failed to load proto file ${filePath}:`, error.message);
      }
    });
  }

  /**
   * Line-based proto scanner (in production, use @grpc/proto-loader or
   * protobufjs). Recognizes `package`, `import`, `service`, `rpc`,
   * `message` and simple `type name = N` fields; map fields, oneofs and
   * options are ignored.
   *
   * @param {string} content - Text of the .proto file.
   * @param {string} fileName - Name recorded on the descriptor.
   * @returns {object} Descriptor with `name`, `package`, `messages`,
   *   `services`, `enums` and `dependencies`.
   */
  parseProtoFile(content, fileName) {
    const descriptor = {
      name: fileName,
      package: '',
      messages: [],
      services: [],
      enums: [],
      dependencies: []
    };

    // The optional `stream` keyword is captured separately so it sets the
    // streaming flags instead of leaking into the type names.
    const rpcPattern =
      /^rpc\s+(\w+)\s*\(\s*(stream\s+)?([\w.]+)\s*\)\s*returns\s*\(\s*(stream\s+)?([\w.]+)\s*\)/;
    // Anchored so prose in trailing comments cannot be read as a field;
    // `\s*` around '=' also accepts `name=1`.
    const fieldPattern = /^(repeated\s+|optional\s+)?([\w.]+)\s+(\w+)\s*=\s*(\d+)/;

    let currentService = null;
    let currentMessage = null;

    content.split('\n').forEach(rawLine => {
      const line = rawLine.trim();

      if (line.startsWith('package ')) {
        const packageMatch = line.match(/^package\s+([\w.]+)/);
        if (packageMatch) {
          descriptor.package = packageMatch[1];
        }
      } else if (line.startsWith('import ')) {
        const importMatch = line.match(/^import\s+(?:public\s+|weak\s+)?"([^"]+)"/);
        if (importMatch) {
          descriptor.dependencies.push(importMatch[1]);
        }
      } else if (line.startsWith('service ')) {
        const serviceName = line.match(/service\s+(\w+)/);
        if (serviceName) {
          currentService = {
            name: serviceName[1],
            methods: []
          };
          descriptor.services.push(currentService);
        }
      } else if (line.startsWith('rpc ') && currentService) {
        const methodMatch = line.match(rpcPattern);
        if (methodMatch) {
          currentService.methods.push({
            name: methodMatch[1],
            inputType: methodMatch[3],
            outputType: methodMatch[5],
            clientStreaming: Boolean(methodMatch[2]),
            serverStreaming: Boolean(methodMatch[4])
          });
        }
      } else if (line.startsWith('message ')) {
        const messageName = line.match(/message\s+(\w+)/);
        if (messageName) {
          currentMessage = {
            name: messageName[1],
            fields: []
          };
          descriptor.messages.push(currentMessage);
        }
      } else if (currentMessage && line.includes('=')) {
        const fieldMatch = line.match(fieldPattern);
        if (fieldMatch) {
          currentMessage.fields.push({
            name: fieldMatch[3],
            type: fieldMatch[2],
            number: Number.parseInt(fieldMatch[4], 10),
            repeated: Boolean(fieldMatch[1]) && fieldMatch[1].startsWith('repeated')
          });
        }
      }
    });

    return descriptor;
  }

  /**
   * Index the descriptor's services, methods, messages and fields under
   * their fully-qualified names.
   */
  extractServices(descriptor, fileName) {
    const qualify = name => (descriptor.package ? `${descriptor.package}.${name}` : name);

    descriptor.services.forEach(service => {
      const fullName = qualify(service.name);

      this.services.set(fullName, {
        name: service.name,
        fullName: fullName,
        file: fileName,
        package: descriptor.package,
        methods: service.methods.map(method => ({
          name: method.name,
          inputType: method.inputType,
          outputType: method.outputType,
          clientStreaming: method.clientStreaming,
          serverStreaming: method.serverStreaming
        }))
      });

      // Add to symbol table
      this.symbolTable.set(fullName, 'SERVICE');

      service.methods.forEach(method => {
        this.symbolTable.set(`${fullName}.${method.name}`, 'METHOD');
      });
    });

    descriptor.messages.forEach(message => {
      const fullName = qualify(message.name);
      this.symbolTable.set(fullName, 'MESSAGE');

      message.fields.forEach(field => {
        this.symbolTable.set(`${fullName}.${field.name}`, 'FIELD');
      });
    });
  }

  /** Register the bidirectional ServerReflectionInfo handler on the server. */
  registerReflectionService() {
    const reflectionProto = this.createReflectionProto();

    this.server.addService(reflectionProto.ServerReflection.service, {
      serverReflectionInfo: this.handleServerReflectionInfo.bind(this)
    });
  }

  /**
   * Minimal service definition for the reflection method.
   * NOTE(review): it carries no serialize/deserialize functions, which a
   * real gRPC server presumably needs to exchange messages - confirm.
   */
  createReflectionProto() {
    return {
      ServerReflection: {
        service: {
          serverReflectionInfo: {
            path: '/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo',
            requestStream: true,
            responseStream: true
          }
        }
      }
    };
  }

  /**
   * Stream handler: answer every request on the call. A failure while
   * processing one request is reported as an `errorResponse` message
   * rather than terminating the stream.
   */
  async handleServerReflectionInfo(call) {
    call.on('data', (request) => {
      try {
        const response = this.processReflectionRequest(request);
        call.write(response);
      } catch (error) {
        call.write({
          errorResponse: {
            errorCode: grpc.status.INTERNAL,
            errorMessage: error.message
          }
        });
      }
    });

    call.on('end', () => {
      call.end();
    });
  }

  /**
   * Dispatch on whichever request field is set.
   * @throws {Error} When no known request field is present.
   */
  processReflectionRequest(request) {
    if (request.listServices) {
      return this.handleListServices(request.listServices);
    } else if (request.fileByFilename) {
      return this.handleFileByFilename(request.fileByFilename);
    } else if (request.fileContainingSymbol) {
      return this.handleFileContainingSymbol(request.fileContainingSymbol);
    } else if (request.allExtensionNumbersOfType) {
      return this.handleAllExtensionNumbers(request.allExtensionNumbersOfType);
    } else if (request.listExtensionNumbers) {
      return this.handleListExtensionNumbers(request.listExtensionNumbers);
    } else {
      throw new Error('Unknown reflection request type');
    }
  }

  /**
   * List the fully-qualified names of every indexed service, leaving out
   * the reflection service itself.
   *
   * Names come from the proto index built by `extractServices`. The old
   * implementation read `this.server.services`, which the gRPC server does
   * not expose, and would have returned method names from handler paths.
   */
  handleListServices(request) {
    const services = [];

    for (const fullName of this.services.keys()) {
      if (!fullName.includes('ServerReflection')) {
        services.push({ name: fullName });
      }
    }

    return {
      listServicesResponse: {
        service: services
      }
    };
  }

  /** Return the serialized descriptor for a loaded file, or NOT_FOUND. */
  handleFileByFilename(request) {
    const fileName = request.filename;
    const descriptor = this.fileDescriptors.get(fileName);

    if (!descriptor) {
      return {
        errorResponse: {
          errorCode: grpc.status.NOT_FOUND,
          errorMessage: `File not found: ${fileName}`
        }
      };
    }

    return {
      fileDescriptorResponse: {
        fileDescriptorProto: [this.serializeDescriptor(descriptor)]
      }
    };
  }

  /** Return the descriptor of the file defining `request.symbol`, or NOT_FOUND. */
  handleFileContainingSymbol(request) {
    const symbol = request.symbol;
    const fileName = this.findFileForSymbol(symbol);

    if (!fileName) {
      return {
        errorResponse: {
          errorCode: grpc.status.NOT_FOUND,
          errorMessage: `Symbol not found: ${symbol}`
        }
      };
    }

    const descriptor = this.fileDescriptors.get(fileName);

    return {
      fileDescriptorResponse: {
        fileDescriptorProto: [this.serializeDescriptor(descriptor)]
      }
    };
  }

  /**
   * Simplified: always reports no extensions. Accepts either the bare type
   * name (what `processReflectionRequest` passes through) or an object
   * carrying `extending_type`.
   */
  handleAllExtensionNumbers(request) {
    const baseTypeName = typeof request === 'string' ? request : request.extending_type;

    return {
      allExtensionNumbersResponse: {
        baseTypeName,
        extensionNumber: []
      }
    };
  }

  /** Simplified: always reports no extensions for `request.containing_type`. */
  handleListExtensionNumbers(request) {
    return {
      extensionNumberResponse: {
        baseTypeName: request.containing_type,
        extensionNumber: []
      }
    };
  }

  /**
   * Whether `localName` (already relative to the descriptor's package)
   * names a service, message, `Service.method` or `Message.field` in it.
   */
  definesSymbol(descriptor, localName) {
    const [head, member, ...rest] = localName.split('.');
    if (rest.length > 0) {
      return false;
    }

    const service = descriptor.services.find(s => s.name === head);
    if (service) {
      return member === undefined || service.methods.some(m => m.name === member);
    }

    const message = descriptor.messages.find(m => m.name === head);
    if (message) {
      return member === undefined || message.fields.some(f => f.name === member);
    }

    return false;
  }

  /**
   * Find the loaded file that defines `symbol`.
   *
   * Accepts short names (`UserService`) and fully-qualified names
   * (`pkg.UserService`, `pkg.UserService.GetUser`). The old lookup
   * compared only short names, so qualified names were never found.
   *
   * @returns {string|null} File name, or null when no file defines it.
   */
  findFileForSymbol(symbol) {
    for (const [fileName, descriptor] of this.fileDescriptors) {
      const prefix = descriptor.package ? `${descriptor.package}.` : '';
      const localName = prefix && symbol.startsWith(prefix)
        ? symbol.slice(prefix.length)
        : symbol;

      if (this.definesSymbol(descriptor, localName)) {
        return fileName;
      }
    }
    return null;
  }

  /**
   * Simplified serialization: a JSON rendering of the descriptor wrapped
   * in a Buffer (in production use proper protobuf serialization).
   */
  serializeDescriptor(descriptor) {
    const serialized = JSON.stringify({
      name: descriptor.name,
      package: descriptor.package,
      dependency: descriptor.dependencies,
      messageType: descriptor.messages.map(msg => ({
        name: msg.name,
        field: msg.fields.map(field => ({
          name: field.name,
          number: field.number,
          label: field.repeated ? 'LABEL_REPEATED' : 'LABEL_OPTIONAL',
          type: this.getProtobufType(field.type)
        }))
      })),
      serviceType: descriptor.services.map(svc => ({
        name: svc.name,
        method: svc.methods.map(method => ({
          name: method.name,
          inputType: method.inputType,
          outputType: method.outputType,
          clientStreaming: method.clientStreaming,
          serverStreaming: method.serverStreaming
        }))
      }))
    });

    return Buffer.from(serialized);
  }

  /**
   * Map a proto scalar type name to its descriptor type constant. Unknown
   * names (including message and enum types) fall back to 'TYPE_STRING',
   * as before.
   */
  getProtobufType(type) {
    const typeMap = {
      'string': 'TYPE_STRING',
      'bytes': 'TYPE_BYTES',
      'int32': 'TYPE_INT32',
      'int64': 'TYPE_INT64',
      'uint32': 'TYPE_UINT32',
      'uint64': 'TYPE_UINT64',
      'sint32': 'TYPE_SINT32',
      'sint64': 'TYPE_SINT64',
      'fixed32': 'TYPE_FIXED32',
      'fixed64': 'TYPE_FIXED64',
      'sfixed32': 'TYPE_SFIXED32',
      'sfixed64': 'TYPE_SFIXED64',
      'bool': 'TYPE_BOOL',
      'double': 'TYPE_DOUBLE',
      'float': 'TYPE_FLOAT'
    };

    // Own-property check so names like 'constructor' do not resolve to
    // inherited Object.prototype members.
    return Object.prototype.hasOwnProperty.call(typeMap, type) ? typeMap[type] : 'TYPE_STRING';
  }
}

// Reflection client for service discovery
//
// Sends requests over the reflection stream created by
// createReflectionClient() and memoizes the answers in `cache` until
// clearCache() is called.
class ReflectionClient {
  constructor(address, credentials) {
    this.address = address;
    this.credentials = credentials;
    // Memoized results, keyed by 'listServices' or `service_<name>`
    this.cache = new Map();
  }

  // Return the names of the services reported by the server.
  // The list is cached after the first successful call.
  // Throws when the response does not contain a service list.
  async listServices() {
    const cacheKey = 'listServices';

    if (this.cache.has(cacheKey)) {
      return this.cache.get(cacheKey);
    }

    const response = await this.makeReflectionRequest({
      listServices: {}
    });

    const listing = response.listServicesResponse;
    if (!listing || !Array.isArray(listing.service)) {
      throw new Error('Reflection response did not contain a service list');
    }

    const services = listing.service.map(s => s.name);
    this.cache.set(cacheKey, services);

    return services;
  }

  // Return the deserialized file descriptor that declares `serviceName`.
  // The descriptor is cached per service name.
  // Throws when the server returns no descriptor for the symbol.
  async getServiceDescriptor(serviceName) {
    const cacheKey = `service_${serviceName}`;

    if (this.cache.has(cacheKey)) {
      return this.cache.get(cacheKey);
    }

    const response = await this.makeReflectionRequest({
      fileContainingSymbol: { symbol: serviceName }
    });

    // Guard every level so a partial response yields the descriptive error
    // below instead of a TypeError
    const fileResponse = response.fileDescriptorResponse;
    const protos = fileResponse && fileResponse.fileDescriptorProto;
    if (!protos || !protos.length) {
      throw new Error(`Service descriptor not found for ${serviceName}`);
    }

    const descriptor = this.deserializeDescriptor(protos[0]);
    this.cache.set(cacheKey, descriptor);

    return descriptor;
  }

  // Fetch the descriptor of every listed service.
  // Returns an object keyed by service name; services whose descriptor cannot
  // be fetched are logged with console.warn and left out of the result.
  async getAllServiceDescriptors() {
    const services = await this.listServices();
    const descriptors = {};

    for (const serviceName of services) {
      try {
        descriptors[serviceName] = await this.getServiceDescriptor(serviceName);
      } catch (error) {
        console.warn(`Failed to get descriptor for ${serviceName}:`, error.message);
      }
    }

    return descriptors;
  }

  // Send one request on a fresh reflection stream and resolve with the last
  // message received before the stream ends.
  // Rejects as soon as the stream reports an error, or when the stream ends
  // without having delivered any message.
  async makeReflectionRequest(request) {
    return new Promise((resolve, reject) => {
      const client = this.createReflectionClient();
      const call = client.serverReflectionInfo();

      let response = null;

      // Listeners are attached before anything is written so that no event
      // can be missed.
      call.on('data', (data) => {
        response = data;
      });

      // Reject right away: a failed stream is not guaranteed to emit 'end',
      // and waiting for it would leave this promise pending forever.
      call.on('error', (err) => {
        reject(err);
      });

      call.on('end', () => {
        if (response) {
          resolve(response);
        } else {
          reject(new Error('No response received'));
        }
      });

      call.write(request);
      call.end();
    });
  }

  // Create the object used to open reflection streams.
  // Simplified client creation: this placeholder never emits any event, so
  // requests made through it never settle. Replace it with a real client.
  createReflectionClient() {
    return {
      serverReflectionInfo: () => ({
        write: () => {},
        on: () => {},
        end: () => {}
      })
    };
  }

  // Decode a descriptor buffer holding JSON text.
  // Simplified deserialization - in production use proper protobuf deserialization
  deserializeDescriptor(buffer) {
    return JSON.parse(buffer.toString());
  }

  // Drop every memoized response
  clearCache() {
    this.cache.clear();
  }
}

// Service discovery utilities
//
// Uses a reflection client to build an in-memory registry describing each
// discovered service (methods, request paths, message layouts) and can
// generate client source code from a registry entry.
class ServiceDiscovery {
  constructor(reflectionClient) {
    this.reflectionClient = reflectionClient;
    // service name (as passed to discoverService) -> service info
    this.serviceRegistry = new Map();
  }

  // Discover every service listed by the reflection client.
  // Failures for individual services are logged and do not abort the run.
  // Returns the registry Map.
  async discoverServices() {
    console.log('Discovering services...');

    const services = await this.reflectionClient.listServices();
    console.log(`Found ${services.length} services:`, services);

    for (const serviceName of services) {
      try {
        await this.discoverService(serviceName);
      } catch (error) {
        console.error(`Failed to discover ${serviceName}:`, error.message);
      }
    }

    console.log('Service discovery completed');
    return this.serviceRegistry;
  }

  // Fetch the descriptor for `serviceName`, record its methods and messages
  // in the registry and return the recorded info.
  // `serviceName` may be bare or package-qualified.
  // Throws when the descriptor contains no service definition at all.
  async discoverService(serviceName) {
    console.log(`Discovering service: ${serviceName}`);

    const descriptor = await this.reflectionClient.getServiceDescriptor(serviceName);

    const packagePrefix = descriptor.package ? `${descriptor.package}.` : '';

    // Strip the package when the caller passed a qualified name, so the
    // package is not repeated in the request path built below
    const shortName = packagePrefix && serviceName.startsWith(packagePrefix)
      ? serviceName.slice(packagePrefix.length)
      : serviceName;
    const qualifiedName = `${packagePrefix}${shortName}`;

    // A file may declare several services: pick the requested one and only
    // fall back to the first entry when no name matches
    const declaredServices = descriptor.serviceType || [];
    const serviceDescriptor =
      declaredServices.find(s => s.name === shortName || s.name === serviceName) ||
      declaredServices[0];

    if (!serviceDescriptor) {
      throw new Error(`Descriptor for ${serviceName} contains no service definitions`);
    }

    const serviceInfo = {
      name: serviceName,
      package: descriptor.package,
      methods: (serviceDescriptor.method || []).map(method => ({
        name: method.name,
        inputType: method.inputType,
        outputType: method.outputType,
        clientStreaming: method.clientStreaming,
        serverStreaming: method.serverStreaming,
        path: `/${qualifiedName}/${method.name}`
      })),
      messages: (descriptor.messageType || []).map(msg => ({
        name: msg.name,
        fields: (msg.field || []).map(field => ({
          name: field.name,
          type: field.type,
          number: field.number
        }))
      }))
    };

    this.serviceRegistry.set(serviceName, serviceInfo);

    console.log(`Service ${serviceName} discovered with ${serviceInfo.methods.length} methods`);

    return serviceInfo;
  }

  // Return the registry entry for `serviceName`, or undefined
  getService(serviceName) {
    return this.serviceRegistry.get(serviceName);
  }

  // Return all registry entries as an array
  getAllServices() {
    return Array.from(this.serviceRegistry.values());
  }

  // Find the first registered method called `methodName`.
  // Returns { service, method } or null when no service has such a method.
  findMethod(methodName) {
    for (const [serviceName, serviceInfo] of this.serviceRegistry) {
      const method = serviceInfo.methods.find(m => m.name === methodName);
      if (method) {
        return {
          service: serviceName,
          method: method
        };
      }
    }
    return null;
  }

  // Generate client source code for a registered service.
  // Supported languages: 'javascript' (default) and 'typescript'.
  // Throws when the service is not registered or the language is unsupported.
  generateClientCode(serviceName, language = 'javascript') {
    const serviceInfo = this.serviceRegistry.get(serviceName);
    if (!serviceInfo) {
      throw new Error(`Service ${serviceName} not found`);
    }

    if (language === 'javascript') {
      return this.generateJavascriptClient(serviceInfo);
    } else if (language === 'typescript') {
      return this.generateTypeScriptClient(serviceInfo);
    } else {
      throw new Error(`Language ${language} not supported`);
    }
  }

  // Produce the source of a client class with one method per service method.
  // Which request helper a method calls depends on its streaming flags.
  generateJavascriptClient(serviceInfo) {
    // A package-qualified name contains dots, which are not valid in a class
    // identifier; only the last segment is used for the class name
    const className = `${serviceInfo.name.split('.').pop()}Client`;

    let code = `// Auto-generated gRPC client for ${serviceInfo.name}\n`;
    code += `const grpc = require('@grpc/grpc-js');\n\n`;

    // Client class
    code += `class ${className} {\n`;
    code += `  constructor(address, credentials) {\n`;
    code += `    this.client = new grpc.Client(address, credentials);\n`;
    code += `  }\n\n`;

    // Generate methods
    serviceInfo.methods.forEach(method => {
      const methodName = method.name.charAt(0).toLowerCase() + method.name.slice(1);

      if (method.clientStreaming && method.serverStreaming) {
        // Bidirectional streaming
        code += `  ${methodName}() {\n`;
        code += `    return this.client.makeBidiStreamRequest(\n`;
        code += `      '${method.path}'\n`;
        code += `    );\n`;
        code += `  }\n\n`;
      } else if (method.clientStreaming) {
        // Client streaming
        code += `  ${methodName}(callback) {\n`;
        code += `    return this.client.makeClientStreamRequest(\n`;
        code += `      '${method.path}', callback\n`;
        code += `    );\n`;
        code += `  }\n\n`;
      } else if (method.serverStreaming) {
        // Server streaming
        code += `  ${methodName}(request) {\n`;
        code += `    return this.client.makeServerStreamRequest(\n`;
        code += `      '${method.path}', request\n`;
        code += `    );\n`;
        code += `  }\n\n`;
      } else {
        // Unary
        code += `  ${methodName}(request, callback) {\n`;
        code += `    return this.client.makeUnaryRequest(\n`;
        code += `      '${method.path}', request, callback\n`;
        code += `    );\n`;
        code += `  }\n\n`;
      }
    });

    code += `}\n\n`;
    code += `module.exports = ${className};\n`;

    return code;
  }

  // Produce the TypeScript client. No type annotations are generated yet, so
  // the output is the JavaScript client unchanged. (A textual
  // 'javascript' -> 'typescript' substitution is deliberately not applied:
  // it could only ever rewrite service or method names.)
  generateTypeScriptClient(serviceInfo) {
    return this.generateJavascriptClient(serviceInfo);
  }
}

// Export the three reflection classes for use by other modules
module.exports = {
  ServerReflectionService,
  ReflectionClient,
  ServiceDiscovery
};

💻 gRPC Health Checking

🟡 intermediate

Implementing the standardized gRPC health checking protocol for service monitoring

// gRPC Health Checking Implementation
const grpc = require('@grpc/grpc-js');

// Health checking status enum
// Frozen so that code sharing this object cannot change or add members.
const HealthCheckResponseServingStatus = Object.freeze({
  UNKNOWN: 0,
  SERVING: 1,
  NOT_SERVING: 2,
  SERVICE_UNKNOWN: 3
});

// Health check implementation
//
// Keeps one serving status per service name (the empty name '' holds the
// default/overall status) and pushes every change to the streams that are
// watching that service.
class HealthCheckService {
  // Options:
  //   defaultStatus   - status stored for '' at construction (default SERVING)
  //   checkInterval   - interval in ms, stored for subclasses (default 30000)
  //   detailedLogging - log status changes and requests (default false)
  constructor(options = {}) {
    this.statuses = new Map();
    // `??` rather than `||`: UNKNOWN is 0 and must be accepted as a default
    this.defaultStatus = options.defaultStatus ?? HealthCheckResponseServingStatus.SERVING;
    this.checkInterval = options.checkInterval || 30000;
    this.detailedLogging = options.detailedLogging || false;
    // service name -> Map(watcherId -> call)
    this.watchers = new Map();

    // Set default service status
    this.setStatus('', this.defaultStatus);
  }

  // Set health status for a service
  setStatus(service, status) {
    const oldStatus = this.statuses.get(service);
    this.statuses.set(service, status);

    if (this.detailedLogging) {
      console.log(`Health status changed for '${service}': ${this.getStatusString(oldStatus)} -> ${this.getStatusString(status)}`);
    }

    // Notify all watchers for this service
    this.notifyWatchers(service, status);
  }

  // Get health status for a service.
  // Returns SERVICE_UNKNOWN only when no status is stored for the service; a
  // stored UNKNOWN (0) is returned as is.
  getStatus(service) {
    return this.statuses.has(service)
      ? this.statuses.get(service)
      : HealthCheckResponseServingStatus.SERVICE_UNKNOWN;
  }

  // Clear status for a service and tell its watchers it is now unknown
  clearStatus(service) {
    this.statuses.delete(service);
    this.notifyWatchers(service, HealthCheckResponseServingStatus.SERVICE_UNKNOWN);
  }

  // Get all service statuses as { [service]: { status, statusString } }
  getAllStatuses() {
    const result = {};
    this.statuses.forEach((status, service) => {
      result[service] = {
        status: status,
        statusString: this.getStatusString(status)
      };
    });
    return result;
  }

  // gRPC health check method (unary): replies with the current status of
  // call.request.service ('' when the request names no service)
  check(call, callback) {
    const service = call.request.service || '';
    const status = this.getStatus(service);

    if (this.detailedLogging) {
      console.log(`Health check requested for service: '${service}', status: ${this.getStatusString(status)}`);
    }

    callback(null, {
      status: status
    });
  }

  // gRPC health watch method (server streaming): writes the current status
  // immediately, then one message per later status change until the stream
  // ends, fails, closes or is cancelled
  watch(call) {
    const service = call.request.service || '';
    const watcherId = this.generateWatcherId();

    if (this.detailedLogging) {
      console.log(`Health watch started for service: '${service}', watcher: ${watcherId}`);
    }

    // Store watcher
    if (!this.watchers.has(service)) {
      this.watchers.set(service, new Map());
    }

    this.watchers.get(service).set(watcherId, call);

    // Send current status immediately
    const currentStatus = this.getStatus(service);
    call.write({
      status: currentStatus
    });

    // Handle watcher cleanup. Several of the events below can fire for the
    // same stream, so the work is done only once.
    let cleanedUp = false;
    const cleanup = () => {
      if (cleanedUp) {
        return;
      }
      cleanedUp = true;

      if (this.watchers.has(service)) {
        this.watchers.get(service).delete(watcherId);

        if (this.watchers.get(service).size === 0) {
          this.watchers.delete(service);
        }
      }

      if (this.detailedLogging) {
        console.log(`Health watch ended for service: '${service}', watcher: ${watcherId}`);
      }
    };

    call.on('end', cleanup);
    call.on('error', cleanup);
    call.on('cancel', cleanup);
    // @grpc/grpc-js server calls announce client cancellation as 'cancelled';
    // without this listener cancelled watchers would never be removed
    call.on('cancelled', cleanup);
    call.on('close', cleanup);
  }

  // Notify all watchers of a service status change.
  // A watcher whose write throws is logged and dropped.
  notifyWatchers(service, status) {
    if (!this.watchers.has(service)) {
      return;
    }

    const watchers = this.watchers.get(service);
    const message = { status: status };

    watchers.forEach((call, watcherId) => {
      try {
        call.write(message);
      } catch (error) {
        console.error(`Failed to notify watcher ${watcherId} for service ${service}:`, error.message);
        watchers.delete(watcherId);
      }
    });
  }

  // Generate a watcher ID: 'watcher-' plus up to nine random base-36 chars.
  // Not cryptographically secure; it is only used as a Map key.
  generateWatcherId() {
    return 'watcher-' + Math.random().toString(36).slice(2, 11);
  }

  // Convert status to string ('UNKNOWN' for anything unrecognized)
  getStatusString(status) {
    const statusStrings = {
      [HealthCheckResponseServingStatus.UNKNOWN]: 'UNKNOWN',
      [HealthCheckResponseServingStatus.SERVING]: 'SERVING',
      [HealthCheckResponseServingStatus.NOT_SERVING]: 'NOT_SERVING',
      [HealthCheckResponseServingStatus.SERVICE_UNKNOWN]: 'SERVICE_UNKNOWN'
    };

    return statusStrings[status] || 'UNKNOWN';
  }

  // Get watcher statistics: total count and count per service
  getWatcherStats() {
    const stats = {
      totalWatchers: 0,
      serviceWatchers: {}
    };

    this.watchers.forEach((watchers, service) => {
      stats.totalWatchers += watchers.size;
      stats.serviceWatchers[service] = watchers.size;
    });

    return stats;
  }
}

// Advanced health check with dependencies
//
// A service's status is derived from periodic checks of its dependencies:
//   - every dependency checked and none degraded -> SERVING
//   - at least one dependency degraded           -> NOT_SERVING
//   - otherwise (some dependency not checked yet) -> UNKNOWN
// A dependency becomes degraded after `failureThreshold` consecutive failed
// checks and recovers after `recoveryThreshold` consecutive successful ones.
class AdvancedHealthCheckService extends HealthCheckService {
  // Options (in addition to those of HealthCheckService):
  //   checkTimeout      - ms before a single check is abandoned (default 5000)
  //   failureThreshold  - consecutive failures that degrade (default 3)
  //   recoveryThreshold - consecutive successes that recover (default 2)
  constructor(options = {}) {
    super(options);
    this.dependencies = new Map();   // service -> Set of dependency names
    this.checkers = new Map();       // dependency -> checker function
    this.checkResults = new Map();   // dependency -> latest check result
    this.monitors = new Map();       // dependency -> interval handle
    this.checkTimeout = options.checkTimeout || 5000;
    this.failureThreshold = options.failureThreshold || 3;
    this.recoveryThreshold = options.recoveryThreshold || 2;
  }

  // Add dependency for a service.
  // `checker` is an async function; a rejection (or a timeout) counts as a
  // failed check, any resolution as a successful one.
  addDependency(service, dependency, checker) {
    if (!this.dependencies.has(service)) {
      this.dependencies.set(service, new Set());
    }

    this.dependencies.get(service).add(dependency);
    this.checkers.set(dependency, checker);

    // Start monitoring the dependency
    this.startDependencyMonitoring(dependency, checker);
  }

  // Start monitoring a dependency: run one check now and then one every
  // `checkInterval` ms. A monitor already running for the same dependency is
  // replaced, so registering a dependency twice does not double the checks.
  async startDependencyMonitoring(dependency, checker) {
    this.stopMonitoring(dependency);

    const checkDependency = async () => {
      let timeoutHandle;
      try {
        const startTime = Date.now();
        const result = await Promise.race([
          checker(),
          new Promise((_, reject) => {
            timeoutHandle = setTimeout(
              () => reject(new Error('Health check timeout')),
              this.checkTimeout
            );
          })
        ]);

        const responseTime = Date.now() - startTime;
        // A checker may resolve with nothing at all; that is still a success
        const outcome = result || {};

        this.updateCheckResult(dependency, {
          healthy: true,
          lastCheck: new Date(),
          responseTime: responseTime,
          message: outcome.message || 'OK',
          details: outcome.details || {}
        });
      } catch (error) {
        this.updateCheckResult(dependency, {
          healthy: false,
          lastCheck: new Date(),
          error: error.message,
          lastError: error
        });
      } finally {
        // Do not leave the timeout timer pending once the check has settled
        clearTimeout(timeoutHandle);
      }
    };

    // Run check immediately
    checkDependency();

    // Schedule periodic checks and keep the handle so they can be stopped
    this.monitors.set(dependency, setInterval(checkDependency, this.checkInterval));
  }

  // Stop the periodic checks of one dependency, or of all dependencies when
  // called without an argument. Recorded results are kept.
  stopMonitoring(dependency) {
    const targets = dependency === undefined
      ? Array.from(this.monitors.keys())
      : [dependency];

    for (const name of targets) {
      if (this.monitors.has(name)) {
        clearInterval(this.monitors.get(name));
        this.monitors.delete(name);
      }
    }
  }

  // Update check result for a dependency: merge the new outcome into the
  // stored one, maintain the consecutive counters and the `degraded` flag,
  // then re-evaluate every service status.
  updateCheckResult(dependency, result) {
    const previousResult = this.checkResults.get(dependency) || {
      consecutiveFailures: 0,
      consecutiveSuccesses: 0,
      degraded: false
    };

    const consecutiveFailures = result.healthy ? 0 : previousResult.consecutiveFailures + 1;
    const consecutiveSuccesses = result.healthy ? previousResult.consecutiveSuccesses + 1 : 0;

    // Hysteresis: enough failures in a row degrade the dependency, and only
    // enough successes in a row bring it back
    let degraded = Boolean(previousResult.degraded);
    if (consecutiveFailures >= this.failureThreshold) {
      degraded = true;
    } else if (degraded && consecutiveSuccesses >= this.recoveryThreshold) {
      degraded = false;
    }

    const newResult = {
      ...previousResult,
      ...result,
      consecutiveFailures,
      consecutiveSuccesses,
      degraded
    };

    if (result.healthy) {
      // Do not keep reporting the error of an earlier failed check
      newResult.error = null;
      newResult.lastError = null;
    }

    this.checkResults.set(dependency, newResult);

    // Update service status based on dependency health
    this.updateServiceStatusBasedOnDependencies();
  }

  // Update service status based on all dependencies
  updateServiceStatusBasedOnDependencies() {
    for (const [service, deps] of this.dependencies) {
      let allHealthy = true;
      let anyUnhealthy = false;

      for (const dependency of deps) {
        const result = this.checkResults.get(dependency);

        if (!result) {
          // Not checked yet: neither healthy nor known to be unhealthy
          allHealthy = false;
        } else if (result.degraded) {
          allHealthy = false;
          anyUnhealthy = true;
        }
      }

      let newStatus;
      if (allHealthy) {
        newStatus = HealthCheckResponseServingStatus.SERVING;
      } else if (anyUnhealthy) {
        newStatus = HealthCheckResponseServingStatus.NOT_SERVING;
      } else {
        newStatus = HealthCheckResponseServingStatus.UNKNOWN;
      }

      const currentStatus = this.getStatus(service);
      if (currentStatus !== newStatus) {
        this.setStatus(service, newStatus);
      }
    }
  }

  // Get detailed health status of a service, including one entry per
  // dependency (healthy flag, last check time, response time, last error and
  // consecutive counters)
  getDetailedStatus(service) {
    const status = this.getStatus(service);
    const dependencies = this.dependencies.get(service) || new Set();

    const dependencyStatuses = {};
    for (const dependency of dependencies) {
      const result = this.checkResults.get(dependency);
      dependencyStatuses[dependency] = {
        healthy: result ? !result.degraded : false,
        lastCheck: result ? result.lastCheck : null,
        responseTime: result ? result.responseTime : null,
        error: result && result.error ? result.error : null,
        consecutiveFailures: result ? result.consecutiveFailures : 0,
        consecutiveSuccesses: result ? result.consecutiveSuccesses : 0
      };
    }

    return {
      service: service,
      status: status,
      statusString: this.getStatusString(status),
      dependencies: dependencyStatuses,
      allDependenciesHealthy: Object.values(dependencyStatuses).every(dep => dep.healthy),
      lastUpdated: new Date()
    };
  }

  // Get all detailed statuses, keyed by service name
  getAllDetailedStatuses() {
    const result = {};

    // Include services with dependencies
    for (const service of this.dependencies.keys()) {
      result[service] = this.getDetailedStatus(service);
    }

    // Include services without dependencies
    for (const service of this.statuses.keys()) {
      if (!result[service]) {
        result[service] = {
          service: service,
          status: this.getStatus(service),
          statusString: this.getStatusString(this.getStatus(service)),
          dependencies: {},
          allDependenciesHealthy: true,
          lastUpdated: new Date()
        };
      }
    }

    return result;
  }
}

// Health check client for monitoring services
//
// Wraps the object returned by createHealthClient() with promise-based
// helpers for single checks, timeouts, retries, batches and watching.
class HealthCheckClient {
  constructor(address, credentials = grpc.credentials.createInsecure()) {
    this.address = address;
    this.credentials = credentials;
    this.client = this.createHealthClient();
  }

  // Create health check client
  // In production, load the actual health proto. This simplified stand-in
  // always reports SERVING.
  createHealthClient() {
    return {
      check: (request, callback) => {
        // Simplified implementation
        setTimeout(() => {
          callback(null, { status: HealthCheckResponseServingStatus.SERVING });
        }, 100);
      },

      watch: (request) => {
        // Simplified streaming implementation
        const timers = [];
        const call = {
          write: () => {},
          on: (event, handler) => {
            if (event === 'data') {
              // Simulate periodic health updates
              timers.push(setInterval(() => {
                handler({ status: HealthCheckResponseServingStatus.SERVING });
              }, 5000));
            }
          },
          // Stop the simulated updates so the watch does not run forever
          cancel: () => {
            timers.forEach((timer) => clearInterval(timer));
            timers.length = 0;
          }
        };
        return call;
      }
    };
  }

  // Check health of a specific service.
  // Resolves with { service, status, statusString, timestamp }; rejects with
  // the error reported by the underlying client.
  async checkHealth(service = '') {
    return new Promise((resolve, reject) => {
      const request = { service: service };

      this.client.check(request, (error, response) => {
        if (error) {
          reject(error);
        } else {
          resolve({
            service: service,
            status: response.status,
            statusString: this.getStatusString(response.status),
            timestamp: new Date()
          });
        }
      });
    });
  }

  // Watch health status changes.
  // `callback(error, update)` is invoked for every message and every error.
  // Returns the underlying call object.
  watchHealth(service = '', callback) {
    const request = { service: service };
    const call = this.client.watch(request);

    call.on('data', (response) => {
      callback(null, {
        service: service,
        status: response.status,
        statusString: this.getStatusString(response.status),
        timestamp: new Date()
      });
    });

    call.on('error', (error) => {
      callback(error);
    });

    return call;
  }

  // Check health with timeout: rejects when no answer arrives within
  // `timeoutMs`. The timer is always cleared once the outcome is known.
  async checkHealthWithTimeout(service = '', timeoutMs = 5000) {
    let timer;
    const deadline = new Promise((_, reject) => {
      timer = setTimeout(() => {
        reject(new Error(`Health check timeout after ${timeoutMs}ms`));
      }, timeoutMs);
    });

    try {
      return await Promise.race([this.checkHealth(service), deadline]);
    } finally {
      clearTimeout(timer);
    }
  }

  // Check health with retries.
  // Makes up to `maxRetries` attempts (always at least one), waiting
  // `retryDelay` ms between attempts, and rethrows the last error when every
  // attempt fails.
  async checkHealthWithRetry(service = '', maxRetries = 3, retryDelay = 1000) {
    // At least one attempt, so a failure is always reported with a real error
    const attempts = Math.max(1, maxRetries);
    let lastError;

    for (let attempt = 1; attempt <= attempts; attempt++) {
      try {
        return await this.checkHealth(service);
      } catch (error) {
        lastError = error;

        if (attempt < attempts) {
          console.warn(`Health check attempt ${attempt} failed for ${service}, retrying in ${retryDelay}ms:`, error.message);
          await this.sleep(retryDelay);
        }
      }
    }

    throw lastError;
  }

  // Batch health check for multiple services.
  // Never rejects: a failed check is reported as an entry with `error` set
  // and status SERVICE_UNKNOWN. Results are in the order of `services`.
  async checkMultipleServices(services) {
    return Promise.all(services.map(service =>
      this.checkHealth(service).catch(error => ({
        service: service,
        error: error.message,
        status: HealthCheckResponseServingStatus.SERVICE_UNKNOWN,
        statusString: 'SERVICE_UNKNOWN',
        timestamp: new Date()
      }))
    ));
  }

  // Convert status to string ('UNKNOWN' for anything unrecognized)
  getStatusString(status) {
    const statusStrings = {
      [HealthCheckResponseServingStatus.UNKNOWN]: 'UNKNOWN',
      [HealthCheckResponseServingStatus.SERVING]: 'SERVING',
      [HealthCheckResponseServingStatus.NOT_SERVING]: 'NOT_SERVING',
      [HealthCheckResponseServingStatus.SERVICE_UNKNOWN]: 'SERVICE_UNKNOWN'
    };

    return statusStrings[status] || 'UNKNOWN';
  }

  // Resolve after `ms` milliseconds
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Health check utilities
//
// Factories for checker functions. Unless noted otherwise a checker resolves
// with { message, details? } when the target is healthy and rejects with an
// Error (whose `cause` is the original error) when it is not.
class HealthCheckUtils {
  // Create database health checker: runs 'SELECT 1' through dbConnection.raw()
  static createDatabaseHealthChecker(dbConnection) {
    return async () => {
      try {
        await dbConnection.raw('SELECT 1');
        return { message: 'Database connection healthy' };
      } catch (error) {
        throw new Error(`Database connection failed: ${error.message}`, { cause: error });
      }
    };
  }

  // Create Redis health checker: calls redisClient.ping()
  static createRedisHealthChecker(redisClient) {
    return async () => {
      try {
        await redisClient.ping();
        return { message: 'Redis connection healthy' };
      } catch (error) {
        throw new Error(`Redis connection failed: ${error.message}`, { cause: error });
      }
    };
  }

  // Create HTTP endpoint health checker.
  // Healthy when a GET of `url` answers with `expectedStatus` within
  // `timeoutMs` milliseconds (default 5000).
  static createHttpHealthChecker(url, expectedStatus = 200, timeoutMs = 5000) {
    return async () => {
      try {
        const response = await fetch(url, {
          method: 'GET',
          // fetch has no `timeout` option; an abort signal enforces the limit
          signal: AbortSignal.timeout(timeoutMs)
        });

        if (response.status === expectedStatus) {
          return {
            message: `HTTP endpoint healthy (status: ${response.status})`,
            details: { statusCode: response.status }
          };
        } else {
          throw new Error(`HTTP endpoint returned status ${response.status}, expected ${expectedStatus}`);
        }
      } catch (error) {
        throw new Error(`HTTP endpoint check failed: ${error.message}`, { cause: error });
      }
    };
  }

  // Create custom health checker: healthy when `checkFunction` resolves; its
  // resolved value is returned as `details`
  static createCustomHealthChecker(checkFunction) {
    return async () => {
      try {
        const result = await checkFunction();
        return {
          message: 'Custom health check passed',
          details: result
        };
      } catch (error) {
        throw new Error(`Custom health check failed: ${error.message}`, { cause: error });
      }
    };
  }

  // Create composite health checker (checks multiple dependencies).
  // `checkers` maps a name to a checker; they run one after another.
  // NOTE: the returned checker always resolves, even when some checks fail;
  // callers must inspect `allHealthy` / `details` to see failures.
  static createCompositeHealthChecker(checkers) {
    return async () => {
      const results = {};
      let allHealthy = true;

      for (const [name, checker] of Object.entries(checkers)) {
        try {
          const result = await checker();
          results[name] = {
            healthy: true,
            result: result
          };
        } catch (error) {
          results[name] = {
            healthy: false,
            error: error.message
          };
          allHealthy = false;
        }
      }

      return {
        message: allHealthy ? 'All dependencies healthy' : 'Some dependencies unhealthy',
        details: results,
        allHealthy: allHealthy
      };
    };
  }
}

// Export the status enum together with the health check service, client and
// utility classes
module.exports = {
  HealthCheckResponseServingStatus,
  HealthCheckService,
  AdvancedHealthCheckService,
  HealthCheckClient,
  HealthCheckUtils
};

💻 gRPC Streaming Implementation javascript

🔴 complex

Advanced gRPC streaming patterns including client, server, and bidirectional streaming

// Advanced gRPC Streaming Implementation
const grpc = require('@grpc/grpc-js');
const { EventEmitter } = require('events');

// Stream manager that tracks active gRPC streams and their metrics
//
// Emits:
//   'messageSent' - after every write() on a managed stream
//   'streamEnded' - when a managed stream ends, closes or is cancelled
//   'streamError' - when a managed stream fails
// Note: `messagesReceived` is part of the metrics object but is not updated
// by this class.
class GrpcStreamManager extends EventEmitter {
  constructor() {
    super();
    this.activeStreams = new Map();   // streamId -> tracking record
    this.streamMetrics = {
      totalStreams: 0,
      activeCount: 0,
      messagesSent: 0,
      messagesReceived: 0
    };
  }

  // Create a managed stream with metrics.
  // Registers `stream` under `streamId`, wraps its write() (when it has one)
  // to count messages and bytes, and unregisters it once it finishes.
  // `type` is a label such as 'client', 'server' or 'bidirectional'.
  // Returns the same `stream` object that was passed in.
  createManagedStream(stream, streamId, type) {
    const managedStream = {
      stream,
      streamId,
      type, // 'client', 'server', 'bidirectional'
      startTime: Date.now(),
      messageCount: 0,
      byteCount: 0,
      isActive: true
    };

    this.activeStreams.set(streamId, managedStream);
    this.streamMetrics.totalStreams++;
    this.streamMetrics.activeCount++;

    // Size estimate: length of the JSON text; 0 when the message cannot be
    // represented as JSON (undefined, BigInt, circular structure, ...)
    const estimateSize = (message) => {
      try {
        const text = JSON.stringify(message);
        return text === undefined ? 0 : text.length;
      } catch {
        return 0;
      }
    };

    // Read-only streams have no write(); only wrap it where it exists
    if (typeof stream.write === 'function') {
      const originalWrite = stream.write.bind(stream);
      stream.write = (message, ...rest) => {
        managedStream.messageCount++;
        managedStream.byteCount += estimateSize(message);
        this.streamMetrics.messagesSent++;

        this.emit('messageSent', {
          streamId,
          type,
          message: managedStream.messageCount,
          bytes: managedStream.byteCount
        });

        // Forward any additional arguments (e.g. encoding, callback)
        return originalWrite(message, ...rest);
      };
    }

    // Several terminal events can fire for one stream (for example 'error'
    // followed by 'close'); only the first one is accounted for.
    const finish = (eventName, buildPayload) => {
      if (!managedStream.isActive) {
        return;
      }
      managedStream.isActive = false;
      this.streamMetrics.activeCount--;

      this.emit(eventName, buildPayload());

      this.activeStreams.delete(streamId);
    };

    const endedPayload = () => ({
      streamId,
      type,
      duration: Date.now() - managedStream.startTime,
      messageCount: managedStream.messageCount,
      byteCount: managedStream.byteCount
    });

    // Clean up when stream ends
    stream.on('end', () => finish('streamEnded', endedPayload));
    stream.on('cancelled', () => finish('streamEnded', endedPayload));
    stream.on('close', () => finish('streamEnded', endedPayload));

    stream.on('error', (error) => {
      finish('streamError', () => ({
        streamId,
        type,
        error: error.message,
        messageCount: managedStream.messageCount
      }));
    });

    return stream;
  }

  // Return the counters plus the number and records of the active streams
  getMetrics() {
    return {
      ...this.streamMetrics,
      activeStreams: this.activeStreams.size,
      streams: Array.from(this.activeStreams.values())
    };
  }
}

// Server Streaming Example: Real-time Notifications
//
// Keeps one subscription stream per user id, sends each subscriber a
// periodic update and supports broadcasting to every subscriber.
class NotificationService {
  constructor(streamManager) {
    this.streamManager = streamManager;
    // user id -> stream returned by streamManager.createManagedStream()
    this.subscribers = new Map();
  }

  // Subscribe to notifications (server streaming).
  // `metadata` defaults to `call.metadata`, so the method also works when it
  // is invoked with the call alone. The user id is read from the 'user-id'
  // metadata entry ('anonymous' when absent).
  subscribeToNotifications(call, metadata = call.metadata) {
    // metadata.get() may return a list of values; use the first one
    const rawUserId = metadata ? metadata.get('user-id') : undefined;
    const firstUserId = Array.isArray(rawUserId) ? rawUserId[0] : rawUserId;
    const userId = firstUserId ? String(firstUserId) : 'anonymous';
    const streamId = `notification-${userId}-${Date.now()}`;

    console.log(`User ${userId} subscribed to notifications`);

    const managedCall = this.streamManager.createManagedStream(
      call, streamId, 'server'
    );

    // Store subscription
    this.subscribers.set(userId, managedCall);

    // Send initial subscription confirmation
    call.write({
      id: 'subscription-confirmation',
      type: 'SUBSCRIPTION',
      timestamp: new Date().toISOString(),
      userId: userId,
      message: 'Successfully subscribed to notifications'
    });

    let released = false;

    // Periodic notifications
    const interval = setInterval(() => {
      if (!released) {
        call.write({
          id: `notification-${Date.now()}`,
          type: 'INFO',
          timestamp: new Date().toISOString(),
          userId: userId,
          message: `Periodic update for user ${userId}`,
          data: {
            serverTime: new Date().toISOString(),
            randomValue: Math.random()
          }
        });
      }
    }, 5000);

    // Stop the timer and drop the subscription. Runs once per stream and
    // never removes a newer subscription made by the same user.
    const release = () => {
      if (released) {
        return false;
      }
      released = true;
      clearInterval(interval);
      if (this.subscribers.get(userId) === managedCall) {
        this.subscribers.delete(userId);
      }
      return true;
    };

    // Cleanup on stream end
    const onFinished = () => {
      if (release()) {
        console.log(`User ${userId} unsubscribed from notifications`);
      }
    };

    call.on('end', onFinished);
    // @grpc/grpc-js reports client cancellation with a 'cancelled' event
    call.on('cancelled', onFinished);

    call.on('error', (error) => {
      release();
      console.error(`Notification stream error for user ${userId}:`, error.message);
    });
  }

  // Broadcast notification to all subscribers.
  // `notification` provides `message` and optionally `data`. A subscriber
  // whose write throws is logged and removed.
  broadcastNotification(notification) {
    const message = {
      id: `broadcast-${Date.now()}`,
      type: 'BROADCAST',
      timestamp: new Date().toISOString(),
      message: notification.message,
      data: notification.data || {}
    };

    // The map holds the writable streams themselves; finished streams have
    // already been removed by the cleanup handlers
    this.subscribers.forEach((stream, userId) => {
      try {
        stream.write({ ...message, userId });
      } catch (error) {
        console.error(`Failed to send broadcast to user ${userId}:`, error.message);
        this.subscribers.delete(userId);
      }
    });
  }
}

// Client Streaming Example: Data Upload with Progress
//
// Collects the chunks of an upload stream, tracks progress per upload and
// answers with a summary once the stream ends.
class DataUploadService {
  constructor(streamManager) {
    this.streamManager = streamManager;
    this.uploads = new Map();   // uploadId -> upload state while in progress
    // Makes upload ids unique even when two uploads start in the same ms
    this.uploadSequence = 0;
  }

  // Handle data upload stream (client streaming).
  // Each chunk may carry `size` (bytes in this chunk) and `totalSize`
  // (expected total). `callback` is invoked exactly once: with a summary when
  // the stream ends, or with an INTERNAL error when the stream fails.
  handleDataUpload(call, callback) {
    this.uploadSequence += 1;
    const uploadId = `upload-${Date.now()}-${this.uploadSequence}`;
    const streamId = `upload-${uploadId}`;

    console.log(`Starting data upload: ${uploadId}`);

    const uploadData = {
      id: uploadId,
      startTime: Date.now(),
      chunks: [],
      totalSize: 0,
      receivedSize: 0
    };

    this.uploads.set(uploadId, uploadData);

    this.streamManager.createManagedStream(
      call, streamId, 'client'
    );

    // Both 'end' and 'error' may be emitted for one stream; only the first
    // outcome is reported to the caller
    let finished = false;

    // Handle incoming chunks
    call.on('data', (chunk) => {
      uploadData.chunks.push(chunk);
      uploadData.receivedSize += chunk.size || 0;
      uploadData.totalSize = chunk.totalSize || uploadData.totalSize;

      const progress = uploadData.totalSize > 0
        ? (uploadData.receivedSize / uploadData.totalSize) * 100
        : 0;

      console.log(`Upload ${uploadId}: ${progress.toFixed(2)}% complete`);

      // Send progress update - only possible when the call can be written
      // to; a pure client-streaming call has no write()
      if (typeof call.write === 'function') {
        call.write({
          type: 'PROGRESS',
          uploadId,
          progress: progress,
          receivedSize: uploadData.receivedSize,
          totalSize: uploadData.totalSize,
          timestamp: new Date().toISOString()
        });
      }
    });

    // Handle stream completion
    call.on('end', () => {
      if (finished) {
        return;
      }
      finished = true;

      const duration = Date.now() - uploadData.startTime;
      // bytes/sec; 0 when the upload finished within the same millisecond
      const averageSpeed = duration > 0
        ? uploadData.receivedSize / (duration / 1000)
        : 0;

      console.log(`Upload ${uploadId} completed. Duration: ${duration}ms, Speed: ${averageSpeed.toFixed(2)} bytes/sec`);

      callback(null, {
        success: true,
        uploadId,
        totalChunks: uploadData.chunks.length,
        totalSize: uploadData.receivedSize,
        duration: duration,
        averageSpeed: averageSpeed
      });

      this.uploads.delete(uploadId);
    });

    // Handle stream errors
    call.on('error', (error) => {
      if (finished) {
        return;
      }
      finished = true;

      console.error(`Upload ${uploadId} failed:`, error.message);
      this.uploads.delete(uploadId);

      callback({
        code: grpc.status.INTERNAL,
        message: `Upload failed: ${error.message}`
      });
    });
  }

  // Return the state of an upload in progress, or undefined
  getUploadStatus(uploadId) {
    return this.uploads.get(uploadId);
  }
}

// Bidirectional Streaming Example: Real-time Chat
/**
 * In-memory chat rooms served over a gRPC bidirectional stream.
 *
 * Room membership is a Set of managed streams (the objects returned by
 * `streamManager.createManagedStream`) keyed by room ID. A single stream is a
 * member of at most one room at a time.
 */
class ChatService {
  /**
   * @param {Object} streamManager - Must expose
   *   `createManagedStream(call, streamId, type)`.
   */
  constructor(streamManager) {
    this.streamManager = streamManager;
    this.rooms = new Map(); // roomId -> Set of managed streams
    this.users = new Map(); // userId -> { id, name, joinTime }
  }

  /**
   * Handle one bidirectional chat stream.
   *
   * Incoming messages are dispatched on `message.type`:
   * `JOIN`, `MESSAGE`, `LEAVE` and `TYPING`. Unknown types are ignored.
   *
   * @param {Object} call - Duplex gRPC call (event emitter with `write`).
   */
  joinChatRoom(call) {
    let currentRoom = null;
    let currentUser = null;
    // NOTE(review): two streams opened within the same millisecond get the
    // same ID; whether that matters depends on the stream manager.
    const streamId = `chat-${Date.now()}`;

    const managedCall = this.streamManager.createManagedStream(
      call, streamId, 'bidirectional'
    );

    // Leave the current room (if any) exactly once and forget the session
    // state, so a later LEAVE/end/error cannot announce the departure again.
    const leaveCurrentRoom = () => {
      if (currentRoom && currentUser) {
        this.leaveRoom(currentRoom, currentUser, managedCall);
      }
      currentRoom = null;
      currentUser = null;
    };

    // Handle incoming messages
    call.on('data', (message) => {
      const timestamp = new Date().toISOString();

      switch (message.type) {
        case 'JOIN':
          // A repeated JOIN moves the stream: drop the old membership first,
          // otherwise the stream would stay registered in the previous room.
          leaveCurrentRoom();

          currentUser = {
            id: message.userId,
            name: message.userName || `User-${message.userId}`,
            joinTime: timestamp
          };

          currentRoom = message.roomId || 'default';

          // Create the room on first use
          if (!this.rooms.has(currentRoom)) {
            this.rooms.set(currentRoom, new Set());
          }

          this.rooms.get(currentRoom).add(managedCall);
          this.users.set(currentUser.id, currentUser);

          console.log(`User ${currentUser.name} joined room ${currentRoom}`);

          // Send join confirmation to the joining stream only
          call.write({
            type: 'SYSTEM',
            roomId: currentRoom,
            timestamp,
            message: `Welcome to room ${currentRoom}, ${currentUser.name}!`,
            userCount: this.rooms.get(currentRoom).size
          });

          // Notify everyone else in the room
          this.broadcastToRoom(currentRoom, {
            type: 'SYSTEM',
            roomId: currentRoom,
            timestamp,
            message: `${currentUser.name} joined the room`
          }, managedCall);
          break;

        case 'MESSAGE':
          if (currentRoom && currentUser) {
            const chatMessage = {
              id: `msg-${Date.now()}`,
              type: 'MESSAGE',
              roomId: currentRoom,
              userId: currentUser.id,
              userName: currentUser.name,
              content: message.content,
              timestamp
            };

            // Broadcast to all users in the room, sender included
            this.broadcastToRoom(currentRoom, chatMessage);
          }
          break;

        case 'LEAVE':
          leaveCurrentRoom();
          break;

        case 'TYPING':
          if (currentRoom && currentUser) {
            this.broadcastToRoom(currentRoom, {
              type: 'TYPING',
              roomId: currentRoom,
              userId: currentUser.id,
              userName: currentUser.name,
              timestamp
            }, managedCall);
          }
          break;
      }
    });

    // Handle stream end
    call.on('end', () => {
      leaveCurrentRoom();
    });

    call.on('error', (error) => {
      console.error(`Chat stream error:`, error.message);
      leaveCurrentRoom();
    });
  }

  /**
   * Write a message to every active stream in a room.
   *
   * Streams whose write throws are removed from the room.
   *
   * @param {string} roomId
   * @param {Object} message
   * @param {Object|null} [excludeCall=null] - Managed stream to skip
   *   (typically the sender).
   */
  broadcastToRoom(roomId, message, excludeCall = null) {
    const room = this.rooms.get(roomId);
    if (!room) return;

    room.forEach(stream => {
      // Room members are read as `{ stream: { isActive, write } }`; that shape
      // comes from the stream manager.
      if (stream !== excludeCall && stream.stream.isActive) {
        try {
          stream.stream.write(message);
        } catch (error) {
          console.error(`Failed to broadcast to room ${roomId}:`, error.message);
          room.delete(stream);
        }
      }
    });
  }

  /**
   * Remove a stream from a room, notify the remaining members and drop the
   * room when it becomes empty. The user record is removed in every case.
   *
   * @param {string} roomId
   * @param {{id: *, name: string}} user
   * @param {Object} stream - Managed stream to remove.
   */
  leaveRoom(roomId, user, stream) {
    const room = this.rooms.get(roomId);
    if (room) {
      room.delete(stream);

      console.log(`User ${user.name} left room ${roomId}`);

      // Notify other users
      this.broadcastToRoom(roomId, {
        type: 'SYSTEM',
        roomId,
        timestamp: new Date().toISOString(),
        message: `${user.name} left the room`,
        userCount: room.size
      });

      // Clean up empty rooms
      if (room.size === 0) {
        this.rooms.delete(roomId);
        console.log(`Room ${roomId} deleted (empty)`);
      }
    }

    this.users.delete(user.id);
  }

  /**
   * @param {string} roomId
   * @returns {{roomId: string, userCount: number, exists: boolean}}
   */
  getRoomStatus(roomId) {
    const room = this.rooms.get(roomId);
    return {
      roomId,
      userCount: room ? room.size : 0,
      exists: !!room
    };
  }

  /**
   * @returns {Object<string, number>} Member count per existing room ID.
   */
  getAllRooms() {
    const rooms = {};
    this.rooms.forEach((users, roomId) => {
      rooms[roomId] = users.size;
    });
    return rooms;
  }
}

// Streaming utility functions
/**
 * Stateless helpers for working with streaming calls.
 */
class StreamingUtils {
  /**
   * Write messages one by one, pausing whenever the stream signals
   * backpressure and optionally sleeping between messages.
   *
   * Stops early when the stream reports `destroyed` or `closed`.
   *
   * @param {Object} stream - Event emitter with `write(message)` returning
   *   `false` when its buffer is full.
   * @param {Iterable<*>} messages
   * @param {number} [delay=100] - Pause between messages in milliseconds.
   * @returns {Promise<void>}
   */
  static async applyBackpressure(stream, messages, delay = 100) {
    for (const message of messages) {
      if (stream.destroyed || stream.closed) {
        break;
      }

      const ready = stream.write(message);

      if (!ready) {
        console.log('Backpressure applied, waiting for drain...');
        await new Promise(resolve => {
          // Also wake up on 'close': a stream that goes away while we wait
          // never emits 'drain', which would leave this promise pending forever.
          const settle = () => {
            stream.removeListener('drain', settle);
            stream.removeListener('close', settle);
            resolve();
          };
          stream.once('drain', settle);
          stream.once('close', settle);
        });
      }

      // Add delay between messages
      if (delay > 0) {
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
  }

  /**
   * Run an async operation, retrying on failure with a linearly growing
   * pause (`delay * attempt`).
   *
   * @param {function(): Promise<*>} operation
   * @param {number} [maxRetries=3] - Total number of attempts; must be >= 1.
   * @param {number} [delay=1000] - Base pause in milliseconds.
   * @returns {Promise<*>} Result of the first successful attempt.
   * @throws {RangeError} When `maxRetries` is less than 1.
   * @throws The error of the last attempt when all attempts fail.
   */
  static async retryStreamingOperation(operation, maxRetries = 3, delay = 1000) {
    // Without at least one attempt the loop below would be skipped and
    // `undefined` would be thrown.
    if (!(maxRetries >= 1)) {
      throw new RangeError('maxRetries must be at least 1');
    }

    let lastError;

    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return await operation();
      } catch (error) {
        lastError = error;
        console.warn(`Streaming operation attempt ${attempt} failed:`, error.message);

        if (attempt < maxRetries) {
          await new Promise(resolve => setTimeout(resolve, delay * attempt));
        }
      }
    }

    throw lastError;
  }

  /**
   * Flag messages whose JSON form exceeds 1 MiB (measured in string length).
   *
   * This is a placeholder: the payload itself is returned unchanged, only
   * `compressed` and `originalSize` are added.
   *
   * @param {Object} message
   * @returns {Object} The message, or a shallow copy carrying the two flags.
   */
  static compressMessage(message) {
    const messageStr = JSON.stringify(message);
    if (messageStr.length > 1024 * 1024) { // 1MB threshold
      // In production, use proper compression like gzip
      return {
        ...message,
        compressed: true,
        originalSize: messageStr.length
      };
    }
    return message;
  }

  /**
   * Split a list into consecutive batches.
   *
   * @param {Array<*>} messages
   * @param {number} [batchSize=10] - Positive integer batch length.
   * @returns {Array<Array<*>>} Batches in original order; the last one may be
   *   shorter.
   * @throws {RangeError} When `batchSize` is not a positive integer (a step
   *   of zero or less would never reach the end of the list).
   */
  static batchMessages(messages, batchSize = 10) {
    if (!Number.isInteger(batchSize) || batchSize < 1) {
      throw new RangeError('batchSize must be a positive integer');
    }

    const batches = [];
    for (let i = 0; i < messages.length; i += batchSize) {
      batches.push(messages.slice(i, i + batchSize));
    }
    return batches;
  }
}

// Names this streaming sample module makes available to consumers.
module.exports = {
  GrpcStreamManager,
  NotificationService,
  DataUploadService,
  ChatService,
  StreamingUtils
};

💻 gRPC Interceptors Implementation javascript

🔴 complex

Implementing authentication, logging, and monitoring interceptors for gRPC services

// gRPC Interceptors Implementation
const grpc = require('@grpc/grpc-js');
const jwt = require('jsonwebtoken');
const crypto = require('crypto');

// Base interceptor class
/**
 * Common parent of all interceptors: stores the options bag and defines the
 * `intercept` hook subclasses have to override.
 */
class BaseInterceptor {
  /**
   * @param {Object} [options={}] - Kept verbatim on `this.options`.
   */
  constructor(options = {}) {
    Object.assign(this, { options });
  }

  /**
   * Abstract hook; the base implementation always throws.
   *
   * @param {Object} call - The intercepted call.
   * @param {Function} next - Continues the pipeline.
   * @throws {Error} Always, until a subclass overrides it.
   */
  intercept(call, next) {
    const notImplemented = new Error('intercept method must be implemented');
    throw notImplemented;
  }
}

// Authentication Interceptor
/**
 * Authenticates calls with either a JWT (`authorization: Bearer <token>`) or
 * a static API key, and stores the resulting identity on `call.call.user`.
 */
class AuthInterceptor extends BaseInterceptor {
  /**
   * @param {Object} [options={}]
   * @param {string} [options.jwtSecret] - Secret passed to `jwt.verify`.
   * @param {string[]} [options.excludePaths] - Path fragments that skip
   *   authentication (substring match).
   * @param {string} [options.apiKeyHeader='x-api-key'] - Metadata key holding
   *   the API key.
   * @param {Iterable<string>} [options.apiKeys] - Accepted API keys.
   */
  constructor(options = {}) {
    super(options);
    // NOTE(review): the hard-coded fallback secret lets anyone who knows it
    // mint valid tokens; always pass `jwtSecret` outside of demos.
    this.jwtSecret = options.jwtSecret || 'default-secret';
    this.excludePaths = options.excludePaths || [];
    this.apiKeyHeader = options.apiKeyHeader || 'x-api-key';
    this.apiKeys = new Set(options.apiKeys || []);
  }

  /**
   * Normalise one entry of a metadata map to a single string.
   *
   * `Metadata.getMap()` yields the first value directly (a string, or a
   * Buffer for binary keys), while other sources hand over arrays of values.
   * Indexing a string with `[0]` would return only its first character.
   *
   * @param {*} value
   * @returns {string|undefined} First string value, or `undefined`.
   */
  static firstMetadataValue(value) {
    const first = Array.isArray(value) ? value[0] : value;
    return typeof first === 'string' ? first : undefined;
  }

  /**
   * @param {Object} call - Needs `getMethod()`, `metadata.getMap()` and a
   *   mutable `call` property that receives `user`.
   * @param {Function} next - Continues the pipeline.
   * @returns {*} Whatever `next()` returns.
   * @throws {{code: number, message: string}} UNAUTHENTICATED status object
   *   when neither credential is valid.
   */
  intercept(call, next) {
    const method = call.getMethod();
    const path = method.path;

    // Skip authentication for excluded paths
    if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
      return next();
    }

    const metadata = call.metadata.getMap();

    // Try JWT authentication
    const authorization = AuthInterceptor.firstMetadataValue(metadata['authorization']);
    if (authorization) {
      // Strip only a leading scheme, case-insensitively.
      const token = authorization.replace(/^Bearer\s+/i, '');

      let decoded = null;
      try {
        decoded = jwt.verify(token, this.jwtSecret);
      } catch (error) {
        console.error('JWT verification failed:', error.message);
      }

      if (decoded) {
        call.call.user = {
          id: decoded.userId,
          email: decoded.email,
          roles: decoded.roles || []
        };
        // Deliberately outside the try block: an error thrown by the rest of
        // the pipeline must not be reported as a JWT failure.
        return next();
      }
    }

    // Try API key authentication
    const apiKey = AuthInterceptor.firstMetadataValue(metadata[this.apiKeyHeader]);
    if (apiKey && this.apiKeys.has(apiKey)) {
      call.call.user = {
        id: 'api-key-user',
        roles: ['api']
      };
      return next();
    }

    // Authentication failed
    throw {
      code: grpc.status.UNAUTHENTICATED,
      message: 'Authentication required'
    };
  }
}

// Logging Interceptor
/**
 * Logs every intercepted call: the request, the moment response metadata is
 * sent, stream reads/writes (when body logging is on) and errors.
 * Sensitive metadata and body fields are replaced with `[REDACTED]`.
 */
class LoggingInterceptor extends BaseInterceptor {
  /**
   * @param {Object} [options={}]
   * @param {string} [options.logLevel='info'] - Passed as first argument to
   *   `logger.log`.
   * @param {boolean} [options.logBody=false] - Include sanitized bodies.
   * @param {string[]} [options.excludePaths] - Path fragments that are not
   *   logged (substring match).
   * @param {Object} [options.logger=console] - Needs `log(level, message)`
   *   and `error(message)`.
   */
  constructor(options = {}) {
    super(options);
    this.logLevel = options.logLevel || 'info';
    this.logBody = options.logBody || false;
    this.excludePaths = options.excludePaths || [];
    this.logger = options.logger || console;
  }

  /**
   * @param {Object} call - Needs `getMethod()`, `metadata.getMap()` and
   *   `call.sendMetadata`; `request` is logged when body logging is on.
   * @param {Function} next - Continues the pipeline.
   * @returns {Promise<*>|*} For logged paths a promise settling like
   *   `next()`; for excluded paths whatever `next()` returns.
   */
  intercept(call, next) {
    const method = call.getMethod();
    const path = method.path;
    const startTime = Date.now();

    // Skip logging for excluded paths
    if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
      return next();
    }

    // Log request
    const requestId = crypto.randomUUID();
    const logData = {
      requestId,
      method: path,
      type: 'request',
      timestamp: new Date().toISOString(),
      metadata: this.sanitizeMetadata(call.metadata.getMap())
    };

    if (this.logBody && call.request) {
      logData.body = this.sanitizeBody(call.request);
    }

    this.logger.log(this.logLevel, `gRPC Request: ${JSON.stringify(logData)}`);

    // Intercept response. An arrow function keeps `this` pointing at the
    // interceptor so the configured logger and level are reachable here.
    const originalSendMetadata = call.call.sendMetadata.bind(call.call);
    call.call.sendMetadata = (response, callback) => {
      const duration = Date.now() - startTime;

      const responseLogData = {
        requestId,
        method: path,
        type: 'response',
        duration: duration,
        timestamp: new Date().toISOString()
      };

      if (call.interceptorResponse) {
        responseLogData.status = call.interceptorResponse.code || grpc.status.OK;
        responseLogData.message = call.interceptorResponse.message || 'Success';
      }

      this.logger.log(this.logLevel, `gRPC Response: ${JSON.stringify(responseLogData)}`);

      return originalSendMetadata(response, callback);
    };

    // Handle streaming calls
    if (call.call.write && call.call.read) {
      this.interceptStreamingCall(call, requestId, path, startTime);
    }

    const logFailure = (error) => {
      const duration = Date.now() - startTime;

      const errorLogData = {
        requestId,
        method: path,
        type: 'error',
        duration: duration,
        timestamp: new Date().toISOString(),
        error: {
          code: error.code,
          message: error.message,
          details: error.details
        }
      };

      this.logger.error(`gRPC Error: ${JSON.stringify(errorLogData)}`);
    };

    // Downstream interceptors may throw synchronously instead of rejecting;
    // log those failures too, then let them propagate the same way.
    let pending;
    try {
      pending = next();
    } catch (error) {
      logFailure(error);
      throw error;
    }

    return Promise.resolve(pending).catch(error => {
      logFailure(error);
      throw error;
    });
  }

  /**
   * Wrap `call.call.write` and `call.call.read` so every message is counted
   * and, when body logging is enabled, logged in sanitized form.
   *
   * @param {Object} call
   * @param {string} requestId
   * @param {string} path
   * @param {number} startTime - Currently unused by this method.
   */
  interceptStreamingCall(call, requestId, path, startTime) {
    const originalWrite = call.call.write.bind(call.call);
    const originalRead = call.call.read.bind(call.call);

    let messagesWritten = 0;
    let messagesRead = 0;

    call.call.write = (message, callback) => {
      messagesWritten++;

      if (this.logBody) {
        this.logger.log(this.logLevel, `gRPC Stream Write [${requestId}]: ${JSON.stringify({
          requestId,
          method: path,
          messageType: 'write',
          messageCount: messagesWritten,
          body: this.sanitizeBody(message)
        })}`);
      }

      return originalWrite(message, callback);
    };

    call.call.read = (callback) => {
      return originalRead((error, response) => {
        if (response) {
          messagesRead++;

          if (this.logBody) {
            this.logger.log(this.logLevel, `gRPC Stream Read [${requestId}]: ${JSON.stringify({
              requestId,
              method: path,
              messageType: 'read',
              messageCount: messagesRead,
              body: this.sanitizeBody(response)
            })}`);
          }
        }

        callback(error, response);
      });
    };
  }

  /**
   * Copy a metadata map, replacing the value of every credential-like key.
   *
   * @param {Object} metadata
   * @returns {Object} Shallow copy with sensitive values set to `[REDACTED]`.
   */
  sanitizeMetadata(metadata) {
    // 'key' covers API-key headers such as `x-api-key`, which would otherwise
    // be written to the log in clear text.
    const sensitiveFragments = ['authorization', 'token', 'password', 'secret', 'key', 'cookie'];
    const sanitized = {};

    Object.keys(metadata).forEach(key => {
      const lowered = key.toLowerCase();
      if (sensitiveFragments.some(fragment => lowered.includes(fragment))) {
        sanitized[key] = '[REDACTED]';
      } else {
        sanitized[key] = metadata[key];
      }
    });

    return sanitized;
  }

  /**
   * Deep-copy a message body (via a JSON round trip) and redact every field
   * whose name contains a sensitive fragment, at any nesting depth.
   *
   * @param {*} body
   * @returns {*} Non-objects unchanged; otherwise the redacted copy.
   */
  sanitizeBody(body) {
    if (!body || typeof body !== 'object') {
      return body;
    }

    const sanitized = JSON.parse(JSON.stringify(body));

    // Remove sensitive fields
    const sensitiveKeys = ['password', 'token', 'secret', 'key', 'creditCard'];

    const removeSensitiveKeys = (obj) => {
      if (typeof obj !== 'object' || obj === null) {
        return;
      }

      Object.keys(obj).forEach(key => {
        if (sensitiveKeys.some(sensitive => key.toLowerCase().includes(sensitive.toLowerCase()))) {
          obj[key] = '[REDACTED]';
        } else if (typeof obj[key] === 'object') {
          removeSensitiveKeys(obj[key]);
        }
      });
    };

    removeSensitiveKeys(sanitized);
    return sanitized;
  }
}

// Rate Limiting Interceptor
/**
 * Sliding-window rate limiter: keeps the timestamps of each client's recent
 * requests and rejects a call once `maxRequests` of them fall inside the last
 * `windowMs` milliseconds.
 */
class RateLimitInterceptor extends BaseInterceptor {
  /**
   * @param {Object} [options={}]
   * @param {number} [options.windowMs=60000] - Window length in milliseconds.
   * @param {number} [options.maxRequests=100] - Allowed requests per window.
   * @param {string[]} [options.excludePaths] - Path fragments that are not
   *   limited (substring match).
   */
  constructor(options = {}) {
    super(options);
    this.windowMs = options.windowMs || 60000; // 1 minute
    this.maxRequests = options.maxRequests || 100;
    this.requests = new Map(); // clientId -> ascending request timestamps
    this.excludePaths = options.excludePaths || [];
    this.lastPruneAt = Date.now();
  }

  /**
   * Normalise one entry of a metadata map to a single string.
   *
   * `Metadata.getMap()` yields the first value directly, other sources hand
   * over arrays; indexing a string with `[0]` would return one character.
   *
   * @param {*} value
   * @returns {string|undefined}
   */
  static firstMetadataValue(value) {
    const first = Array.isArray(value) ? value[0] : value;
    return typeof first === 'string' ? first : undefined;
  }

  /**
   * @param {Object} call - Needs `getMethod()`, `metadata.getMap()` and
   *   `call.sendMetadata`.
   * @param {Function} next - Continues the pipeline.
   * @returns {*} Whatever `next()` returns.
   * @throws {{code: number, message: string, metadata: Object}}
   *   RESOURCE_EXHAUSTED status object when the limit is reached.
   */
  intercept(call, next) {
    const method = call.getMethod();
    const path = method.path;

    // Skip rate limiting for excluded paths
    if (this.excludePaths.some(excludePath => path.includes(excludePath))) {
      return next();
    }

    const clientId = this.getClientId(call);
    const now = Date.now();
    const windowStart = now - this.windowMs;

    // Once per window, forget clients that have gone quiet so the map does
    // not grow with every client ID ever seen.
    if (now - this.lastPruneAt >= this.windowMs) {
      this.pruneExpired(now);
    }

    // Keep only this client's requests inside the current window
    const clientRequests = this.requests.get(clientId) || [];
    const validRequests = clientRequests.filter(timestamp => timestamp > windowStart);
    this.requests.set(clientId, validRequests);

    // Check if rate limit exceeded
    if (validRequests.length >= this.maxRequests) {
      const headers = {
        'X-RateLimit-Limit': this.maxRequests.toString(),
        'X-RateLimit-Remaining': '0',
        'X-RateLimit-Reset': new Date(now + this.windowMs).toISOString()
      };

      throw {
        code: grpc.status.RESOURCE_EXHAUSTED,
        message: `Rate limit exceeded. Maximum ${this.maxRequests} requests per ${this.windowMs}ms allowed.`,
        metadata: headers
      };
    }

    // Add current request timestamp
    validRequests.push(now);

    const remainingRequests = this.maxRequests - validRequests.length;
    const headers = {
      'X-RateLimit-Limit': this.maxRequests.toString(),
      'X-RateLimit-Remaining': remainingRequests.toString(),
      'X-RateLimit-Reset': new Date(now + this.windowMs).toISOString()
    };

    // Add headers to response
    const originalSendMetadata = call.call.sendMetadata.bind(call.call);
    call.call.sendMetadata = function(response, callback) {
      // Merge rate limit headers into response metadata
      if (response) {
        Object.keys(headers).forEach(key => {
          if (typeof response.set === 'function') {
            // Metadata-like container (assumed to follow the grpc-js
            // `Metadata.set(key, value)` shape): plain property assignment
            // would not be transmitted.
            response.set(key.toLowerCase(), headers[key]);
          } else {
            response[key] = [headers[key]];
          }
        });
      }
      return originalSendMetadata(response, callback);
    };

    return next();
  }

  /**
   * Drop the history of every client without a request in the current window.
   *
   * @param {number} [now=Date.now()] - Reference time in epoch milliseconds.
   */
  pruneExpired(now = Date.now()) {
    const windowStart = now - this.windowMs;

    for (const [clientId, timestamps] of this.requests) {
      const latest = timestamps[timestamps.length - 1];
      if (timestamps.length === 0 || latest <= windowStart) {
        this.requests.delete(clientId);
      }
    }

    this.lastPruneAt = now;
  }

  /**
   * Derive the rate-limit bucket for a call: the authenticated user when
   * present, otherwise the forwarded client IP, otherwise `ip:unknown`.
   *
   * NOTE(review): `x-forwarded-for` / `x-real-ip` are client-supplied
   * metadata; unless a trusted proxy overwrites them they can be spoofed to
   * dodge the limit.
   *
   * @param {Object} call
   * @returns {string} `user:<id>` or `ip:<address>`.
   */
  getClientId(call) {
    // Try to get client ID from user
    if (call.call.user && call.call.user.id) {
      return `user:${call.call.user.id}`;
    }

    // Try to get client ID from IP
    const metadata = call.metadata.getMap();
    const ip = RateLimitInterceptor.firstMetadataValue(metadata['x-forwarded-for']) ||
               RateLimitInterceptor.firstMetadataValue(metadata['x-real-ip']) ||
               'unknown';

    return `ip:${ip}`;
  }
}

// Metrics Interceptor
/**
 * Collects request counts, error counts and response times per method and
 * can print a periodic report to the console.
 */
class MetricsInterceptor extends BaseInterceptor {
  /**
   * @param {Object} [options={}]
   * @param {number} [options.reportInterval=60000] - Milliseconds between
   *   console reports.
   * @param {boolean} [options.autoReport] - Pass `false` to not start the
   *   periodic report from the constructor.
   */
  constructor(options = {}) {
    super(options);
    this.metrics = {
      totalRequests: 0,
      totalErrors: 0,
      methodMetrics: new Map(), // path -> { count, errors, totalResponseTime }
      responseTime: []          // durations (ms) of the last successful calls
    };
    this.startTime = Date.now();
    this.reportInterval = options.reportInterval || 60000;
    this.reportTimer = null;

    if (options.autoReport !== false) {
      this.startReporting();
    }
  }

  /**
   * Count the call, then record its duration once `next()` settles.
   *
   * @param {Object} call - Needs `getMethod()`.
   * @param {Function} next - Must return a promise.
   * @returns {Promise<*>} Settles exactly like `next()`.
   */
  intercept(call, next) {
    const method = call.getMethod();
    const path = method.path;
    const startTime = Date.now();

    this.metrics.totalRequests++;

    // Track method-specific metrics
    if (!this.metrics.methodMetrics.has(path)) {
      this.metrics.methodMetrics.set(path, {
        count: 0,
        errors: 0,
        totalResponseTime: 0
      });
    }

    const methodMetrics = this.metrics.methodMetrics.get(path);
    methodMetrics.count++;

    return next().then(response => {
      const duration = Date.now() - startTime;

      // Update metrics
      methodMetrics.totalResponseTime += duration;
      this.metrics.responseTime.push(duration);

      // Keep only last 1000 response times
      if (this.metrics.responseTime.length > 1000) {
        this.metrics.responseTime = this.metrics.responseTime.slice(-1000);
      }

      return response;
    }).catch(error => {
      const duration = Date.now() - startTime;

      // Update error metrics
      this.metrics.totalErrors++;
      methodMetrics.errors++;
      methodMetrics.totalResponseTime += duration;

      throw error;
    });
  }

  /**
   * Start printing a report every `reportInterval` milliseconds.
   * Calling it while a report timer is already running does nothing.
   */
  startReporting() {
    if (this.reportTimer) {
      return;
    }

    this.reportTimer = setInterval(() => {
      const report = this.generateReport();
      console.log('gRPC Metrics Report:', JSON.stringify(report, null, 2));
    }, this.reportInterval);

    // Reporting alone should not keep the process from exiting.
    if (typeof this.reportTimer.unref === 'function') {
      this.reportTimer.unref();
    }
  }

  /**
   * Stop the periodic report started by `startReporting`, if any.
   */
  stopReporting() {
    if (this.reportTimer) {
      clearInterval(this.reportTimer);
      this.reportTimer = null;
    }
  }

  /**
   * Build a snapshot of all metrics.
   *
   * Percentiles are read from the sorted retained response times at index
   * `floor(length * p)` and default to 0 when nothing was recorded.
   *
   * @returns {Object} uptime, totals, error rate (percent), average and
   *   p50/p95/p99 response times, per-method stats and an ISO timestamp.
   */
  generateReport() {
    const now = Date.now();
    const uptime = now - this.startTime;

    // Calculate average response time
    const avgResponseTime = this.metrics.responseTime.length > 0
      ? this.metrics.responseTime.reduce((a, b) => a + b, 0) / this.metrics.responseTime.length
      : 0;

    // Calculate percentiles
    const sortedResponseTimes = [...this.metrics.responseTime].sort((a, b) => a - b);
    const p50 = sortedResponseTimes[Math.floor(sortedResponseTimes.length * 0.5)] || 0;
    const p95 = sortedResponseTimes[Math.floor(sortedResponseTimes.length * 0.95)] || 0;
    const p99 = sortedResponseTimes[Math.floor(sortedResponseTimes.length * 0.99)] || 0;

    // Calculate method statistics
    const methodStats = {};
    this.metrics.methodMetrics.forEach((metrics, method) => {
      methodStats[method] = {
        ...metrics,
        avgResponseTime: metrics.count > 0 ? metrics.totalResponseTime / metrics.count : 0,
        errorRate: metrics.count > 0 ? (metrics.errors / metrics.count) * 100 : 0
      };
    });

    return {
      uptime: uptime,
      totalRequests: this.metrics.totalRequests,
      totalErrors: this.metrics.totalErrors,
      errorRate: this.metrics.totalRequests > 0
        ? (this.metrics.totalErrors / this.metrics.totalRequests) * 100
        : 0,
      avgResponseTime: Math.round(avgResponseTime),
      p50ResponseTime: p50,
      p95ResponseTime: p95,
      p99ResponseTime: p99,
      methodStats: methodStats,
      timestamp: new Date().toISOString()
    };
  }

  /**
   * @returns {Object} Same snapshot as `generateReport()`.
   */
  getMetrics() {
    return this.generateReport();
  }
}

// Circuit Breaker Interceptor
/**
 * Per-method circuit breaker.
 *
 * A method's circuit opens after `failureThreshold` consecutive failures and
 * rejects calls until `recoveryTimeout` has passed since the last failure;
 * the next call then runs as a trial (HALF_OPEN) that either closes the
 * circuit again or re-opens it.
 */
class CircuitBreakerInterceptor extends BaseInterceptor {
  /**
   * @param {Object} [options={}]
   * @param {number} [options.failureThreshold=5]
   * @param {number} [options.recoveryTimeout=60000] - Milliseconds.
   * @param {number} [options.monitoringPeriod=10000] - Stored only; not read
   *   anywhere in this class.
   * @param {string[]} [options.excludePaths] - Path fragments that bypass the
   *   breaker (substring match).
   */
  constructor(options = {}) {
    super(options);

    const { failureThreshold, recoveryTimeout, monitoringPeriod, excludePaths } = options;

    this.failureThreshold = failureThreshold || 5;
    this.recoveryTimeout = recoveryTimeout || 60000;
    this.monitoringPeriod = monitoringPeriod || 10000;
    this.excludePaths = excludePaths || [];

    this.circuitStates = new Map();
  }

  /**
   * @param {Object} call - Needs `getMethod()`.
   * @param {Function} next - Must return a promise for non-excluded paths.
   * @returns {Promise<*>|*} Settles like `next()`.
   * @throws {{code: number, message: string}} UNAVAILABLE status object,
   *   synchronously, while the circuit is open.
   */
  intercept(call, next) {
    const { path } = call.getMethod();

    // Excluded paths bypass the breaker entirely
    const bypass = this.excludePaths.some((fragment) => path.includes(fragment));
    if (bypass) {
      return next();
    }

    const circuit = this.getCircuitState(path);

    if (circuit.state === 'OPEN') {
      const sinceLastFailure = Date.now() - circuit.lastFailureTime;

      if (!(sinceLastFailure > this.recoveryTimeout)) {
        // Still cooling down: fail fast without calling downstream
        throw {
          code: grpc.status.UNAVAILABLE,
          message: 'Service temporarily unavailable (circuit breaker open)'
        };
      }

      // Cool-down elapsed: let this call through as a trial
      circuit.state = 'HALF_OPEN';
      circuit.requestCount = 0;
    }

    const onSuccess = (response) => {
      this.recordSuccess(path);
      return response;
    };
    const onFailure = (error) => {
      this.recordFailure(path);
      throw error;
    };

    return next().then(onSuccess).catch(onFailure);
  }

  /**
   * Fetch the state record of a path, creating a CLOSED one on first use.
   *
   * @param {string} path
   * @returns {{state: string, failureCount: number, requestCount: number,
   *   lastFailureTime: number}} The live (mutable) record.
   */
  getCircuitState(path) {
    let circuit = this.circuitStates.get(path);

    if (circuit === undefined) {
      circuit = {
        state: 'CLOSED', // CLOSED, OPEN, HALF_OPEN
        failureCount: 0,
        requestCount: 0,
        lastFailureTime: 0
      };
      this.circuitStates.set(path, circuit);
    }

    return circuit;
  }

  /**
   * Reset the failure streak; a successful trial call also closes the circuit.
   *
   * @param {string} path
   */
  recordSuccess(path) {
    const circuit = this.getCircuitState(path);

    if (circuit.state === 'HALF_OPEN') {
      circuit.state = 'CLOSED';
    }
    circuit.failureCount = 0;
  }

  /**
   * Count a failure and open the circuit when the trial call failed or the
   * threshold is reached.
   *
   * @param {string} path
   */
  recordFailure(path) {
    const circuit = this.getCircuitState(path);

    circuit.failureCount += 1;
    circuit.lastFailureTime = Date.now();

    const trialFailed = circuit.state === 'HALF_OPEN';
    if (trialFailed || circuit.failureCount >= this.failureThreshold) {
      circuit.state = 'OPEN';
    }
  }

  /**
   * @returns {Object<string, Object>} Shallow copy of every state record,
   *   keyed by path.
   */
  getCircuitStates() {
    const snapshot = {};
    for (const [path, circuit] of this.circuitStates) {
      snapshot[path] = { ...circuit };
    }
    return snapshot;
  }
}

// Interceptor factory
/**
 * Convenience constructors for the interceptors in this module, plus a
 * helper that combines several interceptors into one.
 */
class InterceptorFactory {
  /** @returns {AuthInterceptor} */
  static createAuth(options) {
    const interceptor = new AuthInterceptor(options);
    return interceptor;
  }

  /** @returns {LoggingInterceptor} */
  static createLogging(options) {
    const interceptor = new LoggingInterceptor(options);
    return interceptor;
  }

  /** @returns {RateLimitInterceptor} */
  static createRateLimit(options) {
    const interceptor = new RateLimitInterceptor(options);
    return interceptor;
  }

  /** @returns {MetricsInterceptor} */
  static createMetrics(options) {
    const interceptor = new MetricsInterceptor(options);
    return interceptor;
  }

  /** @returns {CircuitBreakerInterceptor} */
  static createCircuitBreaker(options) {
    const interceptor = new CircuitBreakerInterceptor(options);
    return interceptor;
  }

  /**
   * Combine interceptors into a single object with an `intercept` method.
   *
   * The pipeline is assembled from the last interceptor backwards, so at call
   * time the FIRST interceptor passed in runs first (outermost) and the
   * original `next` runs after the last one.
   *
   * @param {...{intercept: Function}} interceptors
   * @returns {{intercept: function(Object, Function): *}}
   */
  static chain(...interceptors) {
    return {
      intercept: (call, next) => {
        // Fold right-to-left: each step wraps everything that comes after it.
        const entryPoint = interceptors.reduceRight(
          (downstream, interceptor) => () => interceptor.intercept(call, downstream),
          next
        );

        return entryPoint();
      }
    };
  }
}

// Names this interceptor sample module makes available to consumers.
module.exports = {
  BaseInterceptor,
  AuthInterceptor,
  LoggingInterceptor,
  RateLimitInterceptor,
  MetricsInterceptor,
  CircuitBreakerInterceptor,
  InterceptorFactory
};

💻 gRPC Error Handling javascript

🔴 complex

Comprehensive error handling for gRPC services with custom error types and recovery strategies

// gRPC Error Handling Implementation
const grpc = require('@grpc/grpc-js');

// Custom gRPC error class
/**
 * Error carrying a gRPC status code together with details, metadata, a
 * creation timestamp and a generated request ID.
 */
class GrpcError extends Error {
  /**
   * @param {number} code - gRPC status code.
   * @param {string} message
   * @param {?string} [details=null]
   * @param {Object} [metadata={}]
   */
  constructor(code, message, details = null, metadata = {}) {
    super(message);

    Object.assign(this, {
      name: 'GrpcError',
      code,
      details,
      metadata,
      timestamp: new Date().toISOString()
    });
    this.requestId = this.generateRequestId();
  }

  /**
   * @returns {string} `req-` followed by up to nine base-36 characters taken
   *   from `Math.random()` (not suitable as a security token).
   */
  generateRequestId() {
    const randomPart = Math.random().toString(36).slice(2, 11);
    return `req-${randomPart}`;
  }

  /**
   * @returns {{code: number, message: string, details: string,
   *   metadata: Object}} Plain status object; `details` falls back to the
   *   message when empty.
   */
  toGrpcError() {
    const { code, message, details, metadata } = this;
    return {
      code,
      message,
      details: details || message,
      metadata
    };
  }
}

// Validation error
/**
 * INVALID_ARGUMENT error describing one field that failed validation.
 */
class ValidationError extends GrpcError {
  /**
   * @param {string} field - Name of the offending field.
   * @param {*} value - Rejected value; interpolated into the details text
   *   and stringified into the metadata.
   * @param {string} constraint - Description of the violated rule.
   */
  constructor(field, value, constraint) {
    const statusCode = grpc.status.INVALID_ARGUMENT;
    const summary = `Validation failed for field ${field}`;
    const explanation = `Field '${field}' with value '${value}' violates constraint: ${constraint}`;
    const metadata = {
      field,
      value: String(value),
      constraint,
      errorType: 'VALIDATION_ERROR'
    };

    super(statusCode, summary, explanation, metadata);

    Object.assign(this, { field, value, constraint });
  }
}

// Business logic error
/**
 * FAILED_PRECONDITION error for violated business rules, tagged with an
 * application-specific business code.
 */
class BusinessError extends GrpcError {
  /**
   * @param {string} message
   * @param {string} businessCode - Application-level error identifier.
   * @param {?string} [details=null]
   */
  constructor(message, businessCode, details = null) {
    const statusCode = grpc.status.FAILED_PRECONDITION;
    const metadata = { businessCode, errorType: 'BUSINESS_ERROR' };

    super(statusCode, message, details, metadata);

    Object.assign(this, { businessCode });
  }
}

// Resource not found error
/**
 * NOT_FOUND error for a missing resource.
 */
class NotFoundError extends GrpcError {
  /**
   * @param {string} resourceType - Kind of resource, e.g. a model name.
   * @param {*} resourceId - Identifier that was looked up; stringified into
   *   the metadata.
   */
  constructor(resourceType, resourceId) {
    const statusCode = grpc.status.NOT_FOUND;
    const summary = `${resourceType} not found`;
    const explanation = `${resourceType} with ID ${resourceId} does not exist`;
    const metadata = {
      resourceType,
      resourceId: String(resourceId),
      errorType: 'NOT_FOUND_ERROR'
    };

    super(statusCode, summary, explanation, metadata);

    Object.assign(this, { resourceType, resourceId });
  }
}

// Permission denied error
/**
 * PERMISSION_DENIED error for an action a user is not allowed to perform.
 */
class PermissionError extends GrpcError {
  /**
   * @param {string} action - Attempted operation.
   * @param {string} resource - Target of the operation.
   * @param {*} userId - Acting user; stringified into the metadata.
   */
  constructor(action, resource, userId) {
    const statusCode = grpc.status.PERMISSION_DENIED;
    const summary = `Permission denied for ${action}`;
    const explanation = `User ${userId} does not have permission to perform ${action} on ${resource}`;
    const metadata = {
      action,
      resource,
      userId: String(userId),
      errorType: 'PERMISSION_ERROR'
    };

    super(statusCode, summary, explanation, metadata);

    Object.assign(this, { action, resource, userId });
  }
}

// Resource conflict error
/**
 * ALREADY_EXISTS error for a resource that conflicts with existing state.
 */
class ConflictError extends GrpcError {
  /**
   * @param {string} resourceType - Kind of resource in conflict.
   * @param {string} conflictReason - Why the conflict occurred.
   */
  constructor(resourceType, conflictReason) {
    const statusCode = grpc.status.ALREADY_EXISTS;
    const explanation = `${resourceType} conflict: ${conflictReason}`;
    const metadata = {
      resourceType,
      conflictReason,
      errorType: 'CONFLICT_ERROR'
    };

    super(statusCode, 'Resource conflict', explanation, metadata);

    Object.assign(this, { resourceType, conflictReason });
  }
}

// Rate limit error
/**
 * RESOURCE_EXHAUSTED error raised when a caller exceeds its request quota.
 */
class RateLimitError extends GrpcError {
  /**
   * @param {number} limit - Allowed requests per window.
   * @param {number} windowMs - Window length in milliseconds.
   * @param {*} retryAfter - Hint for when to retry; stored as given and
   *   stringified into the metadata.
   */
  constructor(limit, windowMs, retryAfter) {
    const statusCode = grpc.status.RESOURCE_EXHAUSTED;
    const explanation = `Rate limit of ${limit} requests per ${windowMs}ms exceeded`;
    const metadata = {
      limit: String(limit),
      windowMs: String(windowMs),
      retryAfter: String(retryAfter),
      errorType: 'RATE_LIMIT_ERROR'
    };

    super(statusCode, 'Rate limit exceeded', explanation, metadata);

    Object.assign(this, { limit, windowMs, retryAfter });
  }
}

// Service unavailable error
/**
 * UNAVAILABLE error for a dependency that is temporarily out of service.
 */
class ServiceUnavailableError extends GrpcError {
  /**
   * @param {string} serviceName - Name of the unavailable service.
   * @param {?(number|string)} [retryAfter=null] - Optional retry hint; when
   *   given (including `0`) it is stringified into `metadata.retryAfter`.
   */
  constructor(serviceName, retryAfter = null) {
    super(
      grpc.status.UNAVAILABLE,
      `Service ${serviceName} is currently unavailable`,
      `The ${serviceName} service is temporarily unavailable. Please try again later.`,
      {
        serviceName: serviceName,
        errorType: 'SERVICE_UNAVAILABLE_ERROR'
      }
    );
    this.serviceName = serviceName;

    // Compare against null/undefined rather than truthiness so that a hint
    // of 0 ("retry immediately") is not silently dropped.
    if (retryAfter !== null && retryAfter !== undefined) {
      this.metadata.retryAfter = String(retryAfter);
    }
  }
}

// Timeout error
/**
 * DEADLINE_EXCEEDED error for an operation that ran out of time.
 */
class TimeoutError extends GrpcError {
  /**
   * @param {string} operation - Name of the operation that timed out.
   * @param {number} timeoutMs - Time budget in milliseconds; stringified
   *   into the metadata.
   */
  constructor(operation, timeoutMs) {
    const statusCode = grpc.status.DEADLINE_EXCEEDED;
    const summary = `Operation ${operation} timed out`;
    const explanation = `Operation ${operation} did not complete within ${timeoutMs}ms`;
    const metadata = {
      operation,
      timeoutMs: String(timeoutMs),
      errorType: 'TIMEOUT_ERROR'
    };

    super(statusCode, summary, explanation, metadata);

    Object.assign(this, { operation, timeoutMs });
  }
}

// Error handler class
/**
 * Converts arbitrary thrown values into gRPC status objects, logging them on
 * the way and giving registered per-type handlers the first say.
 */
class ErrorHandler {
  /**
   * @param {Object} [options={}]
   * @param {boolean} [options.includeStackTrace=false] - Expose stack traces
   *   in details, metadata and logs.
   * @param {boolean} [options.logErrors=true] - Pass `false` to disable
   *   logging.
   * @param {Function} [options.errorLogger=console.error] - Called with a
   *   label and the serialized log record.
   */
  constructor(options = {}) {
    this.includeStackTrace = options.includeStackTrace || false;
    // `?? true` instead of `|| true`: the latter is always true and made it
    // impossible to switch logging off.
    this.logErrors = options.logErrors ?? true;
    this.errorLogger = options.errorLogger || console.error;
    this.customHandlers = new Map();
  }

  /**
   * Register a handler for errors whose `metadata.errorType` equals
   * `errorType`. A later registration for the same type replaces the earlier.
   *
   * @param {string} errorType
   * @param {function(GrpcError, Object): *} handler
   */
  registerHandler(errorType, handler) {
    this.customHandlers.set(errorType, handler);
  }

  /**
   * Normalize, optionally log, and convert an error.
   *
   * @param {*} error - Anything that was thrown.
   * @param {Object} [context={}] - Extra information attached to logs and
   *   metadata (`operation` and `timeout` feed timeout errors).
   * @returns {*} The matching custom handler's result, or the error's
   *   `toGrpcError()` status object.
   */
  handleError(error, context = {}) {
    const enhancedError = this.enhanceError(error, context);

    if (this.logErrors) {
      this.logError(enhancedError, context);
    }

    // Try custom handlers first
    if (enhancedError.metadata && enhancedError.metadata.errorType) {
      const customHandler = this.customHandlers.get(enhancedError.metadata.errorType);
      if (customHandler) {
        return customHandler(enhancedError, context);
      }
    }

    // Default handling
    return enhancedError.toGrpcError();
  }

  /**
   * Map a thrown value onto a `GrpcError`.
   *
   * `GrpcError` instances are returned as-is with `context` merged into their
   * metadata. Other values are classified by `name`, `code` and message
   * fragments; anything unrecognized becomes an INTERNAL error.
   *
   * @param {*} error
   * @param {Object} context
   * @returns {GrpcError}
   */
  enhanceError(error, context) {
    // If it's already a GrpcError, just enhance context
    if (error instanceof GrpcError) {
      error.metadata = {
        ...error.metadata,
        context: context
      };
      return error;
    }

    // Thrown values are not guaranteed to be Error objects (strings, null,
    // plain status objects); read their fields defensively so classification
    // itself cannot throw.
    const source = error !== null && error !== undefined ? error : {};
    const name = source.name;
    const code = source.code;
    const stack = source.stack;
    const message = typeof source.message === 'string' ? source.message : String(error);

    // Convert common JavaScript errors to gRPC errors
    if (name === 'ValidationError' || message.includes('validation')) {
      return new GrpcError(
        grpc.status.INVALID_ARGUMENT,
        'Validation failed',
        message,
        { originalError: name, context }
      );
    }

    if (name === 'CastError' || message.includes('Cast to')) {
      return new GrpcError(
        grpc.status.INVALID_ARGUMENT,
        'Invalid input format',
        message,
        { originalError: name, context }
      );
    }

    if (name === 'MongoError' && code === 11000) {
      return new GrpcError(
        grpc.status.ALREADY_EXISTS,
        'Resource already exists',
        'Duplicate key violation',
        { originalError: name, mongoCode: code, context }
      );
    }

    if (name === 'TokenExpiredError') {
      return new GrpcError(
        grpc.status.UNAUTHENTICATED,
        'Token expired',
        'Authentication token has expired',
        { originalError: name, context }
      );
    }

    if (name === 'JsonWebTokenError') {
      return new GrpcError(
        grpc.status.UNAUTHENTICATED,
        'Invalid token',
        'Authentication token is invalid',
        { originalError: name, context }
      );
    }

    // Database connection errors
    if (message.includes('ECONNREFUSED') || message.includes('connection')) {
      return new ServiceUnavailableError('Database');
    }

    // Timeout errors
    if (message.includes('timeout') || code === 'TIMEOUT') {
      return new TimeoutError(context.operation || 'unknown', context.timeout || 30000);
    }

    // Default internal error
    return new GrpcError(
      grpc.status.INTERNAL,
      'Internal server error',
      this.includeStackTrace ? stack : message,
      {
        originalError: name,
        originalMessage: message,
        context,
        stackTrace: this.includeStackTrace ? stack : undefined
      }
    );
  }

  /**
   * Serialize an enhanced error and pass it to the configured logger.
   *
   * @param {GrpcError} error
   * @param {Object} context
   */
  logError(error, context) {
    const logData = {
      requestId: error.requestId,
      timestamp: error.timestamp,
      code: error.code,
      message: error.message,
      details: error.details,
      metadata: error.metadata,
      context: context
    };

    if (this.includeStackTrace && error.stack) {
      logData.stackTrace = error.stack;
    }

    this.errorLogger('gRPC Error:', JSON.stringify(logData, null, 2));
  }
}

// Retry mechanism for failed operations
/**
 * Runs an async operation and retries it with exponential backoff plus jitter
 * when it fails with a retryable status code.
 *
 * Supported options (all optional):
 *   - maxRetries: total number of attempts, including the first one (default 3).
 *     Values below 1 still execute the operation once.
 *   - baseDelay: delay in ms before the second attempt (default 1000).
 *   - maxDelay: upper bound in ms for any single delay (default 30000).
 *   - backoffMultiplier: growth factor applied per attempt (default 2).
 *   - retryableErrors: Set of status codes that may be retried
 *     (default UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED).
 */
class RetryHandler {
  constructor(options = {}) {
    // Fall back only when an option is missing (null/undefined) so explicit
    // zeros such as `baseDelay: 0` are honoured instead of being replaced.
    const pick = (value, fallback) => (value == null ? fallback : value);

    this.maxRetries = pick(options.maxRetries, 3);
    this.baseDelay = pick(options.baseDelay, 1000);
    this.maxDelay = pick(options.maxDelay, 30000);
    this.backoffMultiplier = pick(options.backoffMultiplier, 2);
    // `||` keeps the default Set (which reads grpc.status) lazily constructed.
    this.retryableErrors = options.retryableErrors || new Set([
      grpc.status.UNAVAILABLE,
      grpc.status.DEADLINE_EXCEEDED,
      grpc.status.RESOURCE_EXHAUSTED
    ]);
  }

  /**
   * Execute `operation`, retrying on retryable errors.
   *
   * @param {Function} operation - Function returning a value or a promise.
   * @param {object} [context] - Accepted for interface compatibility; unused here.
   * @returns {Promise<*>} The first successful result of `operation`.
   * @throws The error from `operation` when it is not retryable or when the
   *   attempts are exhausted.
   */
  async executeWithRetry(operation, context = {}) {
    // Always run the operation at least once, even for maxRetries < 1 or NaN.
    const maxAttempts = this.maxRetries >= 1 ? this.maxRetries : 1;

    for (let attempt = 1; ; attempt++) {
      try {
        return await operation();
      } catch (error) {
        // Give up immediately when no further attempt would run, so the
        // caller is not kept waiting for a back-off delay that leads nowhere.
        const attemptsExhausted = attempt + 1 > maxAttempts;
        if (attemptsExhausted || !this.isRetryableError(error)) {
          throw error;
        }

        const delay = this.calculateDelay(attempt);

        console.warn(`Attempt ${attempt} failed, retrying in ${delay}ms:`, error.message);

        await this.sleep(delay);
      }
    }
  }

  /**
   * @param {*} error - Rejection value from the operation.
   * @returns {boolean} True when the error's `code` (or `status`) is in
   *   `retryableErrors`. Null/undefined rejections are never retryable.
   */
  isRetryableError(error) {
    if (error == null) {
      return false;
    }

    const errorCode = error.code || error.status;
    return this.retryableErrors.has(errorCode);
  }

  /**
   * @param {number} attempt - 1-based number of the attempt that just failed.
   * @returns {number} Delay in ms: exponential backoff plus up to 10% random
   *   jitter, capped at `maxDelay`.
   */
  calculateDelay(attempt) {
    const delay = this.baseDelay * Math.pow(this.backoffMultiplier, attempt - 1);
    const jitter = delay * 0.1 * Math.random(); // Add 10% jitter
    return Math.min(delay + jitter, this.maxDelay);
  }

  /**
   * @param {number} ms - Milliseconds to wait.
   * @returns {Promise<void>} Resolves after the timer fires.
   */
  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Circuit breaker pattern for error handling
/**
 * Circuit breaker with three states:
 *   - CLOSED: operations run normally; failures are counted.
 *   - OPEN: operations are rejected until `recoveryTimeout` ms have passed
 *     since the last failure.
 *   - HALF_OPEN: operations run as probes; `successThreshold` consecutive
 *     successes close the circuit, any failure re-opens it.
 *
 * Supported options (all optional): failureThreshold (default 5),
 * recoveryTimeout in ms (default 60000), monitoringPeriod in ms
 * (default 10000, stored but not read by this class), successThreshold
 * (default 3).
 */
class CircuitBreaker {
  constructor(options = {}) {
    this.failureThreshold = options.failureThreshold || 5;
    this.recoveryTimeout = options.recoveryTimeout || 60000;
    this.monitoringPeriod = options.monitoringPeriod || 10000;
    // Number of consecutive HALF_OPEN successes required to close the circuit.
    this.successThreshold = options.successThreshold || 3;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.failureCount = 0;
    this.successCount = 0;
    this.lastFailureTime = 0;
    this.requestCount = 0;
  }

  /**
   * Run `operation` through the breaker.
   *
   * @param {Function} operation - Function returning a value or a promise.
   * @returns {Promise<*>} The operation's result.
   * @throws {ServiceUnavailableError} When the circuit is OPEN and the
   *   recovery timeout has not elapsed yet.
   * @throws Whatever `operation` throws (after recording the failure).
   */
  async execute(operation) {
    if (this.state === 'OPEN') {
      const elapsed = Date.now() - this.lastFailureTime;
      if (elapsed > this.recoveryTimeout) {
        // Recovery window elapsed: let requests through as probes.
        this.state = 'HALF_OPEN';
        this.successCount = 0;
      } else {
        throw new ServiceUnavailableError('Circuit breaker is open', Math.ceil((this.recoveryTimeout - elapsed) / 1000));
      }
    }

    try {
      const result = await operation();
      this.recordSuccess();
      return result;
    } catch (error) {
      this.recordFailure();
      throw error;
    }
  }

  /** Record a successful call; may close a HALF_OPEN circuit. */
  recordSuccess() {
    this.failureCount = 0;

    if (this.state === 'HALF_OPEN') {
      this.successCount++;
      if (this.successCount >= this.successThreshold) { // Require consecutive successes to close circuit
        this.state = 'CLOSED';
        console.log('Circuit breaker closed after successful recovery');
      }
    }
  }

  /** Record a failed call; may open (or re-open) the circuit. */
  recordFailure() {
    this.failureCount++;
    this.lastFailureTime = Date.now();

    if (this.state === 'HALF_OPEN') {
      // A failed probe means the dependency has not recovered: re-open at
      // once and discard the partial success streak so the successes that
      // close the circuit are genuinely consecutive.
      this.state = 'OPEN';
      this.successCount = 0;
      console.log('Circuit breaker re-opened after failure during recovery');
      return;
    }

    if (this.failureCount >= this.failureThreshold) {
      this.state = 'OPEN';
      console.log(`Circuit breaker opened after ${this.failureCount} failures`);
    }
  }

  /**
   * @returns {{state: string, failureCount: number, successCount: number, lastFailureTime: number}}
   *   Snapshot of the breaker's current counters and state.
   */
  getState() {
    return {
      state: this.state,
      failureCount: this.failureCount,
      successCount: this.successCount,
      lastFailureTime: this.lastFailureTime
    };
  }
}

// Error recovery strategies
/**
 * Registry of recovery strategies keyed by `error.metadata.errorType`.
 * Each strategy is a function `(error, context) => object` that returns a
 * description of how the caller may recover.
 */
class ErrorRecovery {
  constructor() {
    this.recoveryStrategies = new Map();
    this.setupDefaultStrategies();
  }

  /** Register the built-in strategies for validation, rate-limit and unavailable errors. */
  setupDefaultStrategies() {
    // Validation error recovery - provide helpful suggestions
    this.recoveryStrategies.set('VALIDATION_ERROR', (error, context) => {
      return {
        handled: true,
        suggestion: `Please check the ${error.field} field. Expected: ${error.constraint}`,
        correctedValue: this.suggestCorrection(error.value, error.constraint)
      };
    });

    // Rate limit error recovery - suggest retry with backoff
    this.recoveryStrategies.set('RATE_LIMIT_ERROR', (error, context) => {
      return {
        handled: true,
        suggestion: `Rate limit exceeded. Retry after ${error.retryAfter} seconds`,
        // Explicit radix so values such as '0x10' are not read as hexadecimal.
        retryAfter: Number.parseInt(error.retryAfter, 10),
        backoffStrategy: 'exponential'
      };
    });

    // Service unavailable recovery - suggest alternative endpoints
    this.recoveryStrategies.set('SERVICE_UNAVAILABLE_ERROR', (error, context) => {
      return {
        handled: true,
        suggestion: `Service ${error.serviceName} is unavailable. Please try again later`,
        alternativeEndpoints: this.getAlternativeEndpoints(error.serviceName),
        fallbackAvailable: true
      };
    });
  }

  /**
   * Propose a corrected value for a failed validation.
   *
   * @param {*} value - The rejected value.
   * @param {*} constraint - Description of the violated constraint.
   * @returns {string|null} A suggested replacement, or null when no
   *   suggestion applies (including when `constraint` or `value` cannot be
   *   inspected, e.g. they are missing).
   */
  suggestCorrection(value, constraint) {
    // Without an inspectable constraint there is nothing to base a suggestion on.
    if (constraint == null || typeof constraint.includes !== 'function') {
      return null;
    }

    const valueIsSearchable = value != null && typeof value.includes === 'function';
    if (constraint.includes('email') && valueIsSearchable && !value.includes('@')) {
      return `${value}@example.com`;
    }

    // Number() coercion mirrors what the global isNaN would do with `value`.
    if (constraint.includes('positive') && Number.isNaN(Number(value))) {
      return '0';
    }

    return null;
  }

  /**
   * @param {string} serviceName - Name of the unavailable service.
   * @returns {string[]} Known alternative endpoints, or an empty array.
   */
  getAlternativeEndpoints(serviceName) {
    const alternatives = {
      'user-service': ['user-service-backup', 'user-service-v2'],
      'order-service': ['order-service-cluster', 'legacy-order-service'],
      'payment-service': ['payment-service-primary', 'payment-service-secondary']
    };

    // Own-property check so names like 'toString' or 'constructor' do not
    // resolve to inherited Object.prototype members.
    const isKnownService = Object.prototype.hasOwnProperty.call(alternatives, serviceName);
    return isKnownService ? alternatives[serviceName] : [];
  }

  /**
   * Add or replace the strategy for an error type.
   *
   * @param {string} errorType - Key matched against `error.metadata.errorType`.
   * @param {Function} strategy - `(error, context) => object`.
   */
  registerStrategy(errorType, strategy) {
    this.recoveryStrategies.set(errorType, strategy);
  }

  /**
   * Look up and run the strategy registered for `error.metadata.errorType`.
   *
   * @param {*} error - Error to recover from; may be null/undefined.
   * @param {*} context - Passed through to the strategy.
   * @returns {object} The strategy's result, or `{ handled: false, ... }`
   *   when no strategy matches.
   */
  attemptRecovery(error, context) {
    const errorType = error && error.metadata && error.metadata.errorType;
    if (errorType) {
      const strategy = this.recoveryStrategies.get(errorType);
      if (strategy) {
        return strategy(error, context);
      }
    }

    return {
      handled: false,
      suggestion: 'No recovery strategy available for this error type'
    };
  }
}

// Utility functions for error handling
const ErrorUtils = {
  /**
   * Map a gRPC status code to its symbolic name.
   *
   * @param {number} code - Numeric status code.
   * @returns {string} The status name, or `UNKNOWN_STATUS_<code>` when the
   *   code is not one of the listed statuses.
   */
  getStatusString(code) {
    const statusNames = [
      'OK',
      'CANCELLED',
      'UNKNOWN',
      'INVALID_ARGUMENT',
      'DEADLINE_EXCEEDED',
      'NOT_FOUND',
      'ALREADY_EXISTS',
      'PERMISSION_DENIED',
      'UNAUTHENTICATED',
      'RESOURCE_EXHAUSTED',
      'FAILED_PRECONDITION',
      'ABORTED',
      'OUT_OF_RANGE',
      'UNIMPLEMENTED',
      'INTERNAL',
      'UNAVAILABLE',
      'DATA_LOSS'
    ];

    // Build the numeric-code -> name table from grpc.status on each call.
    const nameByCode = {};
    for (const statusName of statusNames) {
      nameByCode[grpc.status[statusName]] = statusName;
    }

    const resolved = nameByCode[code];
    if (resolved) {
      return resolved;
    }
    return `UNKNOWN_STATUS_${code}`;
  },

  /**
   * Tell whether an error carries a status code that is worth retrying.
   *
   * @param {object} error - Error exposing `code` or `status`.
   * @returns {boolean} True for UNAVAILABLE, DEADLINE_EXCEEDED,
   *   RESOURCE_EXHAUSTED and ABORTED.
   */
  isRetryableError(error) {
    const candidates = [
      grpc.status.UNAVAILABLE,
      grpc.status.DEADLINE_EXCEEDED,
      grpc.status.RESOURCE_EXHAUSTED,
      grpc.status.ABORTED
    ];
    const statusCode = error.code || error.status;

    return candidates.includes(statusCode);
  },

  /**
   * Collect the loggable fields of an error into a plain object.
   *
   * @param {object} error - Source error.
   * @param {object} [context={}] - Extra context stored alongside the error.
   * @returns {object} Fields code, message, details, requestId, timestamp
   *   (current ISO time when the error has none), context, metadata
   *   (empty object when missing) and stackTrace.
   */
  extractErrorInfo(error, context = {}) {
    const info = {};

    info.code = error.code || error.status;
    info.message = error.message;
    info.details = error.details;
    info.requestId = error.requestId;
    info.timestamp = error.timestamp || new Date().toISOString();
    info.context = context;
    info.metadata = error.metadata || {};
    info.stackTrace = error.stack;

    return info;
  },

  /**
   * Build the client-facing failure payload for an error.
   *
   * @param {object} error - Source error.
   * @param {boolean} [includeDetails=false] - When true, details, metadata,
   *   requestId and timestamp are added to the payload.
   * @returns {{success: boolean, error: object}} Response with `success: false`.
   */
  createErrorResponse(error, includeDetails = false) {
    const errorBody = { code: error.code, message: error.message };

    if (includeDetails) {
      const { details, metadata, requestId, timestamp } = error;
      Object.assign(errorBody, { details, metadata, requestId, timestamp });
    }

    return { success: false, error: errorBody };
  }
};

// Public API of this module (CommonJS export). Each name is exported under
// its own identifier via property shorthand.
module.exports = {
  // Error classes
  GrpcError,
  ValidationError,
  BusinessError,
  NotFoundError,
  PermissionError,
  ConflictError,
  RateLimitError,
  ServiceUnavailableError,
  TimeoutError,
  // Error handling, retry, circuit-breaker and recovery helpers
  ErrorHandler,
  RetryHandler,
  CircuitBreaker,
  ErrorRecovery,
  ErrorUtils
};