Server-Sent Events (SSE)

Server-Sent Events examples for real-time unidirectional communication from server to client, including live updates, notifications, and streaming data

💻 Server-Sent Events Basics javascript

🟢 simple ⭐⭐

Introduction to SSE with basic server implementation and client consumption patterns

⏱️ 15 min 🏷️ sse, real-time, streaming, notifications
Prerequisites: Node.js, Express.js, HTTP basics
// Server-Sent Events (SSE) Basic Examples
// Demonstrates real-time server-to-client communication

const express = require('express');
const cors = require('cors');
const app = express();
const PORT = 3000;

// Middleware
app.use(cors());
app.use(express.json());

// Store active SSE connections
const connections = new Set();

// 1. Basic SSE Endpoint
app.get('/events', (req, res) => {
  console.log('New SSE connection established');

  // Set SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'Cache-Control'
  });

  // Send initial connection message
  res.write('data: {"type": "connected", "message": "Connected to SSE stream"}\n\n');

  // Add connection to active connections
  connections.add(res);

  // Handle client disconnect
  req.on('close', () => {
    console.log('SSE connection closed');
    connections.delete(res);
  });

  // Send keep-alive ping every 30 seconds
  const pingInterval = setInterval(() => {
    res.write(': keep-alive\n\n');
  }, 30000);

  req.on('close', () => {
    clearInterval(pingInterval);
  });
});

// 2. Send event to all connected clients
function broadcastEvent(eventType, data) {
  const message = JSON.stringify({
    type: eventType,
    data: data,
    timestamp: new Date().toISOString()
  });

  connections.forEach(connection => {
    try {
      connection.write(`event: ${eventType}\ndata: ${message}\n\n`);
    } catch (error) {
      console.error('Error sending event:', error);
      connections.delete(connection);
    }
  });
}

// 3. Send regular time updates
function startTimeUpdates() {
  setInterval(() => {
    broadcastEvent('time', {
      time: new Date().toLocaleTimeString(),
      date: new Date().toLocaleDateString()
    });
  }, 1000);
}

// 4. Send random notifications
function startNotifications() {
  const notifications = [
    'New order received!',
    'Server backup completed',
    'New user registered',
    'System update available',
    'Security scan finished'
  ];

  setInterval(() => {
    const notification = notifications[Math.floor(Math.random() * notifications.length)];
    broadcastEvent('notification', {
      message: notification,
      level: Math.random() > 0.5 ? 'info' : 'success'
    });
  }, 5000);
}

// 5. REST API endpoints to trigger events
app.post('/send-message', (req, res) => {
  const { message, type = 'custom' } = req.body;
  broadcastEvent(type, { message });
  res.json({ success: true, message: 'Event broadcasted' });
});

app.post('/trigger-alert', (req, res) => {
  const { level = 'warning', message } = req.body;
  broadcastEvent('alert', { level, message });
  res.json({ success: true, alert: 'Alert sent' });
});

// 6. Stock price simulation
function startStockSimulation() {
  const stocks = {
    'AAPL': { price: 150, change: 0 },
    'GOOGL': { price: 2800, change: 0 },
    'MSFT': { price: 350, change: 0 },
    'AMZN': { price: 3200, change: 0 }
  };

  setInterval(() => {
    Object.keys(stocks).forEach(symbol => {
      const stock = stocks[symbol];
      const oldPrice = stock.price;
      const change = (Math.random() - 0.5) * 10;
      stock.price = Math.max(0, oldPrice + change);
      stock.change = stock.price - oldPrice;

      broadcastEvent('stock-update', {
        symbol: symbol,
        price: stock.price.toFixed(2),
        change: stock.change.toFixed(2),
        changePercent: ((stock.change / oldPrice) * 100).toFixed(2)
      });
    });
  }, 2000);
}

