Server-Sent Events (SSE)
Server-Sent Events examples for real-time unidirectional communication from server to client, including live updates, notifications, and streaming data
Key Facts
- Category
- Communication Protocols
- Items
- 3
- Format Families
- sample
Sample Overview
Server-Sent Events examples for real-time unidirectional communication from server to client, including live updates, notifications, and streaming data This sample set belongs to Communication Protocols and can be used to test related workflows inside Elysia Tools.
💻 Server-Sent Events Basics javascript
Introduction to SSE with basic server implementation and client consumption patterns
// Server-Sent Events (SSE) Basic Examples
// Demonstrates real-time server-to-client communication
const express = require('express');
const cors = require('cors');
const app = express();
const PORT = 3000;
// Middleware
app.use(cors());
app.use(express.json());
// Store active SSE connections
const connections = new Set();
// 1. Basic SSE Endpoint
app.get('/events', (req, res) => {
console.log('New SSE connection established');
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
});
// Send initial connection message
res.write('data: {"type": "connected", "message": "Connected to SSE stream"}\n\n');
// Add connection to active connections
connections.add(res);
// Handle client disconnect
req.on('close', () => {
console.log('SSE connection closed');
connections.delete(res);
});
// Send keep-alive ping every 30 seconds
const pingInterval = setInterval(() => {
res.write(': keep-alive\n\n');
}, 30000);
req.on('close', () => {
clearInterval(pingInterval);
});
});
// 2. Send event to all connected clients
function broadcastEvent(eventType, data) {
const message = JSON.stringify({
type: eventType,
data: data,
timestamp: new Date().toISOString()
});
connections.forEach(connection => {
try {
connection.write(`event: ${eventType}\ndata: ${message}\n\n`);
} catch (error) {
console.error('Error sending event:', error);
connections.delete(connection);
}
});
}
// 3. Send regular time updates
function startTimeUpdates() {
setInterval(() => {
broadcastEvent('time', {
time: new Date().toLocaleTimeString(),
date: new Date().toLocaleDateString()
});
}, 1000);
}
// 4. Send random notifications
function startNotifications() {
const notifications = [
'New order received!',
'Server backup completed',
'New user registered',
'System update available',
'Security scan finished'
];
setInterval(() => {
const notification = notifications[Math.floor(Math.random() * notifications.length)];
broadcastEvent('notification', {
message: notification,
level: Math.random() > 0.5 ? 'info' : 'success'
});
}, 5000);
}
// 5. REST API endpoints to trigger events
app.post('/send-message', (req, res) => {
const { message, type = 'custom' } = req.body;
broadcastEvent(type, { message });
res.json({ success: true, message: 'Event broadcasted' });
});
app.post('/trigger-alert', (req, res) => {
const { level = 'warning', message } = req.body;
broadcastEvent('alert', { level, message });
res.json({ success: true, alert: 'Alert sent' });
});
// 6. Stock price simulation
function startStockSimulation() {
const stocks = {
'AAPL': { price: 150, change: 0 },
'GOOGL': { price: 2800, change: 0 },
'MSFT': { price: 350, change: 0 },
'AMZN': { price: 3200, change: 0 }
};
setInterval(() => {
Object.keys(stocks).forEach(symbol => {
const stock = stocks[symbol];
const oldPrice = stock.price;
const change = (Math.random() - 0.5) * 10;
stock.price = Math.max(0, oldPrice + change);
stock.change = stock.price - oldPrice;
broadcastEvent('stock-update', {
symbol: symbol,
price: stock.price.toFixed(2),
change: stock.change.toFixed(2),
changePercent: ((stock.change / oldPrice) * 100).toFixed(2)
});
});
}, 2000);
}
// HTML Client Example
app.get('/', (req, res) => {
res.send(`
<!DOCTYPE html>
<html>
<head>
<title>SSE Demo</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
.event { margin: 10px 0; padding: 10px; border-radius: 5px; }
.connected { background: #e8f5e8; }
.time { background: #e3f2fd; }
.notification { background: #fff3e0; }
.alert { background: #ffebee; }
.stock-update { background: #f3e5f5; }
.custom { background: #e0f2f1; }
.controls { margin: 20px 0; }
button { margin: 5px; padding: 10px 15px; cursor: pointer; }
input { margin: 5px; padding: 8px; }
.stocks { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 10px; }
.stock { padding: 10px; border: 1px solid #ddd; border-radius: 5px; }
.positive { color: green; }
.negative { color: red; }
</style>
</head>
<body>
<h1>Server-Sent Events Demo</h1>
<div id="status" class="event">Connecting...</div>
<div class="controls">
<input type="text" id="messageInput" placeholder="Enter message">
<button onclick="sendMessage()">Send Message</button>
<button onclick="triggerAlert()">Trigger Alert</button>
</div>
<div class="stocks" id="stocks"></div>
<h3>Event Stream:</h3>
<div id="events"></div>
<script>
const eventSource = new EventSource('/events');
const eventsDiv = document.getElementById('events');
const statusDiv = document.getElementById('status');
const stocksDiv = document.getElementById('stocks');
eventSource.onopen = function(event) {
statusDiv.innerHTML = '<div class="event connected">✅ Connected to SSE stream</div>';
addEvent('connected', 'Connection established');
};
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
addEvent(data.type, data.data);
};
eventSource.addEventListener('time', function(event) {
const data = JSON.parse(event.data);
document.title = `SSE Demo - ${data.data.time}`;
});
eventSource.addEventListener('notification', function(event) {
const data = JSON.parse(event.data);
showNotification(data.data.message, data.data.level);
});
eventSource.addEventListener('alert', function(event) {
const data = JSON.parse(event.data);
showAlert(data.data.message, data.data.level);
});
eventSource.addEventListener('stock-update', function(event) {
const data = JSON.parse(event.data);
updateStock(data.data);
});
function addEvent(type, data) {
const eventDiv = document.createElement('div');
eventDiv.className = `event ${type}`;
eventDiv.innerHTML = `
<strong>${new Date().toLocaleTimeString()} - ${type.toUpperCase()}:</strong>
${typeof data === 'object' ? JSON.stringify(data, null, 2) : data}
`;
eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);
// Keep only last 50 events
while (eventsDiv.children.length > 50) {
eventsDiv.removeChild(eventsDiv.lastChild);
}
}
function showNotification(message, level) {
if (Notification.permission === 'granted') {
new Notification('SSE Notification', {
body: message,
icon: '/icon.png'
});
}
}
function showAlert(message, level) {
alert(`Alert (${level}): ${message}`);
}
function updateStock(stock) {
const stockElement = document.getElementById(`stock-${stock.symbol}`);
if (!stockElement) {
const newStockElement = document.createElement('div');
newStockElement.id = `stock-${stock.symbol}`;
newStockElement.className = 'stock';
stocksDiv.appendChild(newStockElement);
}
const element = document.getElementById(`stock-${stock.symbol}`);
const changeClass = stock.change >= 0 ? 'positive' : 'negative';
const changeSymbol = stock.change >= 0 ? '+' : '';
element.innerHTML = `
<strong>${stock.symbol}</strong><br>
$${stock.price}<br>
<span class="${changeClass}">${changeSymbol}${stock.change} (${stock.changePercent}%)</span>
`;
}
function sendMessage() {
const input = document.getElementById('messageInput');
const message = input.value.trim();
if (message) {
fetch('/send-message', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message, type: 'custom' })
});
input.value = '';
}
}
function triggerAlert() {
fetch('/trigger-alert', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
level: 'warning',
message: 'This is a test alert!'
})
});
}
// Request notification permission
if ('Notification' in window && Notification.permission === 'default') {
Notification.requestPermission();
}
// Handle connection errors
eventSource.onerror = function(event) {
statusDiv.innerHTML = '<div class="event alert">❌ Connection error. Retrying...</div>';
};
// Handle Enter key in message input
document.getElementById('messageInput').addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
</script>
</body>
</html>
`);
});
// Start background processes
startTimeUpdates();
startNotifications();
startStockSimulation();
app.listen(PORT, () => {
console.log(`SSE Server running on http://localhost:${PORT}`);
});
// Graceful shutdown
process.on('SIGINT', () => {
console.log('\nShutting down SSE server...');
connections.forEach(connection => {
connection.end();
});
process.exit(0);
});
module.exports = { app, broadcastEvent };
💻 Advanced SSE Patterns javascript
Advanced Server-Sent Events patterns including authentication, room-based streaming, backpressure handling, and reconnection strategies
// Advanced Server-Sent Events Patterns
// Enterprise-grade SSE implementation with authentication, rooms, and advanced features
const express = require('express');
const jwt = require('jsonwebtoken');
const Redis = require('ioredis');
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
const compression = require('compression');
const app = express();
const PORT = 3000;
// Configuration
const JWT_SECRET = process.env.JWT_SECRET || 'your-secret-key';
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';
// Redis client for distributed systems
const redis = new Redis(REDIS_URL);
// Middleware
app.use(helmet());
app.use(compression());
app.use(express.json());
// Rate limiting
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // limit each IP to 100 requests per windowMs
message: 'Too many requests from this IP'
});
app.use('/events', limiter);
// Connection management
class ConnectionManager {
constructor() {
this.connections = new Map(); // userId -> Set of connections
this.rooms = new Map(); // roomId -> Set of userIds
this.connectionMetadata = new Map(); // connectionId -> metadata
}
addConnection(connectionId, res, userId, metadata = {}) {
const connection = {
id: connectionId,
response: res,
userId: userId,
rooms: new Set(),
lastActivity: Date.now(),
metadata: metadata,
isAlive: true
};
// Store connection
if (!this.connections.has(userId)) {
this.connections.set(userId, new Set());
}
this.connections.get(userId).add(connection);
this.connectionMetadata.set(connectionId, connection);
// Store in Redis for distributed systems
redis.hset(`connections:${userId}`, connectionId, JSON.stringify({
connectionId,
rooms: Array.from(connection.rooms),
lastActivity: connection.lastActivity,
metadata: connection.metadata
}));
console.log(`Connection added: ${connectionId} for user ${userId}`);
return connection;
}
removeConnection(connectionId) {
const connection = this.connectionMetadata.get(connectionId);
if (!connection) return;
// Remove from user connections
const userConnections = this.connections.get(connection.userId);
if (userConnections) {
userConnections.delete(connection);
if (userConnections.size === 0) {
this.connections.delete(connection.userId);
}
}
// Remove from rooms
connection.rooms.forEach(roomId => {
this.leaveRoom(connectionId, roomId);
});
// Remove from metadata and Redis
this.connectionMetadata.delete(connectionId);
redis.hdel(`connections:${connection.userId}`, connectionId);
console.log(`Connection removed: ${connectionId}`);
}
joinRoom(connectionId, roomId) {
const connection = this.connectionMetadata.get(connectionId);
if (!connection) return false;
// Add connection to room
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
}
this.rooms.get(roomId).add(connectionId);
connection.rooms.add(roomId);
// Update Redis
redis.sadd(`room:${roomId}`, connection.userId);
console.log(`Connection ${connectionId} joined room ${roomId}`);
return true;
}
leaveRoom(connectionId, roomId) {
const connection = this.connectionMetadata.get(connectionId);
if (!connection) return false;
// Remove from room
if (this.rooms.has(roomId)) {
this.rooms.get(roomId).delete(connectionId);
if (this.rooms.get(roomId).size === 0) {
this.rooms.delete(roomId);
}
}
connection.rooms.delete(roomId);
// Update Redis
redis.srem(`room:${roomId}`, connection.userId);
console.log(`Connection ${connectionId} left room ${roomId}`);
return true;
}
getConnectionsByUser(userId) {
return this.connections.get(userId) || new Set();
}
getConnectionsByRoom(roomId) {
const roomConnections = this.rooms.get(roomId) || new Set();
return Array.from(roomConnections).map(connId =>
this.connectionMetadata.get(connId)
).filter(Boolean);
}
getAllConnections() {
return Array.from(this.connectionMetadata.values());
}
updateActivity(connectionId) {
const connection = this.connectionMetadata.get(connectionId);
if (connection) {
connection.lastActivity = Date.now();
}
}
// Clean up inactive connections
cleanup() {
const now = Date.now();
const timeout = 5 * 60 * 1000; // 5 minutes
for (const [connectionId, connection] of this.connectionMetadata) {
if (now - connection.lastActivity > timeout) {
console.log(`Cleaning up inactive connection: ${connectionId}`);
connection.response.end();
this.removeConnection(connectionId);
}
}
}
}
const connectionManager = new ConnectionManager();
// Clean up every 5 minutes
setInterval(() => {
connectionManager.cleanup();
}, 5 * 60 * 1000);
// JWT Authentication middleware
function authenticateToken(req, res, next) {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];
if (!token) {
return res.status(401).json({ error: 'Access token required' });
}
jwt.verify(token, JWT_SECRET, (err, user) => {
if (err) {
return res.status(403).json({ error: 'Invalid token' });
}
req.user = user;
next();
});
}
// Generate JWT token endpoint
app.post('/auth/login', (req, res) => {
const { username, password } = req.body;
// Simple authentication (replace with real auth logic)
if (username && password) {
const user = {
id: username,
username: username,
role: 'user'
};
const token = jwt.sign(user, JWT_SECRET, { expiresIn: '24h' });
res.json({ token, user });
} else {
res.status(401).json({ error: 'Invalid credentials' });
}
});
// 1. Advanced SSE Endpoint with Authentication
app.get('/events', authenticateToken, (req, res) => {
const connectionId = `${req.user.id}_${Date.now()}`;
const userId = req.user.id;
// Set SSE headers with additional security
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no', // Disable nginx buffering
'Access-Control-Allow-Origin': req.headers.origin || '*',
'Access-Control-Allow-Credentials': 'true'
});
// Add connection
const connection = connectionManager.addConnection(
connectionId,
res,
userId,
{
userAgent: req.headers['user-agent'],
ip: req.ip,
query: req.query
}
);
// Send connection event with metadata
const connectionEvent = {
type: 'connected',
data: {
connectionId: connectionId,
userId: userId,
timestamp: new Date().toISOString(),
serverTime: Date.now()
}
};
res.write(`event: connected\ndata: ${JSON.stringify(connectionEvent)}\n\n`);
// Join default rooms based on user role
if (req.user.role === 'admin') {
connectionManager.joinRoom(connectionId, 'admin');
}
connectionManager.joinRoom(connectionId, `user_${userId}`);
// Send initial data
const initialData = {
type: 'initial-data',
data: {
message: 'Welcome to the SSE stream',
userId: userId,
availableRooms: ['notifications', 'updates', 'alerts']
}
};
res.write(`event: initial-data\ndata: ${JSON.stringify(initialData)}\n\n`);
// Handle client disconnect
req.on('close', () => {
console.log(`SSE connection closed: ${connectionId}`);
connectionManager.removeConnection(connectionId);
});
req.on('aborted', () => {
console.log(`SSE connection aborted: ${connectionId}`);
connectionManager.removeConnection(connectionId);
});
// Setup keep-alive with backpressure handling
let pingInterval;
const startPing = () => {
pingInterval = setInterval(() => {
try {
// Check if connection is still writable
if (res.destroyed || res.finished) {
clearInterval(pingInterval);
return;
}
res.write(': keep-alive\n\n');
connectionManager.updateActivity(connectionId);
} catch (error) {
console.error(`Ping error for ${connectionId}:`, error);
clearInterval(pingInterval);
connectionManager.removeConnection(connectionId);
}
}, 30000);
};
startPing();
// Clean up ping interval on disconnect
req.on('close', () => {
if (pingInterval) clearInterval(pingInterval);
});
});
// 2. Room management endpoints
app.post('/rooms/:roomId/join', authenticateToken, (req, res) => {
const { roomId } = req.params;
const userId = req.user.id;
// Get user's connections
const userConnections = connectionManager.getConnectionsByUser(userId);
// Join all user connections to the room
let joinedCount = 0;
userConnections.forEach(connection => {
if (connectionManager.joinRoom(connection.id, roomId)) {
joinedCount++;
// Send confirmation to this connection
const joinEvent = {
type: 'room-joined',
data: {
roomId: roomId,
userId: userId,
timestamp: new Date().toISOString()
}
};
connection.response.write(`event: room-joined\ndata: ${JSON.stringify(joinEvent)}\n\n`);
}
});
res.json({
success: true,
roomId: roomId,
connectionsJoined: joinedCount
});
});
app.post('/rooms/:roomId/leave', authenticateToken, (req, res) => {
const { roomId } = req.params;
const userId = req.user.id;
const userConnections = connectionManager.getConnectionsByUser(userId);
let leftCount = 0;
userConnections.forEach(connection => {
if (connectionManager.leaveRoom(connection.id, roomId)) {
leftCount++;
const leaveEvent = {
type: 'room-left',
data: {
roomId: roomId,
userId: userId,
timestamp: new Date().toISOString()
}
};
connection.response.write(`event: room-left\ndata: ${JSON.stringify(leaveEvent)}\n\n`);
}
});
res.json({
success: true,
roomId: roomId,
connectionsLeft: leftCount
});
});
// 3. Advanced broadcasting with filtering
function broadcastEvent(eventType, data, options = {}) {
const {
roomId = null,
userId = null,
connectionId = null,
metadata = {},
priority = 'normal'
} = options;
const message = {
type: eventType,
data: data,
timestamp: new Date().toISOString(),
serverTime: Date.now(),
id: generateEventId(),
priority: priority
};
let targetConnections = [];
if (connectionId) {
// Send to specific connection
const conn = connectionManager.connectionMetadata.get(connectionId);
if (conn) targetConnections.push(conn);
} else if (userId) {
// Send to all connections for a user
targetConnections = Array.from(connectionManager.getConnectionsByUser(userId));
} else if (roomId) {
// Send to all connections in a room
targetConnections = connectionManager.getConnectionsByRoom(roomId);
} else {
// Send to all connections
targetConnections = connectionManager.getAllConnections();
}
// Apply metadata filters
if (metadata.filters) {
targetConnections = targetConnections.filter(conn => {
return Object.entries(metadata.filters).every(([key, value]) => {
return conn.metadata[key] === value;
});
});
}
// Send event
let sentCount = 0;
targetConnections.forEach(connection => {
try {
const eventData = {
...message,
connectionId: connection.id,
roomId: roomId || null,
userId: connection.userId
};
// Add custom event name if provided
const eventName = metadata.eventName || eventType;
connection.response.write(`event: ${eventName}\ndata: ${JSON.stringify(eventData)}\n\n`);
connectionManager.updateActivity(connection.id);
sentCount++;
} catch (error) {
console.error(`Error broadcasting to ${connection.id}:`, error);
connectionManager.removeConnection(connection.id);
}
});
// Store event in Redis for replay/debug
redis.lpush('events:history', JSON.stringify({
...message,
sentCount: sentCount,
targetCount: targetConnections.length
}));
redis.ltrim('events:history', 0, 999); // Keep last 1000 events
console.log(`Broadcasted '${eventType}' to ${sentCount} connections`);
return sentCount;
}
// 4. Event broadcasting endpoints
app.post('/broadcast', authenticateToken, (req, res) => {
const { eventType, data, options = {} } = req.body;
const sentCount = broadcastEvent(eventType, data, {
...options,
metadata: {
...options.metadata,
senderId: req.user.id,
senderRole: req.user.role
}
});
res.json({
success: true,
eventType: eventType,
sentTo: sentCount
});
});
app.post('/broadcast-room/:roomId', authenticateToken, (req, res) => {
const { roomId } = req.params;
const { eventType, data } = req.body;
const sentCount = broadcastEvent(eventType, data, {
roomId: roomId,
metadata: {
senderId: req.user.id,
roomId: roomId
}
});
res.json({
success: true,
roomId: roomId,
eventType: eventType,
sentTo: sentCount
});
});
// 5. Event replay for new connections
app.get('/events/replay/:lastEventId', authenticateToken, (req, res) => {
const { lastEventId } = req.params;
const userId = req.user.id;
// Get events from Redis (simplified - in production use a proper event store)
redis.lrange('events:history', 0, 99, (err, events) => {
if (err) {
return res.status(500).json({ error: 'Failed to fetch events' });
}
const parsedEvents = events.map(e => JSON.parse(e));
const filteredEvents = parsedEvents.filter(event => {
// Filter events relevant to this user or public events
return event.data.userId === userId || !event.data.userId;
});
res.json({
events: filteredEvents,
lastEventId: lastEventId
});
});
});
// 6. Connection statistics
app.get('/stats', authenticateToken, (req, res) => {
const stats = {
totalConnections: connectionManager.getAllConnections().length,
totalRooms: connectionManager.rooms.size,
activeUsers: connectionManager.connections.size,
roomStats: {}
};
// Add room-specific stats
for (const [roomId, connections] of connectionManager.rooms) {
stats.roomStats[roomId] = connections.size;
}
res.json(stats);
});
// 7. Live data simulations
function startLiveSimulations() {
// User activity feed
setInterval(() => {
const activities = [
'User logged in',
'Post published',
'Comment added',
'File uploaded',
'Task completed'
];
const activity = activities[Math.floor(Math.random() * activities.length)];
broadcastEvent('user-activity', {
activity: activity,
userId: `user_${Math.floor(Math.random() * 100)}`,
timestamp: new Date().toISOString()
});
}, 3000);
// System metrics
setInterval(() => {
const metrics = {
cpu: Math.random() * 100,
memory: Math.random() * 100,
disk: Math.random() * 100,
network: Math.random() * 1000
};
broadcastEvent('system-metrics', metrics, {
roomId: 'admin'
});
}, 5000);
// Live chat simulation
setInterval(() => {
const messages = [
'Hello everyone!',
'How are you doing?',
'Great weather today',
'Anyone working on the new feature?',
'Coffee break time!'
];
const message = messages[Math.floor(Math.random() * messages.length)];
const userId = `user_${Math.floor(Math.random() * 10)}`;
broadcastEvent('chat-message', {
message: message,
userId: userId,
username: `User_${userId}`,
timestamp: new Date().toISOString()
}, {
roomId: 'chat'
});
}, 4000);
}
// Utility function to generate unique event IDs
function generateEventId() {
return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
// Start background simulations
startLiveSimulations();
app.listen(PORT, () => {
console.log(`Advanced SSE Server running on http://localhost:${PORT}`);
});
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('\nShutting down Advanced SSE server...');
// Close all connections
connectionManager.getAllConnections().forEach(connection => {
try {
connection.response.write('event: shutdown\ndata: {"message": "Server shutting down"}\n\n');
connection.response.end();
} catch (error) {
// Ignore errors during shutdown
}
});
// Close Redis connection
redis.quit();
setTimeout(() => {
process.exit(0);
}, 1000);
});
module.exports = {
app,
connectionManager,
broadcastEvent
};
💻 Python Flask SSE Implementation python
Server-Sent Events implementation using Python Flask with Redis pub/sub for scalable real-time applications
# Server-Sent Events with Python Flask
# pip install flask flask-redis redis gevent
from flask import Flask, Response, jsonify, request, stream_template
from flask_redis import FlaskRedis
import redis as redis_py
import json
import time
import threading
import uuid
from datetime import datetime, timedelta
import queue
from functools import wraps
import jwt
from gevent import monkey
monkey.patch_all()
app = Flask(__name__)
# Configuration
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['REDIS_URL'] = 'redis://localhost:6379/0'
app.config['JWT_SECRET_KEY'] = 'jwt-secret-string'
# Redis connections
redis_client = FlaskRedis(app)
pubsub = redis_py.Redis.from_url('redis://localhost:6379/0').pubsub()
# In-memory connection storage (use Redis in production)
connections = {}
rooms = {}
class Connection:
def __init__(self, connection_id, user_id, response_queue):
self.id = connection_id
self.user_id = user_id
self.queue = queue.Queue()
self.created_at = datetime.now()
self.last_activity = datetime.now()
self.rooms = set()
self.is_active = True
def send_event(self, event_type, data, event_id=None):
"""Send event to this connection"""
event = {
'id': event_id or str(uuid.uuid4()),
'type': event_type,
'data': data,
'timestamp': datetime.now().isoformat()
}
self.queue.put(event)
self.last_activity = datetime.now()
def join_room(self, room_id):
"""Join a room"""
self.rooms.add(room_id)
if room_id not in rooms:
rooms[room_id] = set()
rooms[room_id].add(self.id)
def leave_room(self, room_id):
"""Leave a room"""
self.rooms.discard(room_id)
if room_id in rooms:
rooms[room_id].discard(self.id)
if not rooms[room_id]:
del rooms[room_id]
# Authentication decorator
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
if 'Authorization' in request.headers:
auth_header = request.headers['Authorization']
try:
token = auth_header.split(" ")[1]
except IndexError:
return jsonify({'message': 'Bearer token malformed'}), 401
if not token:
return jsonify({'message': 'Token is missing!'}), 401
try:
data = jwt.decode(token, app.config['JWT_SECRET_KEY'], algorithms=["HS256"])
current_user_id = data['user_id']
except Exception as e:
return jsonify({'message': 'Token is invalid!'}), 401
return f(current_user_id, *args, **kwargs)
return decorated
# 1. Authentication endpoints
@app.route('/auth/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
# Simple authentication (replace with real auth logic)
if username and password:
token = jwt.encode({
'user_id': username,
'exp': datetime.utcnow() + timedelta(hours=24)
}, app.config['JWT_SECRET_KEY'], algorithm="HS256")
return jsonify({
'token': token,
'user': {'id': username, 'username': username}
})
return jsonify({'message': 'Invalid credentials'}), 401
# 2. Main SSE endpoint
@app.route('/events')
@token_required
def events(current_user_id):
connection_id = str(uuid.uuid4())
# Create new connection
connection = Connection(connection_id, current_user_id, queue.Queue())
connections[connection_id] = connection
# Join user-specific room
connection.join_room(f'user_{current_user_id}')
def generate():
try:
# Send initial connection event
yield f"event: connected\n"
yield f"data: {json.dumps({{'type': 'connected', 'connection_id': connection_id, 'user_id': current_user_id}})}\n\n"
# Subscribe to Redis channels
pubsub.subscribe([
f'user_{current_user_id}',
'global',
'notifications'
])
# Listen for Redis messages
while connection.is_active:
# Check for Redis messages
message = pubsub.get_message(timeout=0.1)
if message and message['type'] == 'message':
try:
event_data = json.loads(message['data'].decode('utf-8'))
event_type = event_data.get('type', 'message')
# Send event to client
yield f"event: {event_type}\n"
yield f"data: {json.dumps(event_data)}\n\n"
except (json.JSONDecodeError, UnicodeDecodeError) as e:
print(f"Error parsing Redis message: {e}")
# Check for queued events
try:
while not connection.queue.empty():
event = connection.queue.get_nowait()
yield f"event: {event['type']}\n"
yield f"data: {json.dumps(event)}\n\n"
except queue.Empty:
pass
# Send keep-alive ping
yield ": keep-alive\n\n"
time.sleep(1)
except GeneratorExit:
print(f"Client disconnected: {connection_id}")
finally:
# Cleanup
cleanup_connection(connection_id)
pubsub.unsubscribe()
return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
}
)
def cleanup_connection(connection_id):
"""Clean up connection when disconnected"""
if connection_id in connections:
connection = connections[connection_id]
# Leave all rooms
for room_id in list(connection.rooms):
connection.leave_room(room_id)
# Remove from connections
del connections[connection_id]
print(f"Cleaned up connection: {connection_id}")
# 3. Room management
@app.route('/rooms/<room_id>/join', methods=['POST'])
@token_required
def join_room(current_user_id, room_id):
connection_id = request.headers.get('X-Connection-ID')
if connection_id in connections:
connection = connections[connection_id]
connection.join_room(room_id)
# Subscribe to Redis room channel
pubsub.subscribe(room_id)
# Notify room members
broadcast_to_room(room_id, 'user_joined', {
'user_id': current_user_id,
'room_id': room_id
}, exclude_connection=connection_id)
return jsonify({'message': f'Joined room {room_id}'})
return jsonify({'message': 'Connection not found'}), 404
@app.route('/rooms/<room_id>/leave', methods=['POST'])
@token_required
def leave_room(current_user_id, room_id):
connection_id = request.headers.get('X-Connection-ID')
if connection_id in connections:
connection = connections[connection_id]
connection.leave_room(room_id)
# Unsubscribe from Redis room channel
pubsub.unsubscribe(room_id)
# Notify room members
broadcast_to_room(room_id, 'user_left', {
'user_id': current_user_id,
'room_id': room_id
}, exclude_connection=connection_id)
return jsonify({'message': f'Left room {room_id}'})
return jsonify({'message': 'Connection not found'}), 404
# 4. Broadcasting functions
def broadcast_to_user(user_id, event_type, data):
"""Broadcast event to specific user"""
room_id = f'user_{user_id}'
event_data = {
'type': event_type,
'data': data,
'timestamp': datetime.now().isoformat(),
'target_user': user_id
}
# Publish to Redis
redis_client.publish(room_id, json.dumps(event_data))
def broadcast_to_room(room_id, event_type, data, exclude_connection=None):
"""Broadcast event to room members"""
event_data = {
'type': event_type,
'data': data,
'timestamp': datetime.now().isoformat(),
'room_id': room_id
}
# Send to in-memory connections
if room_id in rooms:
for conn_id in rooms[room_id]:
if conn_id != exclude_connection and conn_id in connections:
connections[conn_id].send_event(event_type, data)
# Publish to Redis for distributed systems
redis_client.publish(room_id, json.dumps(event_data))
def broadcast_global(event_type, data):
"""Broadcast event to all connected clients"""
event_data = {
'type': event_type,
'data': data,
'timestamp': datetime.now().isoformat()
}
redis_client.publish('global', json.dumps(event_data))
# 5. Event broadcasting endpoints
@app.route('/broadcast', methods=['POST'])
@token_required
def broadcast_event(current_user_id):
data = request.get_json()
event_type = data.get('type', 'message')
event_data = data.get('data', {})
broadcast_global(event_type, event_data)
return jsonify({'message': 'Event broadcasted'})
@app.route('/broadcast/room/<room_id>', methods=['POST'])
@token_required
def broadcast_to_room_endpoint(current_user_id, room_id):
data = request.get_json()
event_type = data.get('type', 'message')
event_data = data.get('data', {})
broadcast_to_room(room_id, event_type, event_data)
return jsonify({'message': f'Event sent to room {room_id}'})
@app.route('/broadcast/user/<user_id>', methods=['POST'])
@token_required
def broadcast_to_user_endpoint(current_user_id, user_id):
data = request.get_json()
event_type = data.get('type', 'message')
event_data = data.get('data', {})
broadcast_to_user(user_id, event_type, event_data)
return jsonify({'message': f'Event sent to user {user_id}'})
# 6. Real-time features
def start_time_updates():
"""Start broadcasting time updates"""
def time_loop():
while True:
time_data = {
'time': datetime.now().strftime('%H:%M:%S'),
'date': datetime.now().strftime('%Y-%m-%d'),
'timestamp': datetime.now().isoformat()
}
broadcast_global('time_update', time_data)
time.sleep(1)
thread = threading.Thread(target=time_loop, daemon=True)
thread.start()
def start_notification_simulation():
"""Simulate random notifications"""
def notification_loop():
notifications = [
{'message': 'New message received', 'level': 'info'},
{'message': 'System update available', 'level': 'warning'},
{'message': 'Task completed successfully', 'level': 'success'},
{'message': 'Server maintenance scheduled', 'level': 'info'},
{'message': 'Security scan completed', 'level': 'success'}
]
while True:
notification = notifications[int(time.time()) % len(notifications)]
notification['id'] = str(uuid.uuid4())
notification['timestamp'] = datetime.now().isoformat()
broadcast_global('notification', notification)
time.sleep(7)
thread = threading.Thread(target=notification_loop, daemon=True)
thread.start()
def start_stock_price_simulation():
"""Simulate stock price updates"""
def stock_loop():
stocks = {
'AAPL': {'price': 150.0, 'change': 0.0},
'GOOGL': {'price': 2800.0, 'change': 0.0},
'MSFT': {'price': 350.0, 'change': 0.0},
'AMZN': {'price': 3200.0, 'change': 0.0}
}
while True:
for symbol, stock in stocks.items():
# Random price change
change_percent = (time.time() % 100 - 50) / 1000
old_price = stock['price']
new_price = old_price * (1 + change_percent)
stock['price'] = new_price
stock['change'] = new_price - old_price
stock_data = {
'symbol': symbol,
'price': round(new_price, 2),
'change': round(stock['change'], 2),
'change_percent': round(change_percent * 100, 2),
'timestamp': datetime.now().isoformat()
}
broadcast_to_room('stocks', 'stock_update', stock_data)
time.sleep(2)
thread = threading.Thread(target=stock_loop, daemon=True)
thread.start()
# 7. Statistics and monitoring
@app.route('/stats')
def get_stats():
stats = {
'active_connections': len(connections),
'active_rooms': len(rooms),
'room_details': {}
}
for room_id, connection_ids in rooms.items():
stats['room_details'][room_id] = len(connection_ids)
return jsonify(stats)
@app.route('/health')
def health_check():
"""Health check endpoint"""
try:
# Check Redis connection
redis_client.ping()
return jsonify({'status': 'healthy'}), 200
except Exception as e:
return jsonify({'status': 'unhealthy', 'error': str(e)}), 503
# 8. HTML Client Interface
@app.route('/')
def index():
return '''
<!DOCTYPE html>
<html>
<head>
<title>Python Flask SSE Demo</title>
<style>
body { font-family: Arial, sans-serif; max-width: 1000px; margin: 0 auto; padding: 20px; }
.container { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; }
.panel { border: 1px solid #ddd; border-radius: 5px; padding: 15px; }
.panel h3 { margin-top: 0; }
.event { margin: 5px 0; padding: 8px; border-radius: 3px; font-family: monospace; font-size: 12px; }
.time_update { background: #e3f2fd; }
.notification { background: #fff3e0; }
.stock_update { background: #f3e5f5; }
.message { background: #e8f5e8; }
.user_joined { background: #e1f5fe; }
.user_left { background: #fce4ec; }
.controls { margin: 10px 0; }
button { margin: 5px; padding: 8px 12px; cursor: pointer; }
input, select { margin: 5px; padding: 8px; }
.stocks { display: grid; grid-template-columns: repeat(2, 1fr); gap: 10px; }
.stock { padding: 10px; border: 1px solid #ddd; border-radius: 5px; text-align: center; }
.positive { color: green; }
.negative { color: red; }
</style>
</head>
<body>
<h1>Python Flask Server-Sent Events Demo</h1>
<div class="controls">
<button onclick="getToken()">Get Token</button>
<input type="text" id="tokenInput" placeholder="JWT Token" size="50">
<button onclick="connect()">Connect to SSE</button>
<button onclick="disconnect()">Disconnect</button>
</div>
<div class="controls">
<input type="text" id="messageInput" placeholder="Message to broadcast">
<button onclick="broadcastMessage()">Broadcast</button>
<input type="text" id="roomInput" placeholder="Room name" value="chat">
<button onclick="joinRoom()">Join Room</button>
<button onclick="leaveRoom()">Leave Room</button>
</div>
<div class="container">
<div class="panel">
<h3>Event Stream</h3>
<div id="events" style="height: 400px; overflow-y: scroll;"></div>
</div>
<div class="panel">
<h3>Stock Prices</h3>
<div class="stocks" id="stocks"></div>
</div>
</div>
<script>
let eventSource = null;
let token = null;
function getToken() {
fetch('/auth/login', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ username: 'demo', password: 'demo' })
})
.then(response => response.json())
.then(data => {
token = data.token;
document.getElementById('tokenInput').value = token;
});
}
function connect() {
token = document.getElementById('tokenInput').value;
if (!token) {
alert('Please get a token first');
return;
}
if (eventSource) {
eventSource.close();
}
eventSource = new EventSource('/events', {
headers: {
'Authorization': 'Bearer ' + token
}
});
eventSource.onopen = function() {
addEvent('connected', 'Connected to SSE stream');
};
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
addEvent(data.type, data);
};
eventSource.addEventListener('time_update', function(event) {
const data = JSON.parse(event.data);
document.title = 'SSE Demo - ' + data.data.time;
});
eventSource.addEventListener('stock_update', function(event) {
const data = JSON.parse(event.data);
updateStock(data.data);
});
eventSource.onerror = function() {
addEvent('error', 'Connection error');
};
}
function disconnect() {
if (eventSource) {
eventSource.close();
eventSource = null;
addEvent('disconnected', 'Disconnected from SSE');
}
}
function broadcastMessage() {
const message = document.getElementById('messageInput').value;
if (message && token) {
fetch('/broadcast', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + token
},
body: JSON.stringify({
type: 'message',
data: { message: message }
})
});
document.getElementById('messageInput').value = '';
}
}
function joinRoom() {
const roomName = document.getElementById('roomInput').value;
if (roomName && token) {
fetch('/rooms/' + roomName + '/join', {
method: 'POST',
headers: {
'Authorization': 'Bearer ' + token
}
});
}
}
function leaveRoom() {
const roomName = document.getElementById('roomInput').value;
if (roomName && token) {
fetch('/rooms/' + roomName + '/leave', {
method: 'POST',
headers: {
'Authorization': 'Bearer ' + token
}
});
}
}
function addEvent(type, data) {
const eventsDiv = document.getElementById('events');
const eventDiv = document.createElement('div');
eventDiv.className = 'event ' + type;
const time = new Date().toLocaleTimeString();
let content = '';
if (typeof data === 'object') {
content = JSON.stringify(data, null, 2);
} else {
content = data;
}
eventDiv.innerHTML = '<strong>' + time + ' - ' + type + ':</strong><br>' + content;
eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);
// Keep only last 100 events
while (eventsDiv.children.length > 100) {
eventsDiv.removeChild(eventsDiv.lastChild);
}
}
function updateStock(stock) {
const stockElement = document.getElementById('stock_' + stock.symbol);
if (!stockElement) {
const newStockElement = document.createElement('div');
newStockElement.id = 'stock_' + stock.symbol;
newStockElement.className = 'stock';
document.getElementById('stocks').appendChild(newStockElement);
}
const element = document.getElementById('stock_' + stock.symbol);
const changeClass = stock.change >= 0 ? 'positive' : 'negative';
const changeSymbol = stock.change >= 0 ? '+' : '';
element.innerHTML = `
<strong>${stock.symbol}</strong><br>
${stock.price}<br>
<span class="${changeClass}">${changeSymbol}${stock.change} (${stock.change_percent}%)</span>
`;
}
// Auto-connect on load
window.onload = function() {
getToken();
setTimeout(connect, 500);
};
</script>
</body>
</html>
'''
# Start background threads
start_time_updates()
start_notification_simulation()
start_stock_price_simulation()
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000, threaded=True)