NATS-Messaging-System-Beispiele

Beispiele für das NATS-Messaging-System, einschließlich Pub/Sub, Request-Reply, Queue-Groups und JetStream, für moderne Cloud-native-Anwendungen

💻 NATS Hello World - Basic Pub/Sub javascript

🟢 simple

Einfaches NATS-Publisher- und -Subscriber-Beispiel zur Demonstration des grundlegenden Publish/Subscribe-Patterns

⏱️ 10 min 🏷️ nats, messaging, beginner
Prerequisites: NATS server running, Node.js
// NATS Hello World - Basic Pub/Sub Example
// Install: npm install nats

const { connect, StringCodec } = require('nats');

// Codec used below to encode string payloads before publishing and to decode
// the data of received messages back into strings
const sc = StringCodec();

/**
 * Runs the hello-world demo: connects to the local NATS server, subscribes to
 * `hello.world`, publishes three greetings on that subject and shuts down.
 *
 * @returns {Promise<void>} Resolves after the connection has been drained.
 */
async function main() {
  // Connect to NATS server
  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'nats-hello-world-example'
  });

  console.log('Connected to NATS server');

  // Simple subscriber. The loop runs in the background, so a handler is
  // attached to keep a failure inside it from becoming an unhandled rejection.
  const sub = nc.subscribe('hello.world');
  (async () => {
    for await (const m of sub) {
      console.log(`Received message: ${sc.decode(m.data)}`);
    }
  })().catch(err => {
    console.error('Subscriber error:', err);
  });

  // Wait a moment for subscription to be ready
  await new Promise(resolve => setTimeout(resolve, 100));

  // Publish messages
  console.log('Publishing messages...');
  nc.publish('hello.world', sc.encode('Hello NATS!'));
  nc.publish('hello.world', sc.encode('NATS is awesome!'));
  nc.publish('hello.world', sc.encode('Pub/Sub is easy!'));

  // Wait for messages to be received
  await new Promise(resolve => setTimeout(resolve, 1000));

  // drain() lets in-flight messages finish and then closes the connection
  // itself, so no additional close() call is needed afterwards.
  await nc.drain();
  console.log('Connection closed');
}

main().catch(err => {
  console.error('Error:', err);
  process.exit(1);
});

💻 NATS Request-Reply Pattern javascript

🟡 intermediate ⭐⭐⭐

Request-Reply-Kommunikationsmuster mit NATS für synchrone Nachrichtenübermittlung zwischen Diensten implementieren

⏱️ 15 min 🏷️ nats, request-reply, rpc
Prerequisites: NATS server running, Node.js, Basic NATS knowledge
// NATS Request-Reply Pattern Example
// Demonstrates synchronous request-response communication

const { connect, StringCodec } = require('nats');

// Shared codec: encodes the string payloads of requests and responses and
// decodes received message data back into strings
const sc = StringCodec();

/**
 * Starts the responder side of the demo: connects to NATS, subscribes to
 * `time.request` and answers every request with a JSON document containing
 * the current timestamp, the service name and the original request text.
 *
 * @returns {Promise<object>} The NATS connection, so the caller can close it.
 */
async function responder() {
  const connection = await connect({
    servers: 'nats://localhost:4222',
    name: 'responder-service'
  });

  console.log('Responder connected');

  const requests = connection.subscribe('time.request');

  // Builds and sends the reply for a single incoming request.
  const answer = (msg) => {
    const question = sc.decode(msg.data);
    console.log(`Received request: ${question}`);

    const payload = JSON.stringify({
      timestamp: new Date().toISOString(),
      service: 'time-service',
      request: question
    });

    msg.respond(sc.encode(payload));
    console.log('Sent response');
  };

  // Serve requests in the background; the function returns without waiting.
  const serve = async () => {
    for await (const msg of requests) {
      answer(msg);
    }
  };
  serve();

  return connection;
}

