🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Server-Sent Events (SSE)
Exemples de Server-Sent Events pour communication unidirectionnelle en temps réel du serveur au client, incluant les mises à jour en direct, notifications et streaming de données
💻 Bases des Server-Sent Events javascript
🟢 simple
⭐⭐
Introduction à SSE avec implémentation de base du serveur et patterns de consommation du client
⏱️ 15 min
🏷️ sse, real-time, streaming, notifications
Prerequisites:
Node.js, Express.js, HTTP basics
// Server-Sent Events (SSE) Basic Examples
// Demonstrates real-time server-to-client communication
const express = require('express');
const cors = require('cors');
const app = express();
const PORT = 3000;
// Middleware
app.use(cors());
app.use(express.json());
// Store active SSE connections
const connections = new Set();
// 1. Basic SSE Endpoint
app.get('/events', (req, res) => {
console.log('New SSE connection established');
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
});
// Send initial connection message
res.write('data: {"type": "connected", "message": "Connected to SSE stream"}\n\n');
// Add connection to active connections
connections.add(res);
// Handle client disconnect
req.on('close', () => {
console.log('SSE connection closed');
connections.delete(res);
});
// Send keep-alive ping every 30 seconds
const pingInterval = setInterval(() => {
res.write(': keep-alive\n\n');
}, 30000);
req.on('close', () => {
clearInterval(pingInterval);
});
});
// 2. Send event to all connected clients
function broadcastEvent(eventType, data) {
const message = JSON.stringify({
type: eventType,
data: data,
timestamp: new Date().toISOString()
});
connections.forEach(connection => {
try {
connection.write(`event: ${eventType}\ndata: ${message}\n\n`);
} catch (error) {
console.error('Error sending event:', error);
connections.delete(connection);
}
});
}
// 3. Send regular time updates
function startTimeUpdates() {
setInterval(() => {
broadcastEvent('time', {
time: new Date().toLocaleTimeString(),
date: new Date().toLocaleDateString()
});
}, 1000);
}
// 4. Send random notifications
function startNotifications() {
const notifications = [
'New order received!',
'Server backup completed',
'New user registered',
'System update available',
'Security scan finished'
];
setInterval(() => {
const notification = notifications[Math.floor(Math.random() * notifications.length)];
broadcastEvent('notification', {
message: notification,
level: Math.random() > 0.5 ? 'info' : 'success'
});
}, 5000);
}
// 5. REST API endpoints to trigger events
app.post('/send-message', (req, res) => {
const { message, type = 'custom' } = req.body;
broadcastEvent(type, { message });
res.json({ success: true, message: 'Event broadcasted' });
});
app.post('/trigger-alert', (req, res) => {
const { level = 'warning', message } = req.body;
broadcastEvent('alert', { level, message });
res.json({ success: true, alert: 'Alert sent' });
});
// 6. Stock price simulation
function startStockSimulation() {
const stocks = {
'AAPL': { price: 150, change: 0 },
'GOOGL': { price: 2800, change: 0 },
'MSFT': { price: 350, change: 0 },
'AMZN': { price: 3200, change: 0 }
};
setInterval(() => {
Object.keys(stocks).forEach(symbol => {
const stock = stocks[symbol];
const oldPrice = stock.price;
const change = (Math.random() - 0.5) * 10;
stock.price = Math.max(0, oldPrice + change);
stock.change = stock.price - oldPrice;
broadcastEvent('stock-update', {
symbol: symbol,
price: stock.price.toFixed(2),
change: stock.change.toFixed(2),
changePercent: ((stock.change / oldPrice) * 100).toFixed(2)
});
});
}, 2000);
}
// HTML Client Example
app.get('/', (req, res) => {
res.send(`
<!DOCTYPE html>
<html>
<head>
<title>SSE Demo</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
.event { margin: 10px 0; padding: 10px; border-radius: 5px; }
.connected { background: #e8f5e8; }
.time { background: #e3f2fd; }
.notification { background: #fff3e0; }
.alert { background: #ffebee; }
.stock-update { background: #f3e5f5; }
.custom { background: #e0f2f1; }
.controls { margin: 20px 0; }
button { margin: 5px; padding: 10px 15px; cursor: pointer; }
input { margin: 5px; padding: 8px; }
.stocks { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 10px; }
.stock { padding: 10px; border: 1px solid #ddd; border-radius: 5px; }
.positive { color: green; }
.negative { color: red; }
</style>
</head>
<body>
<h1>Server-Sent Events Demo</h1>
<div id="status" class="event">Connecting...</div>
<div class="controls">
<input type="text" id="messageInput" placeholder="Enter message">
<button onclick="sendMessage()">Send Message</button>
<button onclick="triggerAlert()">Trigger Alert</button>
</div>
<div class="stocks" id="stocks"></div>
<h3>Event Stream:</h3>
<div id="events"></div>
<script>
const eventSource = new EventSource('/events');
const eventsDiv = document.getElementById('events');
const statusDiv = document.getElementById('status');
const stocksDiv = document.getElementById('stocks');
eventSource.onopen = function(event) {
statusDiv.innerHTML = '<div class="event connected">✅ Connected to SSE stream</div>';
addEvent('connected', 'Connection established');
};
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
addEvent(data.type, data.data);
};
eventSource.addEventListener('time', function(event) {
const data = JSON.parse(event.data);
document.title = `SSE Demo - ${data.data.time}`;
});
eventSource.addEventListener('notification', function(event) {
const data = JSON.parse(event.data);
showNotification(data.data.message, data.data.level);
});
eventSource.addEventListener('alert', function(event) {
const data = JSON.parse(event.data);
showAlert(data.data.message, data.data.level);
});
eventSource.addEventListener('stock-update', function(event) {
const data = JSON.parse(event.data);
updateStock(data.data);
});
function addEvent(type, data) {
const eventDiv = document.createElement('div');
eventDiv.className = `event ${type}`;
eventDiv.innerHTML = `
<strong>${new Date().toLocaleTimeString()} - ${type.toUpperCase()}:</strong>
${typeof data === 'object' ? JSON.stringify(data, null, 2) : data}
`;
eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);
// Keep only last 50 events
while (eventsDiv.children.length > 50) {
eventsDiv.removeChild(eventsDiv.lastChild);
}
}
function showNotification(message, level) {
if (Notification.permission === 'granted') {
new Notification('SSE Notification', {
body: message,
icon: '/icon.png'
});
}
}
function showAlert(message, level) {
alert(`Alert (${level}): ${message}`);
}
function updateStock(stock) {
const stockElement = document.getElementById(`stock-${stock.symbol}`);
if (!stockElement) {
const newStockElement = document.createElement('div');
newStockElement.id = `stock-${stock.symbol}`;
newStockElement.className = 'stock';
stocksDiv.appendChild(newStockElement);
}
const element = document.getElementById(`stock-${stock.symbol}`);
const changeClass = stock.change >= 0 ? 'positive' : 'negative';
const changeSymbol = stock.change >= 0 ? '+' : '';
element.innerHTML = `
<strong>${stock.symbol}</strong><br>
$${stock.price}<br>
<span class="${changeClass}">${changeSymbol}${stock.change} (${stock.changePercent}%)</span>
`;
}
function sendMessage() {
const input = document.getElementById('messageInput');
const message = input.value.trim();
if (message) {
fetch('/send-message', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message, type: 'custom' })
});
input.value = '';
}
}
function triggerAlert() {
fetch('/trigger-alert', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
level: 'warning',
message: 'This is a test alert!'
})
});
}
// Request notification permission
if ('Notification' in window && Notification.permission === 'default') {
Notification.requestPermission();
}
// Handle connection errors
eventSource.onerror = function(event) {
statusDiv.innerHTML = '<div class="event alert">❌ Connection error. Retrying...</div>';
};
// Handle Enter key in message input
document.getElementById('messageInput').addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
</script>
</body>
</html>
`);
});
// Start background processes
startTimeUpdates();
startNotifications();
startStockSimulation();
app.listen(PORT, () => {
console.log(`SSE Server running on http://localhost:${PORT}`);
});
// Graceful shutdown
process.on('SIGINT', () => {
console.log('\nShutting down SSE server...');
connections.forEach(connection => {
connection.end();
});
process.exit(0);
});
module.exports = { app, broadcastEvent };
💻 Patterns Avancés SSE javascript
🟡 intermediate
⭐⭐⭐⭐
Patterns avancés de Server-Sent Events incluant authentification, streaming basé sur salles, gestion du backpressure et stratégies de reconnexion
⏱️ 30 min
🏷️ sse, real-time, authentication, rooms, enterprise
Prerequisites:
Node.js, Express.js, JWT, Redis, Advanced JavaScript
// Advanced Server-Sent Events Patterns
// Enterprise-grade SSE implementation with authentication, rooms, and advanced features
const express = require('express');
const jwt = require('jsonwebtoken');
const Redis = require('ioredis');
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
const compression = require('compression');
const app = express();
const PORT = 3000;
// Configuration
const JWT_SECRET = process.env.JWT_SECRET || 'your-secret-key';
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';
// Redis client for distributed systems
const redis = new Redis(REDIS_URL);
// Middleware
app.use(helmet());
app.use(compression());
app.use(express.json());
// Rate limiting
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // limit each IP to 100 requests per windowMs
message: 'Too many requests from this IP'
});
app.use('/events', limiter);
// Connection management
class ConnectionManager {
constructor() {
this.connections = new Map(); // userId -> Set of connections
this.rooms = new Map(); // roomId -> Set of userIds
this.connectionMetadata = new Map(); // connectionId -> metadata
}
addConnection(connectionId, res, userId, metadata = {}) {
const connection = {
id: connectionId,
response: res,
userId: userId,
rooms: new Set(),
lastActivity: Date.now(),
metadata: metadata,
isAlive: true
};
// Store connection
if (!this.connections.has(userId)) {
this.connections.set(userId, new Set());
}
this.connections.get(userId).add(connection);
this.connectionMetadata.set(connectionId, connection);
// Store in Redis for distributed systems
redis.hset(`connections:${userId}`, connectionId, JSON.stringify({
connectionId,
rooms: Array.from(connection.rooms),
lastActivity: connection.lastActivity,
metadata: connection.metadata
}));
console.log(`Connection added: ${connectionId} for user ${userId}`);
return connection;
}
removeConnection(connectionId) {
const connection = this.connectionMetadata.get(connectionId);
if (!connection) return;
// Remove from user connections
const userConnections = this.connections.get(connection.userId);
if (userConnections) {
userConnections.delete(connection);
if (userConnections.size === 0) {
this.connections.delete(connection.userId);
}
}
// Remove from rooms
connection.rooms.forEach(roomId => {
this.leaveRoom(connectionId, roomId);
});
// Remove from metadata and Redis
this.connectionMetadata.delete(connectionId);
redis.hdel(`connections:${connection.userId}`, connectionId);
console.log(`Connection removed: ${connectionId}`);
}
joinRoom(connectionId, roomId) {
const connection = this.connectionMetadata.get(connectionId);
if (!connection) return false;
// Add connection to room
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
}
this.rooms.get(roomId).add(connectionId);
connection.rooms.add(roomId);
// Update Redis
redis.sadd(`room:${roomId}`, connection.userId);
console.log(`Connection ${connectionId} joined room ${roomId}`);
return true;
}
leaveRoom(connectionId, roomId) {
const connection = this.connectionMetadata.get(connectionId);
if (!connection) return false;
// Remove from room
if (this.rooms.has(roomId)) {
this.rooms.get(roomId).delete(connectionId);
if (this.rooms.get(roomId).size === 0) {
this.rooms.delete(roomId);
}
}
connection.rooms.delete(roomId);
// Update Redis
redis.srem(`room:${roomId}`, connection.userId);
console.log(`Connection ${connectionId} left room ${roomId}`);
return true;
}
getConnectionsByUser(userId) {
return this.connections.get(userId) || new Set();
}
getConnectionsByRoom(roomId) {
const roomConnections = this.rooms.get(roomId) || new Set();
return Array.from(roomConnections).map(connId =>
this.connectionMetadata.get(connId)
).filter(Boolean);
}
getAllConnections() {
return Array.from(this.connectionMetadata.values());
}
updateActivity(connectionId) {
const connection = this.connectionMetadata.get(connectionId);
if (connection) {
connection.lastActivity = Date.now();
}
}
// Clean up inactive connections
cleanup() {
const now = Date.now();
const timeout = 5 * 60 * 1000; // 5 minutes
for (const [connectionId, connection] of this.connectionMetadata) {
if (now - connection.lastActivity > timeout) {
console.log(`Cleaning up inactive connection: ${connectionId}`);
connection.response.end();
this.removeConnection(connectionId);
}
}
}
}
const connectionManager = new ConnectionManager();
// Clean up every 5 minutes
setInterval(() => {
connectionManager.cleanup();
}, 5 * 60 * 1000);
// JWT Authentication middleware
function authenticateToken(req, res, next) {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];
if (!token) {
return res.status(401).json({ error: 'Access token required' });
}
jwt.verify(token, JWT_SECRET, (err, user) => {
if (err) {
return res.status(403).json({ error: 'Invalid token' });
}
req.user = user;
next();
});
}
// Generate JWT token endpoint
app.post('/auth/login', (req, res) => {
const { username, password } = req.body;
// Simple authentication (replace with real auth logic)
if (username && password) {
const user = {
id: username,
username: username,
role: 'user'
};
const token = jwt.sign(user, JWT_SECRET, { expiresIn: '24h' });
res.json({ token, user });
} else {
res.status(401).json({ error: 'Invalid credentials' });
}
});
// 1. Advanced SSE Endpoint with Authentication
app.get('/events', authenticateToken, (req, res) => {
const connectionId = `${req.user.id}_${Date.now()}`;
const userId = req.user.id;
// Set SSE headers with additional security
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no', // Disable nginx buffering
'Access-Control-Allow-Origin': req.headers.origin || '*',
'Access-Control-Allow-Credentials': 'true'
});
// Add connection
const connection = connectionManager.addConnection(
connectionId,
res,
userId,
{
userAgent: req.headers['user-agent'],
ip: req.ip,
query: req.query
}
);
// Send connection event with metadata
const connectionEvent = {
type: 'connected',
data: {
connectionId: connectionId,
userId: userId,
timestamp: new Date().toISOString(),
serverTime: Date.now()
}
};
res.write(`event: connected\ndata: ${JSON.stringify(connectionEvent)}\n\n`);
// Join default rooms based on user role
if (req.user.role === 'admin') {
connectionManager.joinRoom(connectionId, 'admin');
}
connectionManager.joinRoom(connectionId, `user_${userId}`);
// Send initial data
const initialData = {
type: 'initial-data',
data: {
message: 'Welcome to the SSE stream',
userId: userId,
availableRooms: ['notifications', 'updates', 'alerts']
}
};
res.write(`event: initial-data\ndata: ${JSON.stringify(initialData)}\n\n`);
// Handle client disconnect
req.on('close', () => {
console.log(`SSE connection closed: ${connectionId}`);
connectionManager.removeConnection(connectionId);
});
req.on('aborted', () => {
console.log(`SSE connection aborted: ${connectionId}`);
connectionManager.removeConnection(connectionId);
});
// Setup keep-alive with backpressure handling
let pingInterval;
const startPing = () => {
pingInterval = setInterval(() => {
try {
// Check if connection is still writable
if (res.destroyed || res.finished) {
clearInterval(pingInterval);
return;
}
res.write(': keep-alive\n\n');
connectionManager.updateActivity(connectionId);
} catch (error) {
console.error(`Ping error for ${connectionId}:`, error);
clearInterval(pingInterval);
connectionManager.removeConnection(connectionId);
}
}, 30000);
};
startPing();
// Clean up ping interval on disconnect
req.on('close', () => {
if (pingInterval) clearInterval(pingInterval);
});
});
// 2. Room management endpoints
app.post('/rooms/:roomId/join', authenticateToken, (req, res) => {
const { roomId } = req.params;
const userId = req.user.id;
// Get user's connections
const userConnections = connectionManager.getConnectionsByUser(userId);
// Join all user connections to the room
let joinedCount = 0;
userConnections.forEach(connection => {
if (connectionManager.joinRoom(connection.id, roomId)) {
joinedCount++;
// Send confirmation to this connection
const joinEvent = {
type: 'room-joined',
data: {
roomId: roomId,
userId: userId,
timestamp: new Date().toISOString()
}
};
connection.response.write(`event: room-joined\ndata: ${JSON.stringify(joinEvent)}\n\n`);
}
});
res.json({
success: true,
roomId: roomId,
connectionsJoined: joinedCount
});
});
app.post('/rooms/:roomId/leave', authenticateToken, (req, res) => {
const { roomId } = req.params;
const userId = req.user.id;
const userConnections = connectionManager.getConnectionsByUser(userId);
let leftCount = 0;
userConnections.forEach(connection => {
if (connectionManager.leaveRoom(connection.id, roomId)) {
leftCount++;
const leaveEvent = {
type: 'room-left',
data: {
roomId: roomId,
userId: userId,
timestamp: new Date().toISOString()
}
};
connection.response.write(`event: room-left\ndata: ${JSON.stringify(leaveEvent)}\n\n`);
}
});
res.json({
success: true,
roomId: roomId,
connectionsLeft: leftCount
});
});
// 3. Advanced broadcasting with filtering
function broadcastEvent(eventType, data, options = {}) {
const {
roomId = null,
userId = null,
connectionId = null,
metadata = {},
priority = 'normal'
} = options;
const message = {
type: eventType,
data: data,
timestamp: new Date().toISOString(),
serverTime: Date.now(),
id: generateEventId(),
priority: priority
};
let targetConnections = [];
if (connectionId) {
// Send to specific connection
const conn = connectionManager.connectionMetadata.get(connectionId);
if (conn) targetConnections.push(conn);
} else if (userId) {
// Send to all connections for a user
targetConnections = Array.from(connectionManager.getConnectionsByUser(userId));
} else if (roomId) {
// Send to all connections in a room
targetConnections = connectionManager.getConnectionsByRoom(roomId);
} else {
// Send to all connections
targetConnections = connectionManager.getAllConnections();
}
// Apply metadata filters
if (metadata.filters) {
targetConnections = targetConnections.filter(conn => {
return Object.entries(metadata.filters).every(([key, value]) => {
return conn.metadata[key] === value;
});
});
}
// Send event
let sentCount = 0;
targetConnections.forEach(connection => {
try {
const eventData = {
...message,
connectionId: connection.id,
roomId: roomId || null,
userId: connection.userId
};
// Add custom event name if provided
const eventName = metadata.eventName || eventType;
connection.response.write(`event: ${eventName}\ndata: ${JSON.stringify(eventData)}\n\n`);
connectionManager.updateActivity(connection.id);
sentCount++;
} catch (error) {
console.error(`Error broadcasting to ${connection.id}:`, error);
connectionManager.removeConnection(connection.id);
}
});
// Store event in Redis for replay/debug
redis.lpush('events:history', JSON.stringify({
...message,
sentCount: sentCount,
targetCount: targetConnections.length
}));
redis.ltrim('events:history', 0, 999); // Keep last 1000 events
console.log(`Broadcasted '${eventType}' to ${sentCount} connections`);
return sentCount;
}
// 4. Event broadcasting endpoints
app.post('/broadcast', authenticateToken, (req, res) => {
const { eventType, data, options = {} } = req.body;
const sentCount = broadcastEvent(eventType, data, {
...options,
metadata: {
...options.metadata,
senderId: req.user.id,
senderRole: req.user.role
}
});
res.json({
success: true,
eventType: eventType,
sentTo: sentCount
});
});
app.post('/broadcast-room/:roomId', authenticateToken, (req, res) => {
const { roomId } = req.params;
const { eventType, data } = req.body;
const sentCount = broadcastEvent(eventType, data, {
roomId: roomId,
metadata: {
senderId: req.user.id,
roomId: roomId
}
});
res.json({
success: true,
roomId: roomId,
eventType: eventType,
sentTo: sentCount
});
});
// 5. Event replay for new connections
app.get('/events/replay/:lastEventId', authenticateToken, (req, res) => {
const { lastEventId } = req.params;
const userId = req.user.id;
// Get events from Redis (simplified - in production use a proper event store)
redis.lrange('events:history', 0, 99, (err, events) => {
if (err) {
return res.status(500).json({ error: 'Failed to fetch events' });
}
const parsedEvents = events.map(e => JSON.parse(e));
const filteredEvents = parsedEvents.filter(event => {
// Filter events relevant to this user or public events
return event.data.userId === userId || !event.data.userId;
});
res.json({
events: filteredEvents,
lastEventId: lastEventId
});
});
});
// 6. Connection statistics
app.get('/stats', authenticateToken, (req, res) => {
const stats = {
totalConnections: connectionManager.getAllConnections().length,
totalRooms: connectionManager.rooms.size,
activeUsers: connectionManager.connections.size,
roomStats: {}
};
// Add room-specific stats
for (const [roomId, connections] of connectionManager.rooms) {
stats.roomStats[roomId] = connections.size;
}
res.json(stats);
});
// 7. Live data simulations
function startLiveSimulations() {
// User activity feed
setInterval(() => {
const activities = [
'User logged in',
'Post published',
'Comment added',
'File uploaded',
'Task completed'
];
const activity = activities[Math.floor(Math.random() * activities.length)];
broadcastEvent('user-activity', {
activity: activity,
userId: `user_${Math.floor(Math.random() * 100)}`,
timestamp: new Date().toISOString()
});
}, 3000);
// System metrics
setInterval(() => {
const metrics = {
cpu: Math.random() * 100,
memory: Math.random() * 100,
disk: Math.random() * 100,
network: Math.random() * 1000
};
broadcastEvent('system-metrics', metrics, {
roomId: 'admin'
});
}, 5000);
// Live chat simulation
setInterval(() => {
const messages = [
'Hello everyone!',
'How are you doing?',
'Great weather today',
'Anyone working on the new feature?',
'Coffee break time!'
];
const message = messages[Math.floor(Math.random() * messages.length)];
const userId = `user_${Math.floor(Math.random() * 10)}`;
broadcastEvent('chat-message', {
message: message,
userId: userId,
username: `User_${userId}`,
timestamp: new Date().toISOString()
}, {
roomId: 'chat'
});
}, 4000);
}
// Utility function to generate unique event IDs
function generateEventId() {
return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
// Start background simulations
startLiveSimulations();
app.listen(PORT, () => {
console.log(`Advanced SSE Server running on http://localhost:${PORT}`);
});
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('\nShutting down Advanced SSE server...');
// Close all connections
connectionManager.getAllConnections().forEach(connection => {
try {
connection.response.write('event: shutdown\ndata: {"message": "Server shutting down"}\n\n');
connection.response.end();
} catch (error) {
// Ignore errors during shutdown
}
});
// Close Redis connection
redis.quit();
setTimeout(() => {
process.exit(0);
}, 1000);
});
module.exports = {
app,
connectionManager,
broadcastEvent
};
💻 Implémentation SSE Python Flask python
🟡 intermediate
⭐⭐⭐
Implémentation Server-Sent Events utilisant Python Flask avec Redis pub/sub pour applications en temps réel scalables
⏱️ 25 min
🏷️ sse, python, flask, real-time, redis
Prerequisites:
Python, Flask, Redis, JWT, Threading
# Server-Sent Events with Python Flask
# pip install flask flask-redis redis gevent
from flask import Flask, Response, jsonify, request, stream_template
from flask_redis import FlaskRedis
import redis as redis_py
import json
import time
import threading
import uuid
from datetime import datetime
import queue
from functools import wraps
import jwt
from gevent import monkey
monkey.patch_all()
app = Flask(__name__)
# Configuration
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['REDIS_URL'] = 'redis://localhost:6379/0'
app.config['JWT_SECRET_KEY'] = 'jwt-secret-string'
# Redis connections
redis_client = FlaskRedis(app)
pubsub = redis_py.Redis.from_url('redis://localhost:6379/0').pubsub()
# In-memory connection storage (use Redis in production)
connections = {}
rooms = {}
class Connection:
def __init__(self, connection_id, user_id, response_queue):
self.id = connection_id
self.user_id = user_id
self.queue = queue.Queue()
self.created_at = datetime.now()
self.last_activity = datetime.now()
self.rooms = set()
self.is_active = True
def send_event(self, event_type, data, event_id=None):
"""Send event to this connection"""
event = {
'id': event_id or str(uuid.uuid4()),
'type': event_type,
'data': data,
'timestamp': datetime.now().isoformat()
}
self.queue.put(event)
self.last_activity = datetime.now()
def join_room(self, room_id):
"""Join a room"""
self.rooms.add(room_id)
if room_id not in rooms:
rooms[room_id] = set()
rooms[room_id].add(self.id)
def leave_room(self, room_id):
"""Leave a room"""
self.rooms.discard(room_id)
if room_id in rooms:
rooms[room_id].discard(self.id)
if not rooms[room_id]:
del rooms[room_id]
# Authentication decorator
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
if 'Authorization' in request.headers:
auth_header = request.headers['Authorization']
try:
token = auth_header.split(" ")[1]
except IndexError:
return jsonify({'message': 'Bearer token malformed'}), 401
if not token:
return jsonify({'message': 'Token is missing!'}), 401
try:
data = jwt.decode(token, app.config['JWT_SECRET_KEY'], algorithms=["HS256"])
current_user_id = data['user_id']
except Exception as e:
return jsonify({'message': 'Token is invalid!'}), 401
return f(current_user_id, *args, **kwargs)
return decorated
# 1. Authentication endpoints
@app.route('/auth/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
# Simple authentication (replace with real auth logic)
if username and password:
token = jwt.encode({
'user_id': username,
'exp': datetime.utcnow() + timedelta(hours=24)
}, app.config['JWT_SECRET_KEY'], algorithm="HS256")
return jsonify({
'token': token,
'user': {'id': username, 'username': username}
})
return jsonify({'message': 'Invalid credentials'}), 401
# 2. Main SSE endpoint
@app.route('/events')
@token_required
def events(current_user_id):
connection_id = str(uuid.uuid4())
# Create new connection
connection = Connection(connection_id, current_user_id, queue.Queue())
connections[connection_id] = connection
# Join user-specific room
connection.join_room(f'user_{current_user_id}')
def generate():
try:
# Send initial connection event
yield f"event: connected\n"
yield f"data: {json.dumps({{'type': 'connected', 'connection_id': connection_id, 'user_id': current_user_id}})}\n\n"
# Subscribe to Redis channels
pubsub.subscribe([
f'user_{current_user_id}',
'global',
'notifications'
])
# Listen for Redis messages
while connection.is_active:
# Check for Redis messages
message = pubsub.get_message(timeout=0.1)
if message and message['type'] == 'message':
try:
event_data = json.loads(message['data'].decode('utf-8'))
event_type = event_data.get('type', 'message')
# Send event to client
yield f"event: {event_type}\n"
yield f"data: {json.dumps(event_data)}\n\n"
except (json.JSONDecodeError, UnicodeDecodeError) as e:
print(f"Error parsing Redis message: {e}")
# Check for queued events
try:
while not connection.queue.empty():
event = connection.queue.get_nowait()
yield f"event: {event['type']}\n"
yield f"data: {json.dumps(event)}\n\n"
except queue.Empty:
pass
# Send keep-alive ping
yield ": keep-alive\n\n"
time.sleep(1)
except GeneratorExit:
print(f"Client disconnected: {connection_id}")
finally:
# Cleanup
cleanup_connection(connection_id)
pubsub.unsubscribe()
return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control'
}
)
def cleanup_connection(connection_id):
"""Clean up connection when disconnected"""
if connection_id in connections:
connection = connections[connection_id]
# Leave all rooms
for room_id in list(connection.rooms):
connection.leave_room(room_id)
# Remove from connections
del connections[connection_id]
print(f"Cleaned up connection: {connection_id}")
# 3. Room management
@app.route('/rooms/<room_id>/join', methods=['POST'])
@token_required
def join_room(current_user_id, room_id):
connection_id = request.headers.get('X-Connection-ID')
if connection_id in connections:
connection = connections[connection_id]
connection.join_room(room_id)
# Subscribe to Redis room channel
pubsub.subscribe(room_id)
# Notify room members
broadcast_to_room(room_id, 'user_joined', {
'user_id': current_user_id,
'room_id': room_id
}, exclude_connection=connection_id)
return jsonify({'message': f'Joined room {room_id}'})
return jsonify({'message': 'Connection not found'}), 404
@app.route('/rooms/<room_id>/leave', methods=['POST'])
@token_required
def leave_room(current_user_id, room_id):
connection_id = request.headers.get('X-Connection-ID')
if connection_id in connections:
connection = connections[connection_id]
connection.leave_room(room_id)
# Unsubscribe from Redis room channel
pubsub.unsubscribe(room_id)
# Notify room members
broadcast_to_room(room_id, 'user_left', {
'user_id': current_user_id,
'room_id': room_id
}, exclude_connection=connection_id)
return jsonify({'message': f'Left room {room_id}'})
return jsonify({'message': 'Connection not found'}), 404
# 4. Broadcasting functions
def broadcast_to_user(user_id, event_type, data):
"""Broadcast event to specific user"""
room_id = f'user_{user_id}'
event_data = {
'type': event_type,
'data': data,
'timestamp': datetime.now().isoformat(),
'target_user': user_id
}
# Publish to Redis
redis_client.publish(room_id, json.dumps(event_data))
def broadcast_to_room(room_id, event_type, data, exclude_connection=None):
"""Broadcast event to room members"""
event_data = {
'type': event_type,
'data': data,
'timestamp': datetime.now().isoformat(),
'room_id': room_id
}
# Send to in-memory connections
if room_id in rooms:
for conn_id in rooms[room_id]:
if conn_id != exclude_connection and conn_id in connections:
connections[conn_id].send_event(event_type, data)
# Publish to Redis for distributed systems
redis_client.publish(room_id, json.dumps(event_data))
def broadcast_global(event_type, data):
"""Broadcast event to all connected clients"""
event_data = {
'type': event_type,
'data': data,
'timestamp': datetime.now().isoformat()
}
redis_client.publish('global', json.dumps(event_data))
# 5. Event broadcasting endpoints
@app.route('/broadcast', methods=['POST'])
@token_required
def broadcast_event(current_user_id):
data = request.get_json()
event_type = data.get('type', 'message')
event_data = data.get('data', {})
broadcast_global(event_type, event_data)
return jsonify({'message': 'Event broadcasted'})
@app.route('/broadcast/room/<room_id>', methods=['POST'])
@token_required
def broadcast_to_room_endpoint(current_user_id, room_id):
data = request.get_json()
event_type = data.get('type', 'message')
event_data = data.get('data', {})
broadcast_to_room(room_id, event_type, event_data)
return jsonify({'message': f'Event sent to room {room_id}'})
@app.route('/broadcast/user/<user_id>', methods=['POST'])
@token_required
def broadcast_to_user_endpoint(current_user_id, user_id):
data = request.get_json()
event_type = data.get('type', 'message')
event_data = data.get('data', {})
broadcast_to_user(user_id, event_type, event_data)
return jsonify({'message': f'Event sent to user {user_id}'})
# 6. Real-time features
def start_time_updates():
"""Start broadcasting time updates"""
def time_loop():
while True:
time_data = {
'time': datetime.now().strftime('%H:%M:%S'),
'date': datetime.now().strftime('%Y-%m-%d'),
'timestamp': datetime.now().isoformat()
}
broadcast_global('time_update', time_data)
time.sleep(1)
thread = threading.Thread(target=time_loop, daemon=True)
thread.start()
def start_notification_simulation():
"""Simulate random notifications"""
def notification_loop():
notifications = [
{'message': 'New message received', 'level': 'info'},
{'message': 'System update available', 'level': 'warning'},
{'message': 'Task completed successfully', 'level': 'success'},
{'message': 'Server maintenance scheduled', 'level': 'info'},
{'message': 'Security scan completed', 'level': 'success'}
]
while True:
notification = notifications[int(time.time()) % len(notifications)]
notification['id'] = str(uuid.uuid4())
notification['timestamp'] = datetime.now().isoformat()
broadcast_global('notification', notification)
time.sleep(7)
thread = threading.Thread(target=notification_loop, daemon=True)
thread.start()
def start_stock_price_simulation():
"""Simulate stock price updates"""
def stock_loop():
stocks = {
'AAPL': {'price': 150.0, 'change': 0.0},
'GOOGL': {'price': 2800.0, 'change': 0.0},
'MSFT': {'price': 350.0, 'change': 0.0},
'AMZN': {'price': 3200.0, 'change': 0.0}
}
while True:
for symbol, stock in stocks.items():
# Random price change
change_percent = (time.time() % 100 - 50) / 1000
old_price = stock['price']
new_price = old_price * (1 + change_percent)
stock['price'] = new_price
stock['change'] = new_price - old_price
stock_data = {
'symbol': symbol,
'price': round(new_price, 2),
'change': round(stock['change'], 2),
'change_percent': round(change_percent * 100, 2),
'timestamp': datetime.now().isoformat()
}
broadcast_to_room('stocks', 'stock_update', stock_data)
time.sleep(2)
thread = threading.Thread(target=stock_loop, daemon=True)
thread.start()
# 7. Statistics and monitoring
@app.route('/stats')
def get_stats():
stats = {
'active_connections': len(connections),
'active_rooms': len(rooms),
'room_details': {}
}
for room_id, connection_ids in rooms.items():
stats['room_details'][room_id] = len(connection_ids)
return jsonify(stats)
@app.route('/health')
def health_check():
"""Health check endpoint"""
try:
# Check Redis connection
redis_client.ping()
return jsonify({'status': 'healthy'}), 200
except Exception as e:
return jsonify({'status': 'unhealthy', 'error': str(e)}), 503
# 8. HTML Client Interface
@app.route('/')
def index():
return '''
<!DOCTYPE html>
<html>
<head>
<title>Python Flask SSE Demo</title>
<style>
body { font-family: Arial, sans-serif; max-width: 1000px; margin: 0 auto; padding: 20px; }
.container { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; }
.panel { border: 1px solid #ddd; border-radius: 5px; padding: 15px; }
.panel h3 { margin-top: 0; }
.event { margin: 5px 0; padding: 8px; border-radius: 3px; font-family: monospace; font-size: 12px; }
.time_update { background: #e3f2fd; }
.notification { background: #fff3e0; }
.stock_update { background: #f3e5f5; }
.message { background: #e8f5e8; }
.user_joined { background: #e1f5fe; }
.user_left { background: #fce4ec; }
.controls { margin: 10px 0; }
button { margin: 5px; padding: 8px 12px; cursor: pointer; }
input, select { margin: 5px; padding: 8px; }
.stocks { display: grid; grid-template-columns: repeat(2, 1fr); gap: 10px; }
.stock { padding: 10px; border: 1px solid #ddd; border-radius: 5px; text-align: center; }
.positive { color: green; }
.negative { color: red; }
</style>
</head>
<body>
<h1>Python Flask Server-Sent Events Demo</h1>
<div class="controls">
<button onclick="getToken()">Get Token</button>
<input type="text" id="tokenInput" placeholder="JWT Token" size="50">
<button onclick="connect()">Connect to SSE</button>
<button onclick="disconnect()">Disconnect</button>
</div>
<div class="controls">
<input type="text" id="messageInput" placeholder="Message to broadcast">
<button onclick="broadcastMessage()">Broadcast</button>
<input type="text" id="roomInput" placeholder="Room name" value="chat">
<button onclick="joinRoom()">Join Room</button>
<button onclick="leaveRoom()">Leave Room</button>
</div>
<div class="container">
<div class="panel">
<h3>Event Stream</h3>
<div id="events" style="height: 400px; overflow-y: scroll;"></div>
</div>
<div class="panel">
<h3>Stock Prices</h3>
<div class="stocks" id="stocks"></div>
</div>
</div>
<script>
let eventSource = null;
let token = null;
function getToken() {
fetch('/auth/login', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ username: 'demo', password: 'demo' })
})
.then(response => response.json())
.then(data => {
token = data.token;
document.getElementById('tokenInput').value = token;
});
}
function connect() {
token = document.getElementById('tokenInput').value;
if (!token) {
alert('Please get a token first');
return;
}
if (eventSource) {
eventSource.close();
}
eventSource = new EventSource('/events', {
headers: {
'Authorization': 'Bearer ' + token
}
});
eventSource.onopen = function() {
addEvent('connected', 'Connected to SSE stream');
};
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
addEvent(data.type, data);
};
eventSource.addEventListener('time_update', function(event) {
const data = JSON.parse(event.data);
document.title = 'SSE Demo - ' + data.data.time;
});
eventSource.addEventListener('stock_update', function(event) {
const data = JSON.parse(event.data);
updateStock(data.data);
});
eventSource.onerror = function() {
addEvent('error', 'Connection error');
};
}
function disconnect() {
if (eventSource) {
eventSource.close();
eventSource = null;
addEvent('disconnected', 'Disconnected from SSE');
}
}
function broadcastMessage() {
const message = document.getElementById('messageInput').value;
if (message && token) {
fetch('/broadcast', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + token
},
body: JSON.stringify({
type: 'message',
data: { message: message }
})
});
document.getElementById('messageInput').value = '';
}
}
function joinRoom() {
const roomName = document.getElementById('roomInput').value;
if (roomName && token) {
fetch('/rooms/' + roomName + '/join', {
method: 'POST',
headers: {
'Authorization': 'Bearer ' + token
}
});
}
}
function leaveRoom() {
const roomName = document.getElementById('roomInput').value;
if (roomName && token) {
fetch('/rooms/' + roomName + '/leave', {
method: 'POST',
headers: {
'Authorization': 'Bearer ' + token
}
});
}
}
function addEvent(type, data) {
const eventsDiv = document.getElementById('events');
const eventDiv = document.createElement('div');
eventDiv.className = 'event ' + type;
const time = new Date().toLocaleTimeString();
let content = '';
if (typeof data === 'object') {
content = JSON.stringify(data, null, 2);
} else {
content = data;
}
eventDiv.innerHTML = '<strong>' + time + ' - ' + type + ':</strong><br>' + content;
eventsDiv.insertBefore(eventDiv, eventsDiv.firstChild);
// Keep only last 100 events
while (eventsDiv.children.length > 100) {
eventsDiv.removeChild(eventsDiv.lastChild);
}
}
function updateStock(stock) {
const stockElement = document.getElementById('stock_' + stock.symbol);
if (!stockElement) {
const newStockElement = document.createElement('div');
newStockElement.id = 'stock_' + stock.symbol;
newStockElement.className = 'stock';
document.getElementById('stocks').appendChild(newStockElement);
}
const element = document.getElementById('stock_' + stock.symbol);
const changeClass = stock.change >= 0 ? 'positive' : 'negative';
const changeSymbol = stock.change >= 0 ? '+' : '';
element.innerHTML = `
<strong>${stock.symbol}</strong><br>
${stock.price}<br>
<span class="${changeClass}">${changeSymbol}${stock.change} (${stock.change_percent}%)</span>
`;
}
// Auto-connect on load
window.onload = function() {
getToken();
setTimeout(connect, 500);
};
</script>
</body>
</html>
'''
# Start background threads
start_time_updates()
start_notification_simulation()
start_stock_price_simulation()
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000, threaded=True)