🎯 Exemples recommandés
Collections d'exemples équilibrées, issues de différentes catégories, à explorer
Exemples du système de messagerie NATS
Exemples du système de messagerie NATS incluant pub/sub, request-reply, groupes de file et JetStream pour les applications modernes cloud-native
💻 NATS Hello World - Pub/Sub de Base javascript
🟢 simple
⭐
Exemple simple d'éditeur et d'abonné NATS démontrant le modèle de base publication/abonnement
⏱️ 10 min
🏷️ nats, messaging, beginner
Prerequisites:
NATS server running, Node.js
// NATS Hello World - Basic Pub/Sub Example
// Install: npm install nats
const { connect, StringCodec } = require('nats');
// Create a codec for string messages
const sc = StringCodec();
/**
 * Runs the Hello World demo: connects to the local NATS server, subscribes to
 * `hello.world`, publishes three messages on that subject and then shuts the
 * connection down.
 *
 * @returns {Promise<void>} Resolves once the connection has been drained.
 */
async function main() {
  // Connect to NATS server
  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'nats-hello-world-example'
  });
  console.log('Connected to NATS server');

  // Simple subscriber. The loop promise is kept so it can be awaited after
  // the drain instead of being left floating.
  const sub = nc.subscribe('hello.world');
  const received = (async () => {
    for await (const m of sub) {
      console.log(`Received message: ${sc.decode(m.data)}`);
    }
  })();

  // Wait a moment for subscription to be ready
  await new Promise(resolve => setTimeout(resolve, 100));

  // Publish messages
  console.log('Publishing messages...');
  nc.publish('hello.world', sc.encode('Hello NATS!'));
  nc.publish('hello.world', sc.encode('NATS is awesome!'));
  nc.publish('hello.world', sc.encode('Pub/Sub is easy!'));

  // Wait for messages to be received
  await new Promise(resolve => setTimeout(resolve, 1000));

  // drain() lets pending messages be processed and then closes the
  // connection itself, so a separate close() call is not needed.
  await nc.drain();
  // The subscription iterator ends once the subscription has been drained.
  await received;
  console.log('Connection closed');
}
// Entry point: run the example and exit with a failure status if it rejects.
(async () => {
  try {
    await main();
  } catch (failure) {
    console.error('Error:', failure);
    process.exit(1);
  }
})();
💻 Modèle Requête-Réponse NATS javascript
🟡 intermediate
⭐⭐⭐
Implémenter une communication requête-réponse avec NATS pour messagerie synchrone entre services
⏱️ 15 min
🏷️ nats, request-reply, rpc
Prerequisites:
NATS server running, Node.js, Basic NATS knowledge
// NATS Request-Reply Pattern Example
// Demonstrates synchronous request-response communication
const { connect, StringCodec } = require('nats');
const sc = StringCodec();
// Responder service that answers requests
/**
 * Starts the responder side of the demo: connects as `responder-service`,
 * subscribes to `time.request` and answers every request with a JSON payload
 * carrying the current time.
 *
 * @returns {Promise<object>} The open NATS connection; the caller closes it.
 */
async function responder() {
  const connection = await connect({
    name: 'responder-service',
    servers: 'nats://localhost:4222'
  });
  console.log('Responder connected');

  const requests = connection.subscribe('time.request');

  // Answers requests one at a time for as long as the subscription is open.
  const serve = async () => {
    for await (const msg of requests) {
      const question = sc.decode(msg.data);
      console.log(`Received request: ${question}`);

      // Reply payload: current time plus an echo of the request text.
      const payload = JSON.stringify({
        timestamp: new Date().toISOString(),
        service: 'time-service',
        request: question
      });
      msg.respond(sc.encode(payload));
      console.log('Sent response');
    }
  };

  // Started in the background; the connection is returned immediately.
  serve();

  return connection;
}
// Requester that sends requests and waits for responses
/**
 * Requester side of the demo: connects as `requester-client`, sends a single
 * request on `time.request` with a 2000 ms timeout, logs the parsed JSON
 * reply (or the failure message) and closes its connection.
 *
 * @returns {Promise<void>}
 */