// HTML Client Example
app.get('/', (req, res) => {
  res.send(`
<!DOCTYPE html>
<html>
<head>
    <title>SSE Demo</title>
    <style>
        body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
        .event { margin: 10px 0; padding: 10px; border-radius: 5px; }
        .connected { background: #e8f5e8; }
        .time { background: #e3f2fd; }
        .notification { background: #fff3e0; }
        .alert { background: #ffebee; }
        .stock-update { background: #f3e5f5; }
        .custom { background: #e0f2f1; }
        .controls { margin: 20px 0; }
        button { margin: 5px; padding: 10px 15px; cursor: pointer; }
        input { margin: 5px; padding: 8px; }
        .stocks { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 10px; }
        .stock { padding: 10px; border: 1px solid #ddd; border-radius: 5px; }
        .positive { color: green; }
        .negative { color: red; }
    </style>
</head>
<body>
    <h1>Server-Sent Events Demo</h1>

    <div id="status" class="event">Connecting...</div>

    <div class="controls">
        <input type="text" id="messageInput" placeholder="Enter message">
        <button onclick="sendMessage()">Send Message</button>
        <button onclick="triggerAlert()">Trigger Alert</button>
    </div>

    <div class="stocks" id="stocks"></div>

    <h3>Event Stream:</h3>
    <div id="events"></div>

    <script>
        const eventSource = new EventSource('/events');
        const eventsDiv = document.getElementById('events');
        const statusDiv = document.getElementById('status');
        const stocksDiv = document.getElementById('stocks');

        eventSource.onopen = function(event) {
            statusDiv.innerHTML = '<div class="event connected">✅ Connected to SSE stream</div>';
            addEvent('connected', 'Connection established');
        };

        eventSource.onmessage = function(event) {
            const data = JSON.parse(event.data);
            addEvent(data.type, data.data);
        };

        eventSource.addEventListener('time', function(event) {
            const data = JSON.parse(event.data);
            document.title = `SSE Demo - ${data.data.time}`;
        });

        eventSource.addEventListener('notification', function(event) {
            const data = JSON.parse(event.data);
            showNotification(data.data.message, data.data.level);
        });

        eventSource.addEventListener('alert', function(event) {
            const data = JSON.parse(event.data);
            showAlert(data.data.message, data.data.level);
        });

        eventSource.addEventListener('stock-update', function(event) {
            const data = JSON.parse(event.data);
            updateStock(data.data);
        });

        function addEvent(type, data) {
            const eventDiv = document.createElement('div');
            eventDiv.className = `event ${type}`;
            eventDiv.innerHTML = `
                <strong>${new Date().toLocaleTimeString()} - ${type.toUpperCase()}:</strong>
                ${typeof data === 'object' ? JSON.stringify(data, null, 2) : data}
            `;
            eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);

            // Keep only last 50 events
            while (eventsDiv.children.length > 50) {
                eventsDiv.removeChild(eventsDiv.lastChild);
            }
        }

        function showNotification(message, level) {
            if (Notification.permission === 'granted') {
                new Notification('SSE Notification', {
                    body: message,
                    icon: '/icon.png'
                });
            }
        }

        function showAlert(message, level) {
            alert(`Alert (${level}): ${message}`);
        }

        function updateStock(stock) {
            const stockElement = document.getElementById(`stock-${stock.symbol}`);

            if (!stockElement) {
                const newStockElement = document.createElement('div');
                newStockElement.id = `stock-${stock.symbol}`;
                newStockElement.className = 'stock';
                stocksDiv.appendChild(newStockElement);
            }

            const element = document.getElementById(`stock-${stock.symbol}`);
            const changeClass = stock.change >= 0 ? 'positive' : 'negative';
            const changeSymbol = stock.change >= 0 ? '+' : '';

            element.innerHTML = `
                <strong>${stock.symbol}</strong><br>
                $${stock.price}<br>
                <span class="${changeClass}">${changeSymbol}${stock.change} (${stock.changePercent}%)</span>
            `;
        }

        function sendMessage() {
            const input = document.getElementById('messageInput');
            const message = input.value.trim();

            if (message) {
                fetch('/send-message', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({ message, type: 'custom' })
                });
                input.value = '';
            }
        }

        function triggerAlert() {
            fetch('/trigger-alert', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({
                    level: 'warning',
                    message: 'This is a test alert!'
                })
            });
        }

        // Request notification permission
        if ('Notification' in window && Notification.permission === 'default') {
            Notification.requestPermission();
        }

        // Handle connection errors
        eventSource.onerror = function(event) {
            statusDiv.innerHTML = '<div class="event alert">❌ Connection error. Retrying...</div>';
        };

        // Handle Enter key in message input
        document.getElementById('messageInput').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
    </script>
</body>
</html>
  `);
});

// Start background processes
startTimeUpdates();
startNotifications();
startStockSimulation();

app.listen(PORT, () => {
  console.log(`SSE Server running on http://localhost:${PORT}`);
});

// Graceful shutdown
process.on('SIGINT', () => {
  console.log('\nShutting down SSE server...');
  connections.forEach(connection => {
    connection.end();
  });
  process.exit(0);
});

module.exports = { app, broadcastEvent };

💻 Advanced SSE Patterns javascript

🟡 intermediate ⭐⭐⭐⭐

Advanced Server-Sent Events patterns including authentication, room-based streaming, backpressure handling, and reconnection strategies

⏱️ 30 min 🏷️ sse, real-time, authentication, rooms, enterprise
Prerequisites: Node.js, Express.js, JWT, Redis, Advanced JavaScript
// Advanced Server-Sent Events Patterns
// Enterprise-grade SSE implementation with authentication, rooms, and advanced features

const express = require('express');
const jwt = require('jsonwebtoken');
const Redis = require('ioredis');
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
const compression = require('compression');
const app = express();
const PORT = 3000;

// Configuration
const JWT_SECRET = process.env.JWT_SECRET || 'your-secret-key';
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';

// Redis client for distributed systems
const redis = new Redis(REDIS_URL);

// Middleware
app.use(helmet());
app.use(compression());
app.use(express.json());

// Rate limiting
const limiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100, // limit each IP to 100 requests per windowMs
  message: 'Too many requests from this IP'
});
app.use('/events', limiter);

// Connection management
class ConnectionManager {
  constructor() {
    this.connections = new Map(); // userId -> Set of connections
    this.rooms = new Map(); // roomId -> Set of userIds
    this.connectionMetadata = new Map(); // connectionId -> metadata
  }

  addConnection(connectionId, res, userId, metadata = {}) {
    const connection = {
      id: connectionId,
      response: res,
      userId: userId,
      rooms: new Set(),
      lastActivity: Date.now(),
      metadata: metadata,
      isAlive: true
    };

    // Store connection
    if (!this.connections.has(userId)) {
      this.connections.set(userId, new Set());
    }
    this.connections.get(userId).add(connection);
    this.connectionMetadata.set(connectionId, connection);

    // Store in Redis for distributed systems
    redis.hset(`connections:${userId}`, connectionId, JSON.stringify({
      connectionId,
      rooms: Array.from(connection.rooms),
      lastActivity: connection.lastActivity,
      metadata: connection.metadata
    }));

    console.log(`Connection added: ${connectionId} for user ${userId}`);
    return connection;
  }