/**
 * Connects as a client, sends one request on `time.request` with a timeout of
 * 2000 and logs the parsed JSON response. A failed request is logged rather
 * than thrown. The connection is closed before returning.
 *
 * @returns {Promise<void>}
 */
async function requester() {
  const client = await connect({
    servers: 'nats://localhost:4222',
    name: 'requester-client'
  });

  console.log('Requester connected');

  try {
    console.log('Sending time request...');

    const question = sc.encode('What time is it?');
    const options = { timeout: 2000 };
    const reply = await client.request('time.request', question, options);

    const replyText = sc.decode(reply.data);
    console.log('Response received:', JSON.parse(replyText));
  } catch (failure) {
    console.error('Request failed:', failure.message);
  }

  await client.close();
}

/**
 * Coordinates both sides of the demo: starts the responder, gives it a moment
 * to get ready, runs one request and then shuts the responder down.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('Starting NATS Request-Reply Example');

  // Start responder in background
  const responderNC = await responder();

  try {
    // Wait for responder to be ready
    await new Promise(resolve => setTimeout(resolve, 500));

    // Run requester
    await requester();
  } finally {
    // Close the responder even when the requester fails; otherwise its
    // connection would be left open.
    await responderNC.close();
  }
}

main().catch(err => {
  console.error('Error:', err);
  // Report the failure through the exit status as well.
  process.exitCode = 1;
});

💻 NATS Queue-Groups - Load Balancing javascript

🟡 intermediate ⭐⭐⭐

Queue-Groups für Lastausgleich von Nachrichten über mehrere Subscriber implementieren

⏱️ 20 min 🏷️ nats, queue, load-balancing
Prerequisites: NATS server running, Node.js, NATS pub/sub basics
// NATS Queue Groups - Load Balancing Example
// Multiple workers share the workload

const { connect, StringCodec } = require('nats');

// Shared codec: encodes the JSON text of tasks and completion notices for
// publishing and decodes received message data back into strings
const sc = StringCodec();

/**
 * Creates one worker: connects to NATS, joins the `workers` queue group on
 * `tasks.process`, and for each task it receives simulates some work and then
 * publishes a completion notice on `tasks.completed`.
 *
 * @param {number|string} workerId - Identifier used in the connection name,
 *   log lines and completion notices.
 * @returns {Promise<object>} The worker's NATS connection.
 */
async function createWorker(workerId) {
  const connection = await connect({
    servers: 'nats://localhost:4222',
    name: `worker-${workerId}`
  });

  console.log(`Worker ${workerId} connected`);

  // Subscriptions sharing the same queue name split the messages among them.
  const inbox = connection.subscribe('tasks.process', { queue: 'workers' });

  // Random pause standing in for real processing.
  const simulateWork = () =>
    new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));

  const handleTask = async (msg) => {
    const task = JSON.parse(sc.decode(msg.data));
    console.log(`Worker ${workerId} processing task: ${task.id}`);

    await simulateWork();

    console.log(`Worker ${workerId} completed task: ${task.id}`);

    const notice = JSON.stringify({
      taskId: task.id,
      workerId,
      completedAt: new Date().toISOString(),
      result: `Task ${task.id} processed by worker ${workerId}`
    });

    connection.publish('tasks.completed', sc.encode(notice));
  };

  // Tasks are handled one after another in the background.
  const run = async () => {
    for await (const msg of inbox) {
      await handleTask(msg);
    }
  };
  run();

  return connection;
}

/**
 * Publishes ten tasks on `tasks.process` (one every 200 ms), logs completion
 * notices arriving on `tasks.completed`, waits five more seconds and closes
 * its connection.
 *
 * @returns {Promise<void>}
 */
async function taskProducer() {
  const pause = (ms) => new Promise(resolve => setTimeout(resolve, ms));

  const producer = await connect({
    servers: 'nats://localhost:4222',
    name: 'task-producer'
  });

  console.log('Task producer connected');

  // Callback-style subscription for completion notices; errors are ignored.
  const onCompleted = (err, msg) => {
    if (!err) {
      const notice = JSON.parse(sc.decode(msg.data));
      console.log(`✓ Task completed: ${notice.taskId} by ${notice.workerId}`);
    }
  };
  producer.subscribe('tasks.completed', { callback: onCompleted });

  let number = 0;
  while (number < 10) {
    number += 1;

    const id = `task-${number}`;
    const payload = JSON.stringify({
      id,
      type: 'processing',
      createdAt: new Date().toISOString(),
      data: `Task data for task ${number}`
    });

    console.log(`Publishing task: ${id}`);
    producer.publish('tasks.process', sc.encode(payload));

    // Small delay between tasks
    await pause(200);
  }

  // Leave time for the remaining completion notices before closing.
  await pause(5000);
  await producer.close();
}

/**
 * Starts a monitor that subscribes to `tasks.*` and logs every task that is
 * published on `tasks.process`.
 *
 * @returns {Promise<object>} The monitor's NATS connection, so the caller can
 *   close it when the demo is finished.
 */
async function monitor() {
  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'monitor'
  });

  console.log('Monitor connected');

  // Listen to all task distribution
  const sub = nc.subscribe('tasks.*');

  (async () => {
    for await (const m of sub) {
      // Only dispatched tasks are reported; other subjects are skipped.
      if (m.subject === 'tasks.process') {
        const task = JSON.parse(sc.decode(m.data));
        console.log(`📤 Task dispatched: ${task.id}`);
      }
    }
  })();

  // The caller closes this connection; without the return value it would
  // receive undefined and fail when calling close().
  return nc;
}

/**
 * Runs the queue-group demo: starts the monitor and three workers, lets the
 * producer publish its tasks, and finally closes every connection that was
 * opened here.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('Starting NATS Queue Groups Example');

  // Every connection opened here is recorded so it can be closed again,
  // including when a later step fails.
  const connections = [];

  try {
    // Start monitor
    connections.push(await monitor());

    // Start multiple workers
    for (let i = 1; i <= 3; i++) {
      connections.push(await createWorker(i));
    }

    // Wait for workers to be ready
    await new Promise(resolve => setTimeout(resolve, 1000));

    // Run task producer
    await taskProducer();
  } finally {
    // Close all connections
    for (const nc of connections) {
      await nc.close();
    }
  }
}

main().catch(err => {
  console.error('Error:', err);
  // Report the failure through the exit status as well.
  process.exitCode = 1;
});

💻 NATS Wildcard Subscriptions javascript

🟡 intermediate ⭐⭐⭐

Erweiterte Subscription-Patterns mit Wildcards für flexibles Message-Routing

⏱️ 15 min 🏷️ nats, wildcards, routing
Prerequisites: NATS server running, Node.js, Basic NATS knowledge
// NATS Wildcard Subscriptions Example
// Demonstrates single (*) and multi-level (>) wildcards

const { connect, StringCodec } = require('nats');

// Shared codec: encodes string payloads for publishing and decodes received
// message data back into strings
const sc = StringCodec();

/**
 * Starts the log collector with two subscriptions on one connection:
 * `logs.service.*` (single-token wildcard) and `logs.>` (multi-level
 * wildcard). Each received message is written to the console.
 *
 * @returns {Promise<object>} The collector's NATS connection.
 */
async function logsCollector() {
  const connection = await connect({
    servers: 'nats://localhost:4222',
    name: 'logs-collector'
  });

  console.log('Logs collector connected');

  // Runs a handler for every message of a subscription, in the background.
  const forEachMessage = async (subscription, onMessage) => {
    for await (const msg of subscription) {
      onMessage(msg);
    }
  };

  // `*` stands for exactly one token:
  // logs.service.* matches logs.service.auth, logs.service.db, logs.service.api
  forEachMessage(connection.subscribe('logs.service.*'), (msg) => {
    const tokens = msg.subject.split('.');
    const serviceName = tokens[2];
    const text = sc.decode(msg.data);
    console.log(`[SERVICE LOG] ${serviceName}: ${text}`);
  });

  // `>` stands for one or more trailing tokens:
  // logs.> matches logs.service.auth, logs.service.db.error, logs.api.v1.users
  forEachMessage(connection.subscribe('logs.>'), (msg) => {
    const text = sc.decode(msg.data);
    console.log(`[ALL LOGS] ${msg.subject}: ${text}`);
  });

  return connection;
}

/**
 * Starts the API event monitor: subscribes to `api.>` and logs each event
 * with its version token and resource path taken from the subject.
 *
 * @returns {Promise<object>} The monitor's NATS connection.
 */
async function apiMonitor() {
  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'api-monitor'
  });

  console.log('API monitor connected');

  // Monitor all API events
  // api.> matches: api.v1.users, api.v2.auth.login, api.v1.orders.create
  const apiSub = nc.subscribe('api.>');

  (async () => {
    for await (const m of apiSub) {
      const parts = m.subject.split('.');
      // The second token already carries the "v" prefix (e.g. "v1"), so it is
      // printed as-is instead of being prefixed a second time.
      const version = parts[1];
      const resource = parts.slice(2).join('.');
      const eventData = sc.decode(m.data);
      console.log(`[API] ${version} ${resource}: ${eventData}`);
    }
  })();

  return nc;
}

/**
 * Starts the metrics collector with two subscriptions on one connection:
 * `metrics.cpu.*` and `metrics.memory.*`. Each message is parsed as JSON and
 * logged together with the host taken from the third subject token.
 *
 * @returns {Promise<object>} The collector's NATS connection.
 */
async function metricsCollector() {
  const connection = await connect({
    servers: 'nats://localhost:4222',
    name: 'metrics-collector'
  });

  console.log('Metrics collector connected');

  // Third subject token, e.g. "host1" in metrics.cpu.host1.
  const hostOf = (subject) => subject.split('.')[2];

  // Runs a handler for every message of a subscription, in the background.
  const forEachMessage = async (subscription, onMessage) => {
    for await (const msg of subscription) {
      onMessage(msg);
    }
  };

  // CPU metrics from any host:
  // metrics.cpu.* matches metrics.cpu.host1, metrics.cpu.host2, metrics.cpu.host3
  forEachMessage(connection.subscribe('metrics.cpu.*'), (msg) => {
    const host = hostOf(msg.subject);
    const metric = JSON.parse(sc.decode(msg.data));
    console.log(`[CPU METRIC] ${host}: ${metric.usage}%`);
  });

  // Memory metrics from any host, e.g. metrics.memory.host1.
  // NOTE: `*` stands for a single token, so deeper subjects such as
  // metrics.memory.host1.error are not covered by this pattern.
  forEachMessage(connection.subscribe('metrics.memory.*'), (msg) => {
    const host = hostOf(msg.subject);
    const metric = JSON.parse(sc.decode(msg.data));
    console.log(`[MEMORY METRIC] ${host}: ${metric.used}MB / ${metric.total}MB`);
  });

  return connection;
}

/**
 * Publishes the sample messages of the wildcard demo (service logs, API
 * events, metrics and nested logs), waits two seconds and closes its
 * connection.
 *
 * @returns {Promise<void>}
 */
async function eventProducer() {
  const producer = await connect({
    servers: 'nats://localhost:4222',
    name: 'event-producer'
  });

  console.log('Event producer connected');

  // Small helpers so each publish line reads as subject + payload.
  const sendText = (subject, text) => {
    producer.publish(subject, sc.encode(text));
  };
  const sendJson = (subject, value) => {
    producer.publish(subject, sc.encode(JSON.stringify(value)));
  };

  console.log('Publishing service logs...');
  sendText('logs.service.auth', 'User authentication successful');
  sendText('logs.service.db', 'Database connection established');
  sendText('logs.service.cache', 'Cache refreshed');

  console.log('Publishing API events...');
  sendText('api.v1.users.create', 'New user created');
  sendText('api.v2.auth.login', 'User logged in');
  sendText('api.v1.orders.process', 'Order processed');

  console.log('Publishing metrics...');
  sendJson('metrics.cpu.host1', { usage: 45.2, timestamp: Date.now() });
  sendJson('metrics.cpu.host2', { usage: 67.8, timestamp: Date.now() });
  sendJson('metrics.memory.host1', { used: 2048, total: 4096 });

  console.log('Publishing nested logs...');
  sendText('logs.service.db.error', 'Connection timeout');
  sendText('logs.api.v1.users.warning', 'Rate limit approaching');

  // Give the subscribers time to process the messages before closing.
  await new Promise(resolve => setTimeout(resolve, 2000));

  await producer.close();
}

/**
 * Runs the wildcard demo: starts the three subscribers, lets the producer
 * publish its sample messages, and finally closes every subscriber
 * connection that was opened here.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('Starting NATS Wildcard Subscriptions Example');

  // Every connection opened here is recorded so it can be closed again,
  // including when a later step fails.
  const connections = [];

  try {
    // Start subscribers
    connections.push(await logsCollector());
    connections.push(await apiMonitor());
    connections.push(await metricsCollector());

    // Wait for subscriptions to be ready
    await new Promise(resolve => setTimeout(resolve, 1000));

    // Produce events
    await eventProducer();
  } finally {
    // Close connections
    for (const nc of connections) {
      await nc.close();
    }
  }

  console.log('Wildcard subscriptions example completed');
}

main().catch(err => {
  console.error('Error:', err);
  // Report the failure through the exit status as well.
  process.exitCode = 1;
});

💻 NATS JetStream - Persistent Streaming javascript

🔴 complex ⭐⭐⭐⭐

NATS JetStream für persistentes Message-Streaming mit Durability- und Replay-Fähigkeiten verwenden

⏱️ 25 min 🏷️ nats, jetstream, streaming, persistence
Prerequisites: NATS server with JetStream enabled, Node.js, Advanced NATS knowledge
// NATS JetStream - Persistent Streaming Example
// Features: Streams, Consumers, Persistent storage

const { connect, StringCodec, JSONCodec } = require('nats');

// String codec; not referenced by the code visible in this example
const sc = StringCodec();
// JSON codec used to encode event objects for publishing and to decode the
// data of consumed messages
const jc = JSONCodec();

/**
 * Creates the `EVENTS` stream (subjects `events.>`) and the durable consumer
 * `event-processor` on it. An "already exists" error from either step is
 * logged and tolerated; any other error is rethrown. The connection used for
 * setup is closed in all cases.
 *
 * @returns {Promise<void>}
 * @throws {Error} When the stream or consumer cannot be created.
 */
async function setupJetStream() {
  // JetStream configuration expresses durations in nanoseconds.
  const NANOS_PER_SECOND = 1e9;

  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'jetstream-setup'
  });

  try {
    // Create JetStream manager (administrative API for streams/consumers)
    const jsm = await nc.jetstreamManager();

    try {
      // Create a stream
      await jsm.streams.add({
        name: 'EVENTS',
        subjects: ['events.>'],
        retention: 'workqueue', // Messages removed after ack
        storage: 'file', // Persistent storage
        replicas: 1,
        max_age: 60 * 60 * 24 * 7 * NANOS_PER_SECOND, // 7 days
      });
      console.log('Stream EVENTS created');
    } catch (err) {
      if (!err.message.includes('stream name already in use')) {
        throw err;
      }
      console.log('Stream EVENTS already exists');
    }

    try {
      // Create a durable consumer
      await jsm.consumers.add('EVENTS', {
        name: 'event-processor',
        durable_name: 'event-processor',
        ack_policy: 'explicit',
        ack_wait: 30 * NANOS_PER_SECOND, // 30 seconds
        max_deliver: 3,
        filter_subject: 'events.>',
      });
      console.log('Consumer event-processor created');
    } catch (err) {
      if (!err.message.includes('consumer already exists')) {
        throw err;
      }
      console.log('Consumer event-processor already exists');
    }
  } finally {
    // Release the setup connection even when one of the steps above throws.
    await nc.close();
  }
}

/**
 * Publishes four sample events to JetStream, one subject per event type
 * (`events.<type>`), and logs the stream name and sequence number reported
 * for each publish. The connection is closed in all cases.
 *
 * @returns {Promise<void>}
 */
async function eventProducer() {
  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'event-producer'
  });

  try {
    // Get JetStream context
    const js = nc.jetstream();

    console.log('Event producer connected with JetStream');

    // Publish events to stream
    const events = [
      // Placeholder address; the original literal was mangled by e-mail
      // obfuscation when the page was extracted.
      { type: 'user.created', userId: 'user123', email: 'user@example.com' },
      { type: 'order.placed', orderId: 'order456', amount: 99.99 },
      { type: 'payment.processed', paymentId: 'pay789', status: 'success' },
      { type: 'notification.sent', userId: 'user123', channel: 'email' },
    ];

    for (let i = 0; i < events.length; i++) {
      const event = {
        id: `event-${Date.now()}-${i}`,
        timestamp: new Date().toISOString(),
        ...events[i]
      };

      console.log(`Publishing event: ${event.type}`);

      // Publish to JetStream and wait for the publish acknowledgement
      const pa = await js.publish(
        `events.${event.type}`,
        jc.encode(event)
      );

      console.log(`Event published to stream: ${pa.stream}, seq: ${pa.seq}`);
    }
  } finally {
    // Close the connection even when a publish fails.
    await nc.close();
  }
}

/**
 * Reads one bounded batch of events from the durable consumer
 * `event-processor` on the `EVENTS` stream. Each message is decoded, logged
 * and acknowledged; a message whose processing throws is negatively
 * acknowledged. The connection is closed in all cases.
 *
 * @returns {Promise<void>}
 */
async function eventConsumer() {
  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'event-consumer'
  });

  try {
    // Get JetStream context
    const js = nc.jetstream();

    console.log('Event consumer connected with JetStream');

    // Look up the durable consumer created during setup
    const consumer = await js.consumers.get('EVENTS', 'event-processor');

    console.log('Starting to consume messages...');

    // fetch() delivers a single batch and then ends the iterator (after
    // max_messages or once `expires` ms have passed). consume() would keep
    // the loop open indefinitely, so the code after it would never run.
    const messages = await consumer.fetch({ max_messages: 10, expires: 5000 });

    for await (const m of messages) {
      try {
        const event = jc.decode(m.data);
        console.log(`Processing event ${m.seq}: ${event.type}`);
        console.log(`Event data: ${JSON.stringify(event, null, 2)}`);

        // Simulate processing
        await new Promise(resolve => setTimeout(resolve, 100));

        // Acknowledge message
        m.ack();
        console.log(`✓ Event ${m.seq} acknowledged`);

      } catch (err) {
        console.error(`Error processing event ${m.seq}:`, err);
        // Negative acknowledgment
        m.nak();
      }
    }
  } finally {
    // Close the connection even when fetching or processing fails.
    await nc.close();
  }
}

/**
 * Runs the JetStream demo in order: setup, a short pause, producer, consumer.
 * Any error from these steps is logged here and not rethrown.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const pause = (ms) => new Promise(resolve => setTimeout(resolve, ms));

  console.log('Starting NATS JetStream Example');

  try {
    await setupJetStream();

    // Short pause after setup before publishing starts.
    await pause(1000);

    await eventProducer();
    await eventConsumer();

    console.log('JetStream example completed');
  } catch (failure) {
    console.error('Error:', failure);
  }
}

main().catch(failure => {
  console.error('Error:', failure);
});