async function requester() {
  const client = await connect({
    name: 'requester-client',
    servers: 'nats://localhost:4222'
  });
  console.log('Requester connected');

  try {
    console.log('Sending time request...');
    const question = sc.encode('What time is it?');
    const reply = await client.request('time.request', question, { timeout: 2000 });

    // The reply body is a JSON document encoded as a string.
    const replyText = sc.decode(reply.data);
    console.log('Response received:', JSON.parse(replyText));
  } catch (failure) {
    console.error('Request failed:', failure.message);
  }

  await client.close();
}
// Main function to coordinate both sides
/**
 * Coordinates both sides of the request-reply demo: starts the responder,
 * pauses 500 ms, runs the requester, then closes the responder connection.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('Starting NATS Request-Reply Example');

  const pause = (ms) => new Promise((done) => setTimeout(done, ms));

  // The responder keeps serving in the background on the returned connection.
  const responderConnection = await responder();

  // Give the responder's subscription time to become active.
  await pause(500);

  await requester();

  await responderConnection.close();
}
// Entry point: run the example and log any error that escapes main().
(async () => {
  try {
    await main();
  } catch (failure) {
    console.error('Error:', failure);
  }
})();
💻 Groupes de File NATS - Équilibrage de Charge javascript
🟡 intermediate
⭐⭐⭐
Implémenter des groupes de file pour équilibrer les messages à travers plusieurs abonnés
⏱️ 20 min
🏷️ nats, queue, load-balancing
Prerequisites:
NATS server running, Node.js, NATS pub/sub basics
// NATS Queue Groups - Load Balancing Example
// Multiple workers share the workload
const { connect, StringCodec } = require('nats');
const sc = StringCodec();
// Worker function that processes tasks
/**
 * Starts one worker: connects as `worker-<workerId>`, joins the `workers`
 * queue group on `tasks.process`, and for each task received simulates work
 * and publishes a completion notice on `tasks.completed`.
 *
 * @param {number|string} workerId - Identifier used in the connection name and logs.
 * @returns {Promise<object>} The open NATS connection; the caller closes it.
 */
async function createWorker(workerId) {
  const connection = await connect({
    name: `worker-${workerId}`,
    servers: 'nats://localhost:4222'
  });
  console.log(`Worker ${workerId} connected`);

  // Subscribers that name the same queue share the subject's messages.
  const queue = connection.subscribe('tasks.process', { queue: 'workers' });

  // Processes a single task message and reports its completion.
  const handle = async (msg) => {
    const job = JSON.parse(sc.decode(msg.data));
    console.log(`Worker ${workerId} processing task: ${job.id}`);

    // Simulated work: a random delay between 500 and 1500 ms.
    const busyMs = Math.random() * 1000 + 500;
    await new Promise((done) => setTimeout(done, busyMs));
    console.log(`Worker ${workerId} completed task: ${job.id}`);

    const report = JSON.stringify({
      taskId: job.id,
      workerId: workerId,
      completedAt: new Date().toISOString(),
      result: `Task ${job.id} processed by worker ${workerId}`
    });
    connection.publish('tasks.completed', sc.encode(report));
  };

  // Background loop: tasks are handled strictly one after another.
  (async () => {
    for await (const msg of queue) {
      await handle(msg);
    }
  })();

  return connection;
}
// Task producer that creates work
/**
 * Produces the workload: connects as `task-producer`, logs completion
 * notices arriving on `tasks.completed`, publishes ten tasks on
 * `tasks.process` 200 ms apart, waits a further 5000 ms and closes.
 *
 * @returns {Promise<void>}
 */
async function taskProducer() {
  const producer = await connect({
    name: 'task-producer',
    servers: 'nats://localhost:4222'
  });
  console.log('Task producer connected');

  const pause = (ms) => new Promise((done) => setTimeout(done, ms));

  // Callback-style subscription; invocations that carry an error are ignored.
  const onCompleted = (err, msg) => {
    if (err) return;
    const { taskId, workerId } = JSON.parse(sc.decode(msg.data));
    console.log(`✓ Task completed: ${taskId} by ${workerId}`);
  };
  producer.subscribe('tasks.completed', { callback: onCompleted });

  // Publish tasks task-1 .. task-10 with a short gap between them.
  let n = 1;
  while (n <= 10) {
    const id = `task-${n}`;
    const payload = JSON.stringify({
      id,
      type: 'processing',
      createdAt: new Date().toISOString(),
      data: `Task data for task ${n}`
    });
    console.log(`Publishing task: ${id}`);
    producer.publish('tasks.process', sc.encode(payload));
    await pause(200);
    n += 1;
  }

  // Leave time for completion notices to arrive before closing.
  await pause(5000);
  await producer.close();
}
// Monitor to observe load balancing
/**
 * Starts the monitor: connects as `monitor`, subscribes to `tasks.*` and logs
 * every task published on `tasks.process`.
 *
 * @returns {Promise<object>} The open NATS connection. The caller closes it;
 *   without this return value the caller's `close()` call would fail on
 *   `undefined`.
 */
async function monitor() {
  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'monitor'
  });
  console.log('Monitor connected');

  // Listen to all task distribution
  const sub = nc.subscribe('tasks.*');
  (async () => {
    for await (const m of sub) {
      // `tasks.*` also matches `tasks.completed`; only dispatches are logged.
      if (m.subject === 'tasks.process') {
        const task = JSON.parse(sc.decode(m.data));
        console.log(`📤 Task dispatched: ${task.id}`);
      }
    }
  })();

  return nc;
}
/**
 * Runs the queue-group demo: starts the monitor and three workers, waits
 * 1000 ms, runs the task producer to completion, then closes the monitor and
 * worker connections.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('Starting NATS Queue Groups Example');

  const monitorNC = await monitor();

  // Workers 1..3 are started one after another.
  const workers = [];
  let workerId = 1;
  while (workerId <= 3) {
    const workerConnection = await createWorker(workerId);
    workers.push(workerConnection);
    workerId += 1;
  }

  // Give the worker subscriptions time to become active.
  await new Promise((done) => setTimeout(done, 1000));

  await taskProducer();

  // Shut down: monitor first, then each worker in start order.
  await monitorNC.close();
  for (let i = 0; i < workers.length; i++) {
    await workers[i].close();
  }
}
// Entry point: run the example and log any error that escapes main().
(async () => {
  try {
    await main();
  } catch (failure) {
    console.error('Error:', failure);
  }
})();
💻 Abonnements avec Caractères Génériques NATS javascript
🟡 intermediate
⭐⭐⭐
Modèles d'abonnement avancés utilisant des caractères génériques pour un routage flexible des messages
⏱️ 15 min
🏷️ nats, wildcards, routing
Prerequisites:
NATS server running, Node.js, Basic NATS knowledge
// NATS Wildcard Subscriptions Example
// Demonstrates single (*) and multi-level (>) wildcards
const { connect, StringCodec } = require('nats');
const sc = StringCodec();
// Service logs collector
/**
 * Starts the log collector: connects as `logs-collector` and opens two
 * subscriptions, `logs.service.*` and `logs.>`, logging every message each
 * one receives.
 *
 * @returns {Promise<object>} The open NATS connection; the caller closes it.
 */
async function logsCollector() {
  const connection = await connect({
    name: 'logs-collector',
    servers: 'nats://localhost:4222'
  });
  console.log('Logs collector connected');

  // `*` stands for exactly one token, so this sees logs.service.auth or
  // logs.service.db but not logs.service.db.error.
  const perService = connection.subscribe('logs.service.*');
  const printServiceLogs = async () => {
    for await (const msg of perService) {
      // Subject layout: logs.service.<service>
      const [, , service] = msg.subject.split('.');
      const text = sc.decode(msg.data);
      console.log(`[SERVICE LOG] ${service}: ${text}`);
    }
  };
  printServiceLogs();

  // `>` stands for one or more trailing tokens, so this sees everything
  // published below `logs.`, at any depth.
  const everything = connection.subscribe('logs.>');
  const printAllLogs = async () => {
    for await (const msg of everything) {
      const text = sc.decode(msg.data);
      console.log(`[ALL LOGS] ${msg.subject}: ${text}`);
    }
  };
  printAllLogs();

  return connection;
}
// API event monitor
/**
 * Starts the API event monitor: connects as `api-monitor`, subscribes to
 * `api.>` and logs the version and resource path of every event.
 *
 * @returns {Promise<object>} The open NATS connection; the caller closes it.
 */
async function apiMonitor() {
  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'api-monitor'
  });
  console.log('API monitor connected');

  // Monitor all API events
  // api.> matches: api.v1.users, api.v2.auth.login, api.v1.orders.create
  const apiSub = nc.subscribe('api.>');
  (async () => {
    for await (const m of apiSub) {
      // Subject layout: api.<version>.<resource...>
      const [, version, ...resourceTokens] = m.subject.split('.');
      const resource = resourceTokens.join('.');
      const eventData = sc.decode(m.data);
      // The version token already carries its "v" prefix (e.g. "v1"), so it
      // is printed as-is; prefixing another "v" would produce "vv1".
      console.log(`[API] ${version} ${resource}: ${eventData}`);
    }
  })();

  return nc;
}
// Metrics collector
/**
 * Starts the metrics collector: connects as `metrics-collector` and logs the
 * JSON metrics received on `metrics.cpu.*` and `metrics.memory.*`.
 *
 * @returns {Promise<object>} The open NATS connection; the caller closes it.
 */
async function metricsCollector() {
  const connection = await connect({
    name: 'metrics-collector',
    servers: 'nats://localhost:4222'
  });
  console.log('Metrics collector connected');

  // Reads a subscription in the background, handing each message's host
  // token (third subject token) and parsed JSON body to `report`.
  const follow = (subscription, report) => {
    (async () => {
      for await (const msg of subscription) {
        const host = msg.subject.split('.')[2];
        const metric = JSON.parse(sc.decode(msg.data));
        report(host, metric);
      }
    })();
  };

  // CPU metrics from any host: metrics.cpu.host1, metrics.cpu.host2, ...
  follow(connection.subscribe('metrics.cpu.*'), (host, metric) => {
    console.log(`[CPU METRIC] ${host}: ${metric.usage}%`);
  });

  // Memory metrics from any host: metrics.memory.host1, ...
  // `*` covers a single token only, so deeper subjects such as
  // metrics.memory.host1.error are not delivered to this subscription.
  follow(connection.subscribe('metrics.memory.*'), (host, metric) => {
    console.log(`[MEMORY METRIC] ${host}: ${metric.used}MB / ${metric.total}MB`);
  });

  return connection;
}
// Event producer that generates various messages
/**
 * Publishes the demo traffic: connects as `event-producer`, sends service
 * logs, API events, metrics and nested logs, waits 2000 ms and closes.
 *
 * @returns {Promise<void>}
 */
async function eventProducer() {
  const producer = await connect({
    name: 'event-producer',
    servers: 'nats://localhost:4222'
  });
  console.log('Event producer connected');

  // Small helpers so each publish below reads as subject + payload.
  const sendText = (subject, text) => producer.publish(subject, sc.encode(text));
  const sendJson = (subject, value) => sendText(subject, JSON.stringify(value));

  console.log('Publishing service logs...');
  sendText('logs.service.auth', 'User authentication successful');
  sendText('logs.service.db', 'Database connection established');
  sendText('logs.service.cache', 'Cache refreshed');

  console.log('Publishing API events...');
  sendText('api.v1.users.create', 'New user created');
  sendText('api.v2.auth.login', 'User logged in');
  sendText('api.v1.orders.process', 'Order processed');

  console.log('Publishing metrics...');
  sendJson('metrics.cpu.host1', { usage: 45.2, timestamp: Date.now() });
  sendJson('metrics.cpu.host2', { usage: 67.8, timestamp: Date.now() });
  sendJson('metrics.memory.host1', { used: 2048, total: 4096 });

  // Subjects with extra tokens below `logs.`.
  console.log('Publishing nested logs...');
  sendText('logs.service.db.error', 'Connection timeout');
  sendText('logs.api.v1.users.warning', 'Rate limit approaching');

  // Leave time for the subscribers to process everything before closing.
  await new Promise((done) => setTimeout(done, 2000));
  await producer.close();
}
// Main function
/**
 * Runs the wildcard demo: starts the three subscribers, waits 1000 ms, runs
 * the event producer, then closes the subscriber connections.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('Starting NATS Wildcard Subscriptions Example');

  // Subscribers are started one after another, in this order.
  const subscribers = [];
  for (const start of [logsCollector, apiMonitor, metricsCollector]) {
    subscribers.push(await start());
  }

  // Give the subscriptions time to become active.
  await new Promise((done) => setTimeout(done, 1000));

  await eventProducer();

  // Close the subscriber connections in the order they were opened.
  for (const connection of subscribers) {
    await connection.close();
  }
  console.log('Wildcard subscriptions example completed');
}
// Entry point: run the example and log any error that escapes main().
(async () => {
  try {
    await main();
  } catch (failure) {
    console.error('Error:', failure);
  }
})();
💻 NATS JetStream - Streaming Persistant javascript
🔴 complex
⭐⭐⭐⭐
Utiliser NATS JetStream pour le streaming de messages persistant avec capacités de durabilité et relecture
⏱️ 25 min
🏷️ nats, jetstream, streaming, persistence
Prerequisites:
NATS server with JetStream enabled, Node.js, Advanced NATS knowledge
// NATS JetStream - Persistent Streaming Example
// Features: Streams, Consumers, Persistent storage
const { connect, StringCodec, JSONCodec } = require('nats');
const sc = StringCodec();
const jc = JSONCodec();
// Setup JetStream with stream and consumer
/**
 * Prepares JetStream for the demo: ensures the `EVENTS` stream (subjects
 * `events.>`) and the durable `event-processor` consumer exist, then closes
 * the setup connection.
 *
 * @returns {Promise<void>}
 * @throws Rethrows any stream/consumer creation error other than the
 *   "already exists" cases handled below.
 */
async function setupJetStream() {
  // JetStream configuration expresses durations in nanoseconds.
  const NANOS_PER_SECOND = 1e9;

  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'jetstream-setup'
  });

  try {
    // Create JetStream context
    const jsm = await nc.jetstreamManager();

    try {
      // Create a stream
      await jsm.streams.add({
        name: 'EVENTS',
        subjects: ['events.>'],
        retention: 'workqueue', // Messages removed after ack
        storage: 'file', // Persistent storage
        replicas: 1,
        max_age: 7 * 24 * 60 * 60 * NANOS_PER_SECOND, // 7 days
      });
      console.log('Stream EVENTS created');
    } catch (err) {
      if (!err.message.includes('stream name already in use')) {
        throw err;
      }
      console.log('Stream EVENTS already exists');
    }

    try {
      // Create a durable consumer
      await jsm.consumers.add('EVENTS', {
        name: 'event-processor',
        durable_name: 'event-processor',
        ack_policy: 'explicit',
        ack_wait: 30 * NANOS_PER_SECOND, // 30 seconds
        max_deliver: 3,
        filter_subject: 'events.>',
      });
      console.log('Consumer event-processor created');
    } catch (err) {
      if (!err.message.includes('consumer already exists')) {
        throw err;
      }
      console.log('Consumer event-processor already exists');
    }
  } finally {
    // Close even when setup fails, so the connection is not left open.
    await nc.close();
  }
}
// Event producer using JetStream
/**
 * Publishes the sample events through JetStream: connects as
 * `event-producer`, publishes each event on `events.<type>` and logs the
 * stream name and sequence number from every publish acknowledgement.
 *
 * @returns {Promise<void>}
 * @throws Rethrows a failed publish after closing the connection.
 */
async function eventProducer() {
  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'event-producer'
  });

  try {
    // Get JetStream context
    const js = nc.jetstream();
    console.log('Event producer connected with JetStream');

    // Publish events to stream
    const events = [
      { type: 'user.created', userId: 'user123', email: 'user@example.com' },
      { type: 'order.placed', orderId: 'order456', amount: 99.99 },
      { type: 'payment.processed', paymentId: 'pay789', status: 'success' },
      { type: 'notification.sent', userId: 'user123', channel: 'email' },
    ];

    for (const [i, template] of events.entries()) {
      const event = {
        id: `event-${Date.now()}-${i}`,
        timestamp: new Date().toISOString(),
        ...template
      };
      console.log(`Publishing event: ${event.type}`);

      // Publish to JetStream; the awaited result is the publish acknowledgement.
      const pa = await js.publish(
        `events.${event.type}`,
        jc.encode(event)
      );
      console.log(`Event published to stream: ${pa.stream}, seq: ${pa.seq}`);
    }
  } finally {
    // Close even when a publish fails, so the connection is not left open.
    await nc.close();
  }
}
// Event consumer using JetStream
/**
 * Consumes events through the durable `event-processor` consumer on the
 * `EVENTS` stream: each message is decoded, logged and acknowledged (or
 * negatively acknowledged when processing throws), then the connection is
 * closed.
 *
 * @returns {Promise<void>}
 */
async function eventConsumer() {
  const nc = await connect({
    servers: 'nats://localhost:4222',
    name: 'event-consumer'
  });

  try {
    // Get JetStream context
    const js = nc.jetstream();
    console.log('Event consumer connected with JetStream');

    // Consume messages using the consumer
    const consumer = await js.consumers.get('EVENTS', 'event-processor');
    console.log('Starting to consume messages...');

    // fetch() is a bounded pull: its iterator ends once 10 messages have
    // arrived or the 5000 ms expiry elapses. consume() would keep iterating
    // indefinitely, so the code after the loop would never run.
    const messages = await consumer.fetch({ max_messages: 10, expires: 5000 });
    for await (const m of messages) {
      try {
        const event = jc.decode(m.data);
        console.log(`Processing event ${m.seq}: ${event.type}`);
        console.log(`Event data: ${JSON.stringify(event, null, 2)}`);

        // Simulate processing
        await new Promise(resolve => setTimeout(resolve, 100));

        // Acknowledge message
        await m.ack();
        console.log(`✓ Event ${m.seq} acknowledged`);
      } catch (err) {
        console.error(`Error processing event ${m.seq}:`, err);
        // Negative acknowledgment
        await m.nak();
      }
    }
  } finally {
    // Close even when consuming fails, so the connection is not left open.
    await nc.close();
  }
}
// Main function
/**
 * Runs the JetStream demo end to end: setup, a 1000 ms pause, the producer,
 * then the consumer. Any error from a step is logged here and not rethrown.
 *
 * @returns {Promise<void>}
 */
async function main() {
  console.log('Starting NATS JetStream Example');

  // Pause between setup and first use.
  const settle = () => new Promise((done) => setTimeout(done, 1000));

  try {
    // Steps run strictly in sequence; the first failure skips the rest.
    for (const step of [setupJetStream, settle, eventProducer, eventConsumer]) {
      await step();
    }
    console.log('JetStream example completed');
  } catch (failure) {
    console.error('Error:', failure);
  }
}
// Entry point: run the example and log any error that escapes main().
(async () => {
  try {
    await main();
  } catch (failure) {
    console.error('Error:', failure);
  }
})();