  removeConnection(connectionId) {
    const connection = this.connectionMetadata.get(connectionId);
    if (!connection) return;

    // Remove from user connections
    const userConnections = this.connections.get(connection.userId);
    if (userConnections) {
      userConnections.delete(connection);
      if (userConnections.size === 0) {
        this.connections.delete(connection.userId);
      }
    }

    // Remove from rooms
    connection.rooms.forEach(roomId => {
      this.leaveRoom(connectionId, roomId);
    });

    // Remove from metadata and Redis
    this.connectionMetadata.delete(connectionId);
    redis.hdel(`connections:${connection.userId}`, connectionId);

    console.log(`Connection removed: ${connectionId}`);
  }

  joinRoom(connectionId, roomId) {
    const connection = this.connectionMetadata.get(connectionId);
    if (!connection) return false;

    // Add connection to room
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, new Set());
    }
    this.rooms.get(roomId).add(connectionId);
    connection.rooms.add(roomId);

    // Update Redis
    redis.sadd(`room:${roomId}`, connection.userId);

    console.log(`Connection ${connectionId} joined room ${roomId}`);
    return true;
  }

  leaveRoom(connectionId, roomId) {
    const connection = this.connectionMetadata.get(connectionId);
    if (!connection) return false;

    // Remove from room
    if (this.rooms.has(roomId)) {
      this.rooms.get(roomId).delete(connectionId);
      if (this.rooms.get(roomId).size === 0) {
        this.rooms.delete(roomId);
      }
    }
    connection.rooms.delete(roomId);

    // Update Redis
    redis.srem(`room:${roomId}`, connection.userId);

    console.log(`Connection ${connectionId} left room ${roomId}`);
    return true;
  }

  getConnectionsByUser(userId) {
    return this.connections.get(userId) || new Set();
  }

  getConnectionsByRoom(roomId) {
    const roomConnections = this.rooms.get(roomId) || new Set();
    return Array.from(roomConnections).map(connId =>
      this.connectionMetadata.get(connId)
    ).filter(Boolean);
  }

  getAllConnections() {
    return Array.from(this.connectionMetadata.values());
  }

  updateActivity(connectionId) {
    const connection = this.connectionMetadata.get(connectionId);
    if (connection) {
      connection.lastActivity = Date.now();
    }
  }

  // Clean up inactive connections
  cleanup() {
    const now = Date.now();
    const timeout = 5 * 60 * 1000; // 5 minutes

    for (const [connectionId, connection] of this.connectionMetadata) {
      if (now - connection.lastActivity > timeout) {
        console.log(`Cleaning up inactive connection: ${connectionId}`);
        connection.response.end();
        this.removeConnection(connectionId);
      }
    }
  }
}

const connectionManager = new ConnectionManager();

// Clean up every 5 minutes
setInterval(() => {
  connectionManager.cleanup();
}, 5 * 60 * 1000);

// JWT Authentication middleware
function authenticateToken(req, res, next) {
  const authHeader = req.headers['authorization'];
  const token = authHeader && authHeader.split(' ')[1];

  if (!token) {
    return res.status(401).json({ error: 'Access token required' });
  }

  jwt.verify(token, JWT_SECRET, (err, user) => {
    if (err) {
      return res.status(403).json({ error: 'Invalid token' });
    }
    req.user = user;
    next();
  });
}

// Generate JWT token endpoint
app.post('/auth/login', (req, res) => {
  const { username, password } = req.body;

  // Simple authentication (replace with real auth logic)
  if (username && password) {
    const user = {
      id: username,
      username: username,
      role: 'user'
    };

    const token = jwt.sign(user, JWT_SECRET, { expiresIn: '24h' });
    res.json({ token, user });
  } else {
    res.status(401).json({ error: 'Invalid credentials' });
  }
});

// 1. Advanced SSE Endpoint with Authentication
app.get('/events', authenticateToken, (req, res) => {
  const connectionId = `${req.user.id}_${Date.now()}`;
  const userId = req.user.id;

  // Set SSE headers with additional security
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache, no-store, must-revalidate',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // Disable nginx buffering
    'Access-Control-Allow-Origin': req.headers.origin || '*',
    'Access-Control-Allow-Credentials': 'true'
  });

  // Add connection
  const connection = connectionManager.addConnection(
    connectionId,
    res,
    userId,
    {
      userAgent: req.headers['user-agent'],
      ip: req.ip,
      query: req.query
    }
  );

  // Send connection event with metadata
  const connectionEvent = {
    type: 'connected',
    data: {
      connectionId: connectionId,
      userId: userId,
      timestamp: new Date().toISOString(),
      serverTime: Date.now()
    }
  };
  res.write(`event: connected\ndata: ${JSON.stringify(connectionEvent)}\n\n`);

  // Join default rooms based on user role
  if (req.user.role === 'admin') {
    connectionManager.joinRoom(connectionId, 'admin');
  }
  connectionManager.joinRoom(connectionId, `user_${userId}`);

  // Send initial data
  const initialData = {
    type: 'initial-data',
    data: {
      message: 'Welcome to the SSE stream',
      userId: userId,
      availableRooms: ['notifications', 'updates', 'alerts']
    }
  };
  res.write(`event: initial-data\ndata: ${JSON.stringify(initialData)}\n\n`);

  // Handle client disconnect
  req.on('close', () => {
    console.log(`SSE connection closed: ${connectionId}`);
    connectionManager.removeConnection(connectionId);
  });

  req.on('aborted', () => {
    console.log(`SSE connection aborted: ${connectionId}`);
    connectionManager.removeConnection(connectionId);
  });

  // Setup keep-alive with backpressure handling
  let pingInterval;
  const startPing = () => {
    pingInterval = setInterval(() => {
      try {
        // Check if connection is still writable
        if (res.destroyed || res.finished) {
          clearInterval(pingInterval);
          return;
        }

        res.write(': keep-alive\n\n');
        connectionManager.updateActivity(connectionId);
      } catch (error) {
        console.error(`Ping error for ${connectionId}:`, error);
        clearInterval(pingInterval);
        connectionManager.removeConnection(connectionId);
      }
    }, 30000);
  };

  startPing();

  // Clean up ping interval on disconnect
  req.on('close', () => {
    if (pingInterval) clearInterval(pingInterval);
  });
});

// 2. Room management endpoints
app.post('/rooms/:roomId/join', authenticateToken, (req, res) => {
  const { roomId } = req.params;
  const userId = req.user.id;

  // Get user's connections
  const userConnections = connectionManager.getConnectionsByUser(userId);

  // Join all user connections to the room
  let joinedCount = 0;
  userConnections.forEach(connection => {
    if (connectionManager.joinRoom(connection.id, roomId)) {
      joinedCount++;

      // Send confirmation to this connection
      const joinEvent = {
        type: 'room-joined',
        data: {
          roomId: roomId,
          userId: userId,
          timestamp: new Date().toISOString()
        }
      };
      connection.response.write(`event: room-joined\ndata: ${JSON.stringify(joinEvent)}\n\n`);
    }
  });

  res.json({
    success: true,
    roomId: roomId,
    connectionsJoined: joinedCount
  });
});

app.post('/rooms/:roomId/leave', authenticateToken, (req, res) => {
  const { roomId } = req.params;
  const userId = req.user.id;

  const userConnections = connectionManager.getConnectionsByUser(userId);
  let leftCount = 0;

  userConnections.forEach(connection => {
    if (connectionManager.leaveRoom(connection.id, roomId)) {
      leftCount++;

      const leaveEvent = {
        type: 'room-left',
        data: {
          roomId: roomId,
          userId: userId,
          timestamp: new Date().toISOString()
        }
      };
      connection.response.write(`event: room-left\ndata: ${JSON.stringify(leaveEvent)}\n\n`);
    }
  });

  res.json({
    success: true,
    roomId: roomId,
    connectionsLeft: leftCount
  });
});

// 3. Advanced broadcasting with filtering
function broadcastEvent(eventType, data, options = {}) {
  const {
    roomId = null,
    userId = null,
    connectionId = null,
    metadata = {},
    priority = 'normal'
  } = options;

  const message = {
    type: eventType,
    data: data,
    timestamp: new Date().toISOString(),
    serverTime: Date.now(),
    id: generateEventId(),
    priority: priority
  };

  let targetConnections = [];

  if (connectionId) {
    // Send to specific connection
    const conn = connectionManager.connectionMetadata.get(connectionId);
    if (conn) targetConnections.push(conn);
  } else if (userId) {
    // Send to all connections for a user
    targetConnections = Array.from(connectionManager.getConnectionsByUser(userId));
  } else if (roomId) {
    // Send to all connections in a room
    targetConnections = connectionManager.getConnectionsByRoom(roomId);
  } else {
    // Send to all connections
    targetConnections = connectionManager.getAllConnections();
  }

  // Apply metadata filters
  if (metadata.filters) {
    targetConnections = targetConnections.filter(conn => {
      return Object.entries(metadata.filters).every(([key, value]) => {
        return conn.metadata[key] === value;
      });
    });
  }

  // Send event
  let sentCount = 0;
  targetConnections.forEach(connection => {
    try {
      const eventData = {
        ...message,
        connectionId: connection.id,
        roomId: roomId || null,
        userId: connection.userId
      };

      // Add custom event name if provided
      const eventName = metadata.eventName || eventType;

      connection.response.write(`event: ${eventName}\ndata: ${JSON.stringify(eventData)}\n\n`);
      connectionManager.updateActivity(connection.id);
      sentCount++;
    } catch (error) {
      console.error(`Error broadcasting to ${connection.id}:`, error);
      connectionManager.removeConnection(connection.id);
    }
  });

  // Store event in Redis for replay/debug
  redis.lpush('events:history', JSON.stringify({
    ...message,
    sentCount: sentCount,
    targetCount: targetConnections.length
  }));
  redis.ltrim('events:history', 0, 999); // Keep last 1000 events

  console.log(`Broadcasted '${eventType}' to ${sentCount} connections`);
  return sentCount;
}

// 4. Event broadcasting endpoints
app.post('/broadcast', authenticateToken, (req, res) => {
  const { eventType, data, options = {} } = req.body;

  const sentCount = broadcastEvent(eventType, data, {
    ...options,
    metadata: {
      ...options.metadata,
      senderId: req.user.id,
      senderRole: req.user.role
    }
  });

  res.json({
    success: true,
    eventType: eventType,
    sentTo: sentCount
  });
});

app.post('/broadcast-room/:roomId', authenticateToken, (req, res) => {
  const { roomId } = req.params;
  const { eventType, data } = req.body;

  const sentCount = broadcastEvent(eventType, data, {
    roomId: roomId,
    metadata: {
      senderId: req.user.id,
      roomId: roomId
    }
  });

  res.json({
    success: true,
    roomId: roomId,
    eventType: eventType,
    sentTo: sentCount
  });
});

// 5. Event replay for new connections
app.get('/events/replay/:lastEventId', authenticateToken, (req, res) => {
  const { lastEventId } = req.params;
  const userId = req.user.id;

  // Get events from Redis (simplified - in production use a proper event store)
  redis.lrange('events:history', 0, 99, (err, events) => {
    if (err) {
      return res.status(500).json({ error: 'Failed to fetch events' });
    }

    const parsedEvents = events.map(e => JSON.parse(e));
    const filteredEvents = parsedEvents.filter(event => {
      // Filter events relevant to this user or public events
      return event.data.userId === userId || !event.data.userId;
    });

    res.json({
      events: filteredEvents,
      lastEventId: lastEventId
    });
  });
});

// 6. Connection statistics
app.get('/stats', authenticateToken, (req, res) => {
  const stats = {
    totalConnections: connectionManager.getAllConnections().length,
    totalRooms: connectionManager.rooms.size,
    activeUsers: connectionManager.connections.size,
    roomStats: {}
  };

  // Add room-specific stats
  for (const [roomId, connections] of connectionManager.rooms) {
    stats.roomStats[roomId] = connections.size;
  }

  res.json(stats);
});

// 7. Live data simulations
function startLiveSimulations() {
  // User activity feed
  setInterval(() => {
    const activities = [
      'User logged in',
      'Post published',
      'Comment added',
      'File uploaded',
      'Task completed'
    ];

    const activity = activities[Math.floor(Math.random() * activities.length)];
    broadcastEvent('user-activity', {
      activity: activity,
      userId: `user_${Math.floor(Math.random() * 100)}`,
      timestamp: new Date().toISOString()
    });
  }, 3000);

  // System metrics
  setInterval(() => {
    const metrics = {
      cpu: Math.random() * 100,
      memory: Math.random() * 100,
      disk: Math.random() * 100,
      network: Math.random() * 1000
    };

    broadcastEvent('system-metrics', metrics, {
      roomId: 'admin'
    });
  }, 5000);

  // Live chat simulation
  setInterval(() => {
    const messages = [
      'Hello everyone!',
      'How are you doing?',
      'Great weather today',
      'Anyone working on the new feature?',
      'Coffee break time!'
    ];

    const message = messages[Math.floor(Math.random() * messages.length)];
    const userId = `user_${Math.floor(Math.random() * 10)}`;

    broadcastEvent('chat-message', {
      message: message,
      userId: userId,
      username: `User_${userId}`,
      timestamp: new Date().toISOString()
    }, {
      roomId: 'chat'
    });
  }, 4000);
}

// Utility function to generate unique event IDs
function generateEventId() {
  return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}

// Start background simulations
startLiveSimulations();

app.listen(PORT, () => {
  console.log(`Advanced SSE Server running on http://localhost:${PORT}`);
});

// Graceful shutdown
process.on('SIGINT', async () => {
  console.log('\nShutting down Advanced SSE server...');

  // Close all connections
  connectionManager.getAllConnections().forEach(connection => {
    try {
      connection.response.write('event: shutdown\ndata: {"message": "Server shutting down"}\n\n');
      connection.response.end();
    } catch (error) {
      // Ignore errors during shutdown
    }
  });

  // Close Redis connection
  redis.quit();

  setTimeout(() => {
    process.exit(0);
  }, 1000);
});

module.exports = {
  app,
  connectionManager,
  broadcastEvent
};

💻 Python Flask SSE Implementation python

🟡 intermediate ⭐⭐⭐

Server-Sent Events implementation using Python Flask with Redis pub/sub for scalable real-time applications

⏱️ 25 min 🏷️ sse, python, flask, real-time, redis
Prerequisites: Python, Flask, Redis, JWT, Threading
# Server-Sent Events with Python Flask
# pip install flask flask-redis redis gevent

from flask import Flask, Response, jsonify, request, stream_template
from flask_redis import FlaskRedis
import redis as redis_py
import json
import time
import threading
import uuid
from datetime import datetime
import queue
from functools import wraps
import jwt
from gevent import monkey
monkey.patch_all()

app = Flask(__name__)

# Configuration
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['REDIS_URL'] = 'redis://localhost:6379/0'
app.config['JWT_SECRET_KEY'] = 'jwt-secret-string'

# Redis connections
redis_client = FlaskRedis(app)
pubsub = redis_py.Redis.from_url('redis://localhost:6379/0').pubsub()

# In-memory connection storage (use Redis in production)
connections = {}
rooms = {}

class Connection:
    def __init__(self, connection_id, user_id, response_queue):
        self.id = connection_id
        self.user_id = user_id
        self.queue = queue.Queue()
        self.created_at = datetime.now()
        self.last_activity = datetime.now()
        self.rooms = set()
        self.is_active = True

    def send_event(self, event_type, data, event_id=None):
        """Send event to this connection"""
        event = {
            'id': event_id or str(uuid.uuid4()),
            'type': event_type,
            'data': data,
            'timestamp': datetime.now().isoformat()
        }
        self.queue.put(event)
        self.last_activity = datetime.now()

    def join_room(self, room_id):
        """Join a room"""
        self.rooms.add(room_id)
        if room_id not in rooms:
            rooms[room_id] = set()
        rooms[room_id].add(self.id)

    def leave_room(self, room_id):
        """Leave a room"""
        self.rooms.discard(room_id)
        if room_id in rooms:
            rooms[room_id].discard(self.id)
            if not rooms[room_id]:
                del rooms[room_id]

# Authentication decorator
def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None

        if 'Authorization' in request.headers:
            auth_header = request.headers['Authorization']
            try:
                token = auth_header.split(" ")[1]
            except IndexError:
                return jsonify({'message': 'Bearer token malformed'}), 401

        if not token:
            return jsonify({'message': 'Token is missing!'}), 401

        try:
            data = jwt.decode(token, app.config['JWT_SECRET_KEY'], algorithms=["HS256"])
            current_user_id = data['user_id']
        except Exception as e:
            return jsonify({'message': 'Token is invalid!'}), 401

        return f(current_user_id, *args, **kwargs)

    return decorated

# 1. Authentication endpoints
@app.route('/auth/login', methods=['POST'])
def login():
    data = request.get_json()
    username = data.get('username')
    password = data.get('password')

    # Simple authentication (replace with real auth logic)
    if username and password:
        token = jwt.encode({
            'user_id': username,
            'exp': datetime.utcnow() + timedelta(hours=24)
        }, app.config['JWT_SECRET_KEY'], algorithm="HS256")

        return jsonify({
            'token': token,
            'user': {'id': username, 'username': username}
        })

    return jsonify({'message': 'Invalid credentials'}), 401

# 2. Main SSE endpoint
@app.route('/events')
@token_required
def events(current_user_id):
    connection_id = str(uuid.uuid4())

    # Create new connection
    connection = Connection(connection_id, current_user_id, queue.Queue())
    connections[connection_id] = connection

    # Join user-specific room
    connection.join_room(f'user_{current_user_id}')

    def generate():
        try:
            # Send initial connection event
            yield f"event: connected\n"
            yield f"data: {json.dumps({{'type': 'connected', 'connection_id': connection_id, 'user_id': current_user_id}})}\n\n"

            # Subscribe to Redis channels
            pubsub.subscribe([
                f'user_{current_user_id}',
                'global',
                'notifications'
            ])

            # Listen for Redis messages
            while connection.is_active:
                # Check for Redis messages
                message = pubsub.get_message(timeout=0.1)
                if message and message['type'] == 'message':
                    try:
                        event_data = json.loads(message['data'].decode('utf-8'))
                        event_type = event_data.get('type', 'message')

                        # Send event to client
                        yield f"event: {event_type}\n"
                        yield f"data: {json.dumps(event_data)}\n\n"

                    except (json.JSONDecodeError, UnicodeDecodeError) as e:
                        print(f"Error parsing Redis message: {e}")

                # Check for queued events
                try:
                    while not connection.queue.empty():
                        event = connection.queue.get_nowait()
                        yield f"event: {event['type']}\n"
                        yield f"data: {json.dumps(event)}\n\n"
                except queue.Empty:
                    pass

                # Send keep-alive ping
                yield ": keep-alive\n\n"
                time.sleep(1)

        except GeneratorExit:
            print(f"Client disconnected: {connection_id}")
        finally:
            # Cleanup
            cleanup_connection(connection_id)
            pubsub.unsubscribe()

    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Headers': 'Cache-Control'
        }
    )

def cleanup_connection(connection_id):
    """Clean up connection when disconnected"""
    if connection_id in connections:
        connection = connections[connection_id]

        # Leave all rooms
        for room_id in list(connection.rooms):
            connection.leave_room(room_id)

        # Remove from connections
        del connections[connection_id]

        print(f"Cleaned up connection: {connection_id}")

# 3. Room management
@app.route('/rooms/<room_id>/join', methods=['POST'])
@token_required
def join_room(current_user_id, room_id):
    connection_id = request.headers.get('X-Connection-ID')

    if connection_id in connections:
        connection = connections[connection_id]
        connection.join_room(room_id)

        # Subscribe to Redis room channel
        pubsub.subscribe(room_id)

        # Notify room members
        broadcast_to_room(room_id, 'user_joined', {
            'user_id': current_user_id,
            'room_id': room_id
        }, exclude_connection=connection_id)

        return jsonify({'message': f'Joined room {room_id}'})

    return jsonify({'message': 'Connection not found'}), 404

@app.route('/rooms/<room_id>/leave', methods=['POST'])
@token_required
def leave_room(current_user_id, room_id):
    connection_id = request.headers.get('X-Connection-ID')

    if connection_id in connections:
        connection = connections[connection_id]
        connection.leave_room(room_id)

        # Unsubscribe from Redis room channel
        pubsub.unsubscribe(room_id)

        # Notify room members
        broadcast_to_room(room_id, 'user_left', {
            'user_id': current_user_id,
            'room_id': room_id
        }, exclude_connection=connection_id)

        return jsonify({'message': f'Left room {room_id}'})

    return jsonify({'message': 'Connection not found'}), 404

# 4. Broadcasting functions
def broadcast_to_user(user_id, event_type, data):
    """Broadcast event to specific user"""
    room_id = f'user_{user_id}'
    event_data = {
        'type': event_type,
        'data': data,
        'timestamp': datetime.now().isoformat(),
        'target_user': user_id
    }

    # Publish to Redis
    redis_client.publish(room_id, json.dumps(event_data))

def broadcast_to_room(room_id, event_type, data, exclude_connection=None):
    """Broadcast event to room members"""
    event_data = {
        'type': event_type,
        'data': data,
        'timestamp': datetime.now().isoformat(),
        'room_id': room_id
    }

    # Send to in-memory connections
    if room_id in rooms:
        for conn_id in rooms[room_id]:
            if conn_id != exclude_connection and conn_id in connections:
                connections[conn_id].send_event(event_type, data)

    # Publish to Redis for distributed systems
    redis_client.publish(room_id, json.dumps(event_data))

def broadcast_global(event_type, data):
    """Broadcast event to all connected clients"""
    event_data = {
        'type': event_type,
        'data': data,
        'timestamp': datetime.now().isoformat()
    }

    redis_client.publish('global', json.dumps(event_data))

# 5. Event broadcasting endpoints
@app.route('/broadcast', methods=['POST'])
@token_required
def broadcast_event(current_user_id):
    data = request.get_json()
    event_type = data.get('type', 'message')
    event_data = data.get('data', {})

    broadcast_global(event_type, event_data)

    return jsonify({'message': 'Event broadcasted'})

@app.route('/broadcast/room/<room_id>', methods=['POST'])
@token_required
def broadcast_to_room_endpoint(current_user_id, room_id):
    data = request.get_json()
    event_type = data.get('type', 'message')
    event_data = data.get('data', {})

    broadcast_to_room(room_id, event_type, event_data)

    return jsonify({'message': f'Event sent to room {room_id}'})

@app.route('/broadcast/user/<user_id>', methods=['POST'])
@token_required
def broadcast_to_user_endpoint(current_user_id, user_id):
    data = request.get_json()
    event_type = data.get('type', 'message')
    event_data = data.get('data', {})

    broadcast_to_user(user_id, event_type, event_data)

    return jsonify({'message': f'Event sent to user {user_id}'})

# 6. Real-time features
def start_time_updates():
    """Start broadcasting time updates"""
    def time_loop():
        while True:
            time_data = {
                'time': datetime.now().strftime('%H:%M:%S'),
                'date': datetime.now().strftime('%Y-%m-%d'),
                'timestamp': datetime.now().isoformat()
            }
            broadcast_global('time_update', time_data)
            time.sleep(1)

    thread = threading.Thread(target=time_loop, daemon=True)
    thread.start()

def start_notification_simulation():
    """Simulate random notifications"""
    def notification_loop():
        notifications = [
            {'message': 'New message received', 'level': 'info'},
            {'message': 'System update available', 'level': 'warning'},
            {'message': 'Task completed successfully', 'level': 'success'},
            {'message': 'Server maintenance scheduled', 'level': 'info'},
            {'message': 'Security scan completed', 'level': 'success'}
        ]

        while True:
            notification = notifications[int(time.time()) % len(notifications)]
            notification['id'] = str(uuid.uuid4())
            notification['timestamp'] = datetime.now().isoformat()

            broadcast_global('notification', notification)
            time.sleep(7)

    thread = threading.Thread(target=notification_loop, daemon=True)
    thread.start()

def start_stock_price_simulation():
    """Simulate stock price updates"""
    def stock_loop():
        stocks = {
            'AAPL': {'price': 150.0, 'change': 0.0},
            'GOOGL': {'price': 2800.0, 'change': 0.0},
            'MSFT': {'price': 350.0, 'change': 0.0},
            'AMZN': {'price': 3200.0, 'change': 0.0}
        }

        while True:
            for symbol, stock in stocks.items():
                # Random price change
                change_percent = (time.time() % 100 - 50) / 1000
                old_price = stock['price']
                new_price = old_price * (1 + change_percent)

                stock['price'] = new_price
                stock['change'] = new_price - old_price

                stock_data = {
                    'symbol': symbol,
                    'price': round(new_price, 2),
                    'change': round(stock['change'], 2),
                    'change_percent': round(change_percent * 100, 2),
                    'timestamp': datetime.now().isoformat()
                }

                broadcast_to_room('stocks', 'stock_update', stock_data)

            time.sleep(2)

    thread = threading.Thread(target=stock_loop, daemon=True)
    thread.start()

# 7. Statistics and monitoring
@app.route('/stats')
def get_stats():
    stats = {
        'active_connections': len(connections),
        'active_rooms': len(rooms),
        'room_details': {}
    }

    for room_id, connection_ids in rooms.items():
        stats['room_details'][room_id] = len(connection_ids)

    return jsonify(stats)

@app.route('/health')
def health_check():
    """Health check endpoint"""
    try:
        # Check Redis connection
        redis_client.ping()
        return jsonify({'status': 'healthy'}), 200
    except Exception as e:
        return jsonify({'status': 'unhealthy', 'error': str(e)}), 503

# 8. HTML Client Interface
@app.route('/')
def index():
    return '''
<!DOCTYPE html>
<html>
<head>
    <title>Python Flask SSE Demo</title>
    <style>
        body { font-family: Arial, sans-serif; max-width: 1000px; margin: 0 auto; padding: 20px; }
        .container { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; }
        .panel { border: 1px solid #ddd; border-radius: 5px; padding: 15px; }
        .panel h3 { margin-top: 0; }
        .event { margin: 5px 0; padding: 8px; border-radius: 3px; font-family: monospace; font-size: 12px; }
        .time_update { background: #e3f2fd; }
        .notification { background: #fff3e0; }
        .stock_update { background: #f3e5f5; }
        .message { background: #e8f5e8; }
        .user_joined { background: #e1f5fe; }
        .user_left { background: #fce4ec; }
        .controls { margin: 10px 0; }
        button { margin: 5px; padding: 8px 12px; cursor: pointer; }
        input, select { margin: 5px; padding: 8px; }
        .stocks { display: grid; grid-template-columns: repeat(2, 1fr); gap: 10px; }
        .stock { padding: 10px; border: 1px solid #ddd; border-radius: 5px; text-align: center; }
        .positive { color: green; }
        .negative { color: red; }
    </style>
</head>
<body>
    <h1>Python Flask Server-Sent Events Demo</h1>

    <div class="controls">
        <button onclick="getToken()">Get Token</button>
        <input type="text" id="tokenInput" placeholder="JWT Token" size="50">
        <button onclick="connect()">Connect to SSE</button>
        <button onclick="disconnect()">Disconnect</button>
    </div>

    <div class="controls">
        <input type="text" id="messageInput" placeholder="Message to broadcast">
        <button onclick="broadcastMessage()">Broadcast</button>

        <input type="text" id="roomInput" placeholder="Room name" value="chat">
        <button onclick="joinRoom()">Join Room</button>
        <button onclick="leaveRoom()">Leave Room</button>
    </div>

    <div class="container">
        <div class="panel">
            <h3>Event Stream</h3>
            <div id="events" style="height: 400px; overflow-y: scroll;"></div>
        </div>

        <div class="panel">
            <h3>Stock Prices</h3>
            <div class="stocks" id="stocks"></div>
        </div>
    </div>

    <script>
        let eventSource = null;
        let token = null;

        function getToken() {
            fetch('/auth/login', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ username: 'demo', password: 'demo' })
            })
            .then(response => response.json())
            .then(data => {
                token = data.token;
                document.getElementById('tokenInput').value = token;
            });
        }

        function connect() {
            token = document.getElementById('tokenInput').value;
            if (!token) {
                alert('Please get a token first');
                return;
            }

            if (eventSource) {
                eventSource.close();
            }

            eventSource = new EventSource('/events', {
                headers: {
                    'Authorization': 'Bearer ' + token
                }
            });

            eventSource.onopen = function() {
                addEvent('connected', 'Connected to SSE stream');
            };

            eventSource.onmessage = function(event) {
                const data = JSON.parse(event.data);
                addEvent(data.type, data);
            };

            eventSource.addEventListener('time_update', function(event) {
                const data = JSON.parse(event.data);
                document.title = 'SSE Demo - ' + data.data.time;
            });

            eventSource.addEventListener('stock_update', function(event) {
                const data = JSON.parse(event.data);
                updateStock(data.data);
            });

            eventSource.onerror = function() {
                addEvent('error', 'Connection error');
            };
        }

        function disconnect() {
            if (eventSource) {
                eventSource.close();
                eventSource = null;
                addEvent('disconnected', 'Disconnected from SSE');
            }
        }

        function broadcastMessage() {
            const message = document.getElementById('messageInput').value;
            if (message && token) {
                fetch('/broadcast', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                        'Authorization': 'Bearer ' + token
                    },
                    body: JSON.stringify({
                        type: 'message',
                        data: { message: message }
                    })
                });
                document.getElementById('messageInput').value = '';
            }
        }

        function joinRoom() {
            const roomName = document.getElementById('roomInput').value;
            if (roomName && token) {
                fetch('/rooms/' + roomName + '/join', {
                    method: 'POST',
                    headers: {
                        'Authorization': 'Bearer ' + token
                    }
                });
            }
        }

        function leaveRoom() {
            const roomName = document.getElementById('roomInput').value;
            if (roomName && token) {
                fetch('/rooms/' + roomName + '/leave', {
                    method: 'POST',
                    headers: {
                        'Authorization': 'Bearer ' + token
                    }
                });
            }
        }

        function addEvent(type, data) {
            const eventsDiv = document.getElementById('events');
            const eventDiv = document.createElement('div');
            eventDiv.className = 'event ' + type;

            const time = new Date().toLocaleTimeString();
            let content = '';

            if (typeof data === 'object') {
                content = JSON.stringify(data, null, 2);
            } else {
                content = data;
            }

            eventDiv.innerHTML = '<strong>' + time + ' - ' + type + ':</strong><br>' + content;
            eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);

            // Keep only last 100 events
            while (eventsDiv.children.length > 100) {
                eventsDiv.removeChild(eventsDiv.lastChild);
            }
        }

        function updateStock(stock) {
            const stockElement = document.getElementById('stock_' + stock.symbol);

            if (!stockElement) {
                const newStockElement = document.createElement('div');
                newStockElement.id = 'stock_' + stock.symbol;
                newStockElement.className = 'stock';
                document.getElementById('stocks').appendChild(newStockElement);
            }

            const element = document.getElementById('stock_' + stock.symbol);
            const changeClass = stock.change >= 0 ? 'positive' : 'negative';
            const changeSymbol = stock.change >= 0 ? '+' : '';

            element.innerHTML = `
                <strong>${stock.symbol}</strong><br>
                ${stock.price}<br>
                <span class="${changeClass}">${changeSymbol}${stock.change} (${stock.change_percent}%)</span>
            `;
        }

        // Auto-connect on load
        window.onload = function() {
            getToken();
            setTimeout(connect, 500);
        };
    </script>
</body>
</html>
    '''

# Start background threads
start_time_updates()
start_notification_simulation()
start_stock_price_simulation()

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000, threaded=True)