🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples de Réseau Web Python
Exemples de réseau Web Python incluant les requêtes HTTP, le serveur HTTP et la programmation de sockets TCP
💻 Requêtes HTTP python
🟡 intermediate
⭐⭐⭐
Envoyer des requêtes GET, POST, PUT, DELETE en utilisant la bibliothèque requests avec gestion des erreurs et analyse des réponses
⏱️ 25 min
🏷️ python, web, networking, http
Prerequisites:
Intermediate Python, requests library
# Web Python HTTP Requests Examples
# Using requests library for HTTP communication with servers
# 1. Basic HTTP Request Methods
import requests
import json
from typing import Any, Dict, Optional
class HttpClient:
    """JSON-oriented HTTP client built on the ``requests`` library.

    Request URLs are formed by concatenating ``base_url`` and the ``url``
    given to each method. Every request carries ``default_headers`` merged
    with the per-call ``headers`` (per-call values win on duplicate keys).
    Each verb method raises ``requests.HTTPError`` on a 4xx/5xx response and
    returns the decoded JSON body, or ``{}`` when the body is empty.
    """

    def __init__(
        self,
        base_url: str = '',
        default_headers: Optional[Dict[str, str]] = None,
        timeout: Optional[float] = 10.0,
    ):
        """Store the URL prefix, the default headers and the request timeout.

        Args:
            base_url: Prefix prepended verbatim to every request URL.
            default_headers: Headers sent with every request; they override
                the built-in JSON ``Content-Type``.
            timeout: Seconds handed to ``requests`` as the request timeout;
                ``None`` waits indefinitely.
        """
        self.base_url = base_url
        self.timeout = timeout
        self.default_headers = {
            'Content-Type': 'application/json',
            **(default_headers or {})
        }

    def _send(self, sender: Any, url: str, headers: Optional[Dict[str, str]], **kwargs: Any) -> Dict[str, Any]:
        """Issue one request through ``sender`` and decode the JSON reply.

        ``sender`` is one of the ``requests`` verb functions (``requests.get``,
        ``requests.post``, ...); ``kwargs`` are forwarded to it unchanged.
        """
        response = sender(
            f"{self.base_url}{url}",
            headers={**self.default_headers, **(headers or {})},
            timeout=self.timeout,
            **kwargs
        )
        response.raise_for_status()
        # A successful reply may legitimately have no body (e.g. 204 after a
        # DELETE); calling .json() on it would raise a decode error.
        if not response.content:
            return {}
        return response.json()

    def get(self, url: str, headers: Optional[Dict[str, str]] = None, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send GET request, optionally with query-string ``params``."""
        return self._send(requests.get, url, headers, params=params)

    def post(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send POST request with ``data`` encoded as the JSON body."""
        return self._send(requests.post, url, headers, json=data)

    def put(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send PUT request with ``data`` encoded as the JSON body."""
        return self._send(requests.put, url, headers, json=data)

    def delete(self, url: str, headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send DELETE request."""
        return self._send(requests.delete, url, headers)

    def patch(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send PATCH request with ``data`` encoded as the JSON body."""
        return self._send(requests.patch, url, headers, json=data)
# 2. Advanced HTTP Features
class AdvancedHttpClient:
    """Collection of ``requests`` recipes: params, timeout, uploads, downloads, auth.

    Every method raises ``requests.HTTPError`` on a 4xx/5xx response.
    """

    def get_with_params(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Send GET request with query parameters and return the JSON body."""
        response = requests.get(url, params=params)
        response.raise_for_status()
        return response.json()

    def get_with_timeout(self, url: str, timeout: int = 5) -> Dict[str, Any]:
        """Send GET request with a timeout (seconds) and return the JSON body."""
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return response.json()

    def post_with_files(self, url: str, files: Dict[str, Any], data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Upload ``files`` (plus optional form ``data``) via POST."""
        response = requests.post(url, files=files, data=data)
        response.raise_for_status()
        return response.json()

    def download_file(self, url: str, filename: str) -> None:
        """Stream the body of ``url`` into ``filename`` in 8 KiB chunks."""
        # With stream=True the connection stays checked out until the body is
        # consumed or the response is closed; the context manager guarantees
        # the close even when raise_for_status() or a write fails.
        with requests.get(url, stream=True) as response:
            response.raise_for_status()
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

    def download_file_with_progress(self, url: str, filename: str) -> None:
        """Stream ``url`` into ``filename`` while printing a percentage.

        The percentage is only printed when the server reports a positive
        ``content-length``.
        """
        with requests.get(url, stream=True) as response:
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size > 0:
                        progress = (downloaded / total_size) * 100
                        # "\r" rewrites the same console line on each update.
                        print(f"\rProgress: {progress:.1f}%", end='')
        print()  # New line after progress

    def get_with_auth(self, url: str, username: str, password: str) -> Dict[str, Any]:
        """Send GET request with basic authentication."""
        response = requests.get(url, auth=(username, password))
        response.raise_for_status()
        return response.json()

    def get_with_bearer_token(self, url: str, token: str) -> Dict[str, Any]:
        """Send GET request with an ``Authorization: Bearer`` header."""
        headers = {'Authorization': f'Bearer {token}'}
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.json()
# 3. Request with Session and Cookies
class SessionClient:
    """Wrapper around a single ``requests.Session`` shared by all calls."""

    def __init__(self):
        self.session = requests.Session()

    def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """GET ``url`` through the session; return the decoded JSON body.

        Keyword arguments are forwarded to ``Session.get``. HTTP error
        statuses raise via ``raise_for_status``.
        """
        reply = self.session.get(url, **kwargs)
        reply.raise_for_status()
        return reply.json()

    def post(self, url: str, **kwargs) -> Dict[str, Any]:
        """POST to ``url`` through the session; return the decoded JSON body.

        Keyword arguments are forwarded to ``Session.post``. HTTP error
        statuses raise via ``raise_for_status``.
        """
        reply = self.session.post(url, **kwargs)
        reply.raise_for_status()
        return reply.json()

    def set_cookie(self, name: str, value: str):
        """Store the cookie ``name`` = ``value`` in the session's cookie jar."""
        jar = self.session.cookies
        jar.set(name, value)

    def get_cookies(self) -> Dict[str, str]:
        """Return the session's cookies copied into a plain dict."""
        jar = self.session.cookies
        return dict(jar)

    def close(self):
        """Close the underlying session."""
        self.session.close()
# 4. Retry Logic
class RetryClient:
    """HTTP client that retries failed GET requests with exponential backoff."""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        """Configure the retry policy.

        Args:
            max_retries: Number of retries after the first attempt.
            base_delay: Delay in seconds before the first retry; it doubles
                on each subsequent retry.
        """
        self.max_retries = max_retries
        self.base_delay = base_delay

    def get_with_retry(self, url: str, **kwargs) -> Dict[str, Any]:
        """Send GET request, retrying on any ``requests.RequestException``.

        Keyword arguments are forwarded to ``requests.get``. Returns the
        decoded JSON body of the first successful response.

        Raises:
            requests.RequestException: the error from the final attempt.
        """
        import time

        # Always make at least one attempt: with a negative max_retries the
        # loop would otherwise never run and there would be no error to raise.
        retries = max(self.max_retries, 0)
        for attempt in range(retries + 1):
            try:
                response = requests.get(url, **kwargs)
                response.raise_for_status()
                return response.json()
            except requests.RequestException:
                if attempt >= retries:
                    # Out of retries: surface the real failure with its traceback.
                    raise
                delay = self.base_delay * (2 ** attempt)
                print(f"Retry {attempt + 1}/{self.max_retries} after {delay}s")
                time.sleep(delay)
# 5. Response Handling
def handle_response(response: requests.Response) -> Dict[str, Any]:
    """Summarise a response as a dict, decoding the body by content type.

    The ``data`` entry is the parsed JSON when the ``Content-Type`` header
    contains ``application/json``, the text when it contains ``text/``, and
    the raw bytes otherwise.
    """
    mime = response.headers.get('Content-Type', '')
    if 'application/json' in mime:
        payload = response.json()
    elif 'text/' in mime:
        payload = response.text
    else:
        payload = response.content
    return {
        'status_code': response.status_code,
        'headers': dict(response.headers),
        'success': response.ok,
        'data': payload,
    }
def check_status_code(status_code: int) -> str:
    """Return the standard reason phrase for an HTTP status code.

    Covers every code known to ``http.HTTPStatus`` (for example 200 -> 'OK',
    404 -> 'Not Found'); any other value yields 'Unknown'.
    """
    from http import HTTPStatus

    try:
        return HTTPStatus(status_code).phrase
    except ValueError:
        # Not a registered status code.
        return 'Unknown'
# 6. API Client Builder
class ApiClient:
    """Generic REST API client for ``<base_url>/<resource>[/<id>]`` endpoints.

    All methods send JSON headers (plus a Bearer token when configured) and
    raise ``requests.HTTPError`` on a 4xx/5xx response.
    """

    def __init__(self, base_url: str, auth_token: Optional[str] = None):
        """Remember the base URL (trailing slashes stripped) and build headers."""
        self.base_url = base_url.rstrip('/')
        self.headers = {'Content-Type': 'application/json'}
        if auth_token:
            self.headers['Authorization'] = f'Bearer {auth_token}'

    def _url(self, *parts: Any) -> str:
        """Join ``base_url`` and the path ``parts`` with single slashes."""
        return '/'.join([self.base_url, *(str(part) for part in parts)])

    @staticmethod
    def _json(response: Any, allow_empty: bool = False) -> Any:
        """Raise on HTTP error status, then decode the JSON body.

        With ``allow_empty`` an empty body yields ``{}`` instead of a decode
        error.
        """
        response.raise_for_status()
        if allow_empty and not response.content:
            return {}
        return response.json()

    def get_all(self, resource: str) -> list:
        """Get all items of ``resource``."""
        return self._json(requests.get(self._url(resource), headers=self.headers))

    def get_by_id(self, resource: str, item_id: Any) -> Dict[str, Any]:
        """Get item by ID."""
        return self._json(requests.get(self._url(resource, item_id), headers=self.headers))

    def create(self, resource: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create new item from ``data`` (sent as JSON)."""
        return self._json(requests.post(self._url(resource), headers=self.headers, json=data))

    def update(self, resource: str, item_id: Any, data: Dict[str, Any]) -> Dict[str, Any]:
        """Replace an item via PUT."""
        return self._json(requests.put(self._url(resource, item_id), headers=self.headers, json=data))

    def partial_update(self, resource: str, item_id: Any, data: Dict[str, Any]) -> Dict[str, Any]:
        """Partially update an item via PATCH."""
        return self._json(requests.patch(self._url(resource, item_id), headers=self.headers, json=data))

    def delete(self, resource: str, item_id: Any) -> Dict[str, Any]:
        """Delete item; returns ``{}`` when the server replies without a body."""
        # Servers may answer a DELETE with an empty body, which .json()
        # cannot decode.
        return self._json(
            requests.delete(self._url(resource, item_id), headers=self.headers),
            allow_empty=True,
        )
# 7. Async HTTP Client (using aiohttp)
async def async_get_request(url: str) -> Dict[str, Any]:
    """Async GET request using aiohttp; returns the decoded JSON body.

    When aiohttp is not installed, prints an installation hint and returns
    an empty dict instead of raising.
    """
    # Only the import can raise ImportError, so keep the try limited to it
    # rather than wrapping the whole request.
    try:
        import aiohttp
    except ImportError:
        print("aiohttp not installed. Install with: pip install aiohttp")
        return {}

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()
# 8. Request Logging
class LoggedHttpClient:
    """HTTP client that records one log entry per GET request."""

    def __init__(self):
        self.request_log = []

    def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """GET ``url``, record method/URL/status/duration, return the JSON body.

        The entry is appended before the status check, so error responses
        are logged too; the status check then raises for 4xx/5xx.
        """
        import time

        began = time.time()
        response = requests.get(url, **kwargs)
        elapsed = time.time() - began
        self.request_log.append({
            'method': 'GET',
            'url': url,
            'status_code': response.status_code,
            'duration': elapsed,
            'success': response.ok
        })
        response.raise_for_status()
        return response.json()

    def get_logs(self) -> list:
        """Return the list of recorded log entries."""
        return self.request_log

    def get_stats(self) -> Dict[str, Any]:
        """Aggregate the log into totals and an average duration.

        Returns an empty dict when nothing has been logged yet.
        """
        entries = self.request_log
        if not entries:
            return {}
        count = len(entries)
        ok_count = 0
        for entry in entries:
            if entry['success']:
                ok_count += 1
        mean_duration = sum(entry['duration'] for entry in entries) / count
        return {
            'total_requests': count,
            'successful': ok_count,
            'failed': count - ok_count,
            'avg_duration': mean_duration
        }
# Usage Examples
def demonstrate_http_requests():
    """Run each client example against jsonplaceholder.typicode.com.

    Every section catches ``requests.RequestException`` and prints the error,
    so one failing request does not stop the remaining examples.
    """
    print("=== Web Python HTTP Requests Examples ===\n")

    # 1. Basic HTTP methods
    print("--- 1. Basic HTTP Methods ---")
    client = HttpClient('https://jsonplaceholder.typicode.com')
    try:
        posts = client.get('/posts')
        print(f"GET /posts: Retrieved {len(posts)} posts")
        new_post = client.post('/posts', {
            'title': 'Test Post',
            'body': 'This is a test post',
            'userId': 1
        })
        print(f"POST /posts: Created post with ID {new_post.get('id')}")
    except requests.RequestException as e:
        print(f"Error: {e}")

    # 2. Advanced features
    print("\n--- 2. Advanced HTTP Features ---")
    advanced_client = AdvancedHttpClient()
    try:
        filtered_posts = advanced_client.get_with_params(
            'https://jsonplaceholder.typicode.com/posts',
            {'userId': 1, '_limit': 5}
        )
        print(f"Filtered posts: {len(filtered_posts)} results")
    except requests.RequestException as e:
        print(f"Error: {e}")

    # 3. Session management
    print("\n--- 3. Session Management ---")
    session_client = SessionClient()
    try:
        session_client.set_cookie('session_id', 'abc123')
        # The response body is not needed; the request only exercises the session.
        session_client.get('https://jsonplaceholder.typicode.com/posts/1')
        print(f"Session cookies: {session_client.get_cookies()}")
    except requests.RequestException as e:
        print(f"Error: {e}")
    finally:
        # Release the session's pooled connections whether or not the call failed.
        session_client.close()

    # 4. Retry logic
    print("\n--- 4. Retry Logic ---")
    retry_client = RetryClient(max_retries=3)
    try:
        result = retry_client.get_with_retry('https://jsonplaceholder.typicode.com/posts/1')
        print(f"Retry successful: {result.get('title')}")
    except requests.RequestException as e:
        print(f"Error after retries: {e}")

    # 5. REST API client
    print("\n--- 5. REST API Client ---")
    api_client = ApiClient('https://jsonplaceholder.typicode.com')
    try:
        todos = api_client.get_all('todos')
        print(f"GET /todos: {len(todos)} items")
        todo = api_client.get_by_id('todos', 1)
        print(f"GET /todos/1: {todo.get('title')}")
    except requests.RequestException as e:
        print(f"Error: {e}")

    # 6. Request logging
    print("\n--- 6. Request Logging ---")
    logged_client = LoggedHttpClient()
    try:
        logged_client.get('https://jsonplaceholder.typicode.com/posts')
        logged_client.get('https://jsonplaceholder.typicode.com/users')
        stats = logged_client.get_stats()
        print(f"Request stats: {stats}")
    except requests.RequestException as e:
        print(f"Error: {e}")

    print("\n=== All HTTP Requests Examples Completed ===")
# Export functions
export { HttpClient, AdvancedHttpClient, SessionClient, RetryClient, ApiClient, LoggedHttpClient }
export { handle_response, check_status_code, async_get_request, demonstrate_http_requests }
💻 Serveur HTTP python
🟡 intermediate
⭐⭐⭐
Créer un serveur HTTP simple en utilisant le module http.server avec routage, gestion des requêtes et service de fichiers statiques
⏱️ 30 min
🏷️ python, web, networking, server
Prerequisites:
Intermediate Python, http.server module
# Web Python HTTP Server Examples
# Creating HTTP servers using http.server module
# 1. Basic HTTP Server
import json
import mimetypes
import os
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from typing import Dict, Any, Optional
class BasicHttpRequestHandler(BaseHTTPRequestHandler):
    """Basic HTTP request handler: static greeting on GET, echo on POST."""

    def do_GET(self):
        """Reply to any GET with a fixed HTML greeting."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        response = "<html><body><h1>Hello, World!</h1></body></html>"
        self.wfile.write(response.encode())

    def do_POST(self):
        """Echo the request body back inside a JSON document."""
        # A POST without Content-Length is treated as an empty body instead
        # of crashing on int(None).
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = 0
        post_data = self.rfile.read(content_length) if content_length > 0 else b''
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        # errors='replace' keeps a non-UTF-8 body from raising after the
        # status line has already been sent.
        response = {'received': post_data.decode(errors='replace'), 'status': 'success'}
        self.wfile.write(json.dumps(response).encode())

    def log_message(self, format: str, *args):
        """Print the log line to stdout, prefixed with the server timestamp."""
        print(f"[{self.log_date_time_string()}] {format % args}")
# 2. REST API Server
class RestApiHandler(BaseHTTPRequestHandler):
    """REST API request handler with routing for an ``/items`` collection.

    Routes: ``GET /items``, ``GET /items/<id>``, ``POST /items``,
    ``PUT /items/<id>``, ``DELETE /items/<id>``. Every reply is JSON; errors
    use the shape ``{"error": <message>}``.
    """

    # Simple in-memory database shared by all handler instances
    database: Dict[str, Dict[str, Any]] = {}
    # Next candidate id; only ever increases so ids are not reused after deletes
    _next_id: int = 1

    def _request_path(self) -> str:
        """Return the path component of the request target (no query string)."""
        return urllib.parse.urlparse(self.path).path

    def do_GET(self):
        """Handle GET requests"""
        path = self._request_path()
        if path == '/items':
            self._get_all_items()
        elif path.startswith('/items/'):
            self._get_item(path.split('/')[-1])
        else:
            self._send_error(404, 'Not Found')

    def do_POST(self):
        """Handle POST requests"""
        if self._request_path() == '/items':
            self._create_item()
        else:
            self._send_error(404, 'Not Found')

    def do_PUT(self):
        """Handle PUT requests"""
        path = self._request_path()
        if path.startswith('/items/'):
            self._update_item(path.split('/')[-1])
        else:
            self._send_error(404, 'Not Found')

    def do_DELETE(self):
        """Handle DELETE requests"""
        path = self._request_path()
        if path.startswith('/items/'):
            self._delete_item(path.split('/')[-1])
        else:
            self._send_error(404, 'Not Found')

    def _read_json_object(self) -> Optional[Dict[str, Any]]:
        """Read the request body and parse it as a JSON object.

        Returns the parsed dict, or ``None`` after a 400 response has been
        sent (bad Content-Length, undecodable bytes, invalid JSON, or a JSON
        value that is not an object).
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            self._send_error(400, 'Invalid Content-Length')
            return None
        raw = self.rfile.read(content_length) if content_length > 0 else b''
        try:
            data = json.loads(raw.decode())
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError both derive from ValueError.
            self._send_error(400, 'Invalid JSON')
            return None
        if not isinstance(data, dict):
            self._send_error(400, 'JSON body must be an object')
            return None
        return data

    @staticmethod
    def _allocate_id() -> str:
        """Return a fresh item id that is not present in the database."""
        # Counting len(database) would hand out an id that is still in use
        # once any earlier item has been deleted.
        while str(RestApiHandler._next_id) in RestApiHandler.database:
            RestApiHandler._next_id += 1
        item_id = str(RestApiHandler._next_id)
        RestApiHandler._next_id += 1
        return item_id

    def _get_all_items(self):
        """Get all items"""
        self._send_json_response(list(RestApiHandler.database.values()))

    def _get_item(self, item_id: str):
        """Get specific item"""
        if item_id in RestApiHandler.database:
            self._send_json_response(RestApiHandler.database[item_id])
        else:
            self._send_error(404, 'Item not found')

    def _create_item(self):
        """Create new item from the JSON body and reply 201 with it."""
        data = self._read_json_object()
        if data is None:
            return
        item_id = self._allocate_id()
        data['id'] = item_id
        RestApiHandler.database[item_id] = data
        self._send_json_response(data, 201)

    def _update_item(self, item_id: str):
        """Merge the JSON body into an existing item and reply with the result."""
        if item_id not in RestApiHandler.database:
            self._send_error(404, 'Item not found')
            return
        data = self._read_json_object()
        if data is None:
            return
        item = RestApiHandler.database[item_id]
        item.update(data)
        # Keep the stored id in sync with the key even if the body sent one.
        item['id'] = item_id
        self._send_json_response(item)

    def _delete_item(self, item_id: str):
        """Delete item"""
        if item_id in RestApiHandler.database:
            del RestApiHandler.database[item_id]
            self._send_json_response({'message': 'Item deleted'})
        else:
            self._send_error(404, 'Item not found')

    def _send_json_response(self, data: Any, status_code: int = 200):
        """Send ``data`` as a JSON body with the given status code."""
        body = json.dumps(data).encode()
        self.send_response(status_code)
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_error(self, status_code: int, message: str):
        """Send error response"""
        self._send_json_response({'error': message}, status_code)

    def log_message(self, format: str, *args):
        """Suppress default logging"""
        pass
# 3. Static File Server
class StaticFileHandler(BaseHTTPRequestHandler):
    """Handler for serving static files from beneath ``root_dir``."""

    def __init__(self, *args, root_dir: str = '.', **kwargs):
        # root_dir must be set before the base initializer runs, because the
        # base class handles the request from inside its __init__.
        self.root_dir = root_dir
        super().__init__(*args, **kwargs)

    def _resolve_path(self) -> Optional[str]:
        """Map the request target to a real filesystem path.

        Returns ``None`` when the resolved path lies outside ``root_dir``.
        """
        # Decode percent-escapes so names such as "a%20b.txt" can be found.
        request_path = urllib.parse.unquote(urllib.parse.urlparse(self.path).path)
        try:
            root = os.path.realpath(self.root_dir)
            candidate = os.path.join(root, request_path.lstrip('/'))
            # Default to index.html
            if os.path.isdir(candidate):
                candidate = os.path.join(candidate, 'index.html')
            resolved = os.path.realpath(candidate)
            # Compare whole path components: a plain string-prefix test would
            # accept "/srv/www-secret" for the root "/srv/www".
            if os.path.commonpath([root, resolved]) != root:
                return None
        except ValueError:
            # Raised for unusable paths (e.g. embedded NUL, different drives).
            return None
        return resolved

    def do_GET(self):
        """Handle GET requests for static files"""
        file_path = self._resolve_path()
        # Security check
        if file_path is None:
            self._send_error(403, 'Forbidden')
            return
        if os.path.isfile(file_path):
            self._serve_file(file_path)
        else:
            self._send_error(404, 'File not found')

    def _serve_file(self, file_path: str):
        """Send the file with content type, CORS, cache and length headers."""
        content_type, _ = mimetypes.guess_type(file_path)
        if content_type is None:
            content_type = 'application/octet-stream'
        # Open before sending any header so an unreadable file produces an
        # error response instead of a truncated 200.
        try:
            f = open(file_path, 'rb')
        except OSError:
            self._send_error(404, 'File not found')
            return
        with f:
            self.send_response(200)
            self.send_header('Content-type', content_type)
            # Add CORS headers
            self.send_header('Access-Control-Allow-Origin', '*')
            # Add cache control
            self.send_header('Cache-Control', 'max-age=3600')
            self.send_header('Content-Length', str(os.fstat(f.fileno()).st_size))
            self.end_headers()
            # Copy in chunks so large files are not held in memory at once.
            for chunk in iter(lambda: f.read(64 * 1024), b''):
                self.wfile.write(chunk)

    def _send_error(self, status_code: int, message: str):
        """Send a small HTML error page with the given status code."""
        self.send_response(status_code)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(f"<html><body><h1>{status_code} - {message}</h1></body></html>".encode())

    def log_message(self, format: str, *args):
        """Suppress default logging"""
        pass
# 4. Multi-threaded Server
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Multi-threaded HTTP server: each request is handled in its own thread.

    Threading comes from ``socketserver.ThreadingMixIn``; a plain
    ``HTTPServer`` subclass would still serve requests one at a time.
    """

    # Worker threads are daemons so in-flight requests do not block shutdown.
    daemon_threads = True
def start_multi_threaded_server(port: int = 8000):
    """Start a ``ThreadedHTTPServer`` whose pages report the handling thread.

    Blocks in ``serve_forever``; the listening socket is closed when the
    call exits.
    """
    import threading

    class ThreadedRequestHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            """Reply with the identifier of the thread serving this request."""
            thread_id = threading.get_ident()
            self.send_response(200)
            self.send_header('Content-type', 'text/html')
            self.end_headers()
            response = f"<html><body><h1>Hello from thread {thread_id}!</h1></body></html>"
            self.wfile.write(response.encode())

        def log_message(self, format: str, *args):
            pass

    # The context manager closes the listening socket on any exit, including
    # KeyboardInterrupt.
    with ThreadedHTTPServer(('', port), ThreadedRequestHandler) as server:
        print(f"Multi-threaded server running on port {port}")
        server.serve_forever()
# 5. Server with Middleware
class MiddlewareHandler(BaseHTTPRequestHandler):
    """Request handler with middleware support.

    Middleware are callables taking the handler. They run in registration
    order after the request line and headers have been parsed, so they can
    read ``command``, ``path`` and ``headers``. A middleware that returns
    ``False`` stops processing: no later middleware and no ``do_*`` method
    runs, and the middleware is responsible for having sent a response.
    """

    # Shared by the class and its subclasses; filled through add_middleware().
    middleware_chain = []

    @classmethod
    def add_middleware(cls, middleware_func):
        """Add middleware to chain"""
        cls.middleware_chain.append(middleware_func)

    def parse_request(self):
        """Parse the request, then run the middleware chain.

        Hooking in here rather than before ``handle_one_request`` means the
        request attributes exist when middleware executes. Returning
        ``False`` makes the base class skip the ``do_*`` dispatch.
        """
        if not super().parse_request():
            return False
        for middleware in self.middleware_chain:
            # Only an explicit False halts; middleware returning None continue.
            if middleware(self) is False:
                return False
        return True

    def do_GET(self):
        """Handle GET request"""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        response = {'message': 'Hello with middleware!'}
        self.wfile.write(json.dumps(response).encode())

    def log_message(self, format: str, *args):
        pass
# Middleware functions
def logging_middleware(handler):
    """Print the request method and path of ``handler`` with a [LOG] tag."""
    method, target = handler.command, handler.path
    print("[LOG]", method, target)
def cors_middleware(handler):
    """Add CORS headers to every response sent through ``handler``.

    Replaces ``handler.send_response`` with a wrapper that emits the status
    line through the original method and then appends the CORS headers. The
    original method is kept on ``handler._original_send_response``.
    """
    # Already wrapped (e.g. a second request on the same connection):
    # wrapping again would duplicate the headers.
    if getattr(handler, '_original_send_response', None) is not None:
        return

    # Capture the real method BEFORE installing the wrapper; looking it up
    # on the handler inside the wrapper would find the wrapper itself and
    # recurse forever.
    original_send_response = handler.send_response

    def send_response(code, message=None):
        original_send_response(code, message)
        handler.send_header('Access-Control-Allow-Origin', '*')
        handler.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
        handler.send_header('Access-Control-Allow-Headers', 'Content-Type')

    # Store original send_response
    handler._original_send_response = original_send_response
    handler.send_response = send_response
def auth_middleware(handler):
    """Check for a Bearer ``Authorization`` header.

    Returns True when the header is present and starts with ``Bearer ``;
    otherwise sends a 401 response with the body ``Unauthorized`` and
    returns False.
    """
    credentials = handler.headers.get('Authorization')
    if credentials and credentials.startswith('Bearer '):
        return True

    handler.send_response(401)
    handler.end_headers()
    handler.wfile.write(b'Unauthorized')
    return False
# 6. WebSocket-like Server (using Server-Sent Events)
class SSEHandler(BaseHTTPRequestHandler):
    """Server-Sent Events handler: streams five messages from ``/events``."""

    def do_GET(self):
        """Stream events for ``/events``; reply 404 for every other path."""
        if self.path != '/events':
            # Without an explicit reply the client would receive nothing at all.
            self.send_error(404, 'Not Found')
            return

        import time

        self.send_response(200)
        self.send_header('Content-type', 'text/event-stream')
        self.send_header('Cache-Control', 'no-cache')
        self.send_header('Connection', 'keep-alive')
        self.end_headers()
        # Send events
        try:
            for i in range(5):
                event_data = f"data: Message {i}\n\n"
                self.wfile.write(event_data.encode())
                # Flush so each event is pushed out before the next sleep.
                self.wfile.flush()
                time.sleep(1)
        except (BrokenPipeError, ConnectionResetError):
            # The client went away mid-stream; nothing left to send.
            return

    def log_message(self, format: str, *args):
        pass
# 7. Server with Request Logging
class LoggedRequestHandler(BaseHTTPRequestHandler):
    """Request handler that logs each request and echoes it back as JSON."""

    def log_request(self, code: int = '-', size: int = '-'):
        """Print timestamp, client IP, method, path, status code and user agent."""
        agent = self.headers.get('User-Agent', 'Unknown')
        stamp = self.log_date_time_string()
        client_ip = self.client_address[0]
        print(f"[{stamp}] {client_ip} - {self.command} {self.path} - {code} - {agent}")

    def _reply_json(self, payload):
        """Send a 200 response whose body is ``payload`` as indented JSON."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(payload, indent=2).encode())

    def do_GET(self):
        """Echo the path, method and request headers."""
        self._reply_json({
            'path': self.path,
            'method': self.command,
            'headers': dict(self.headers)
        })

    def do_POST(self):
        """Echo the path, method and the decoded request body."""
        length = int(self.headers.get('Content-Length', 0))
        if length > 0:
            raw_body = self.rfile.read(length)
        else:
            raw_body = b''
        self._reply_json({
            'path': self.path,
            'method': self.command,
            'data': raw_body.decode()
        })
# Server Management Functions
def start_basic_server(port: int = 8000):
    """Start basic HTTP server on ``port`` and block serving requests."""
    # The context manager closes the listening socket when serve_forever()
    # exits, e.g. on KeyboardInterrupt.
    with HTTPServer(('', port), BasicHttpRequestHandler) as server:
        print(f"Basic server running on http://localhost:{port}")
        server.serve_forever()
def start_rest_api_server(port: int = 8000):
    """Start REST API server on ``port`` and block serving requests."""
    # The context manager closes the listening socket when serve_forever()
    # exits, e.g. on KeyboardInterrupt.
    with HTTPServer(('', port), RestApiHandler) as server:
        print(f"REST API server running on http://localhost:{port}")
        server.serve_forever()
def start_static_server(port: int = 8000, directory: str = '.'):
    """Start static file server for ``directory`` on ``port`` and block."""
    def handler(*args, **kwargs):
        # HTTPServer calls this once per connection with
        # (request, client_address, server); bind the root directory here.
        return StaticFileHandler(*args, root_dir=directory, **kwargs)

    # The context manager closes the listening socket when serve_forever()
    # exits, e.g. on KeyboardInterrupt.
    with HTTPServer(('', port), handler) as server:
        print(f"Static file server running on http://localhost:{port}")
        print(f"Serving directory: {os.path.abspath(directory)}")
        server.serve_forever()
def start_middleware_server(port: int = 8000):
    """Start server with the logging middleware registered, then block."""
    # The chain lives on the class, so register only once; otherwise calling
    # this function again would log every request several times.
    if logging_middleware not in MiddlewareHandler.middleware_chain:
        MiddlewareHandler.add_middleware(logging_middleware)
    # The context manager closes the listening socket when serve_forever()
    # exits, e.g. on KeyboardInterrupt.
    with HTTPServer(('', port), MiddlewareHandler) as server:
        print(f"Server with middleware running on http://localhost:{port}")
        server.serve_forever()
# Usage Examples
def demonstrate_http_server():
    """Print the catalogue of example servers and a sample client snippet.

    Nothing is started here; the text only lists which ``start_*`` function
    to call for each kind of server.
    """
    # Each entry is printed on its own line; embedded "\n" characters add
    # the blank separator lines.
    lines = (
        "=== Web Python HTTP Server Examples ===\n",
        "Note: Server examples require running the server in a separate process",
        "Choose one of the following servers to start:\n",
        "--- 1. Basic Server ---",
        "start_basic_server(8000)",
        "Serves simple HTML page\n",
        "--- 2. REST API Server ---",
        "start_rest_api_server(8000)",
        "Endpoints:",
        " GET /items - Get all items",
        " GET /items/{id} - Get item by ID",
        " POST /items - Create new item",
        " PUT /items/{id} - Update item",
        " DELETE /items/{id} - Delete item\n",
        "--- 3. Static File Server ---",
        "start_static_server(8000, './public')",
        "Serves static files from directory\n",
        "--- 4. Multi-threaded Server ---",
        "start_multi_threaded_server(8000)",
        "Handles multiple requests concurrently\n",
        "--- 5. Server with Middleware ---",
        "start_middleware_server(8000)",
        "Includes logging and CORS middleware\n",
        "--- Example Client Usage ---",
        "# Using requests library",
        "import requests",
        "",
        "# Get all items",
        "response = requests.get('http://localhost:8000/items')",
        "print(response.json())",
        "",
        "# Create new item",
        "response = requests.post('http://localhost:8000/items',",
        " json={'name': 'Test Item', 'value': 123})",
        "print(response.json())",
        "\n=== All HTTP Server Examples Listed ===",
    )
    print("\n".join(lines))
# Export functions and classes
export { BasicHttpRequestHandler, RestApiHandler, StaticFileHandler, SSEHandler, LoggedRequestHandler }
export { start_basic_server, start_rest_api_server, start_static_server, start_multi_threaded_server, start_middleware_server }
export { logging_middleware, cors_middleware, auth_middleware, demonstrate_http_server }
💻 Socket TCP python
🔴 complex
⭐⭐⭐⭐
Créer un client et un serveur TCP en utilisant le module socket pour communication bidirectionnelle
⏱️ 35 min
🏷️ python, web, networking, socket, tcp
Prerequisites:
Advanced Python, socket module, threading
# Web Python TCP Socket Examples
# Creating TCP clients and servers using socket module
# 1. Basic TCP Server
import socket
import threading
import json
from typing import Callable, Optional
class TcpServer:
    """Basic TCP echo server that handles each client in a daemon thread."""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Bind, listen and accept clients until ``stop()`` or Ctrl-C.

        Blocks the calling thread. The listening socket is always closed on
        exit, including when binding fails.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True
            print(f"TCP server listening on {self.host}:{self.port}")
            while self.running:
                try:
                    client_socket, address = self.server_socket.accept()
                except OSError:
                    # stop() closes the listening socket from another thread,
                    # which makes accept() fail; that is a normal shutdown.
                    if not self.running:
                        break
                    raise
                print(f"New connection from {address}")
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.stop()

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Echo every received chunk back, prefixed with ``Echo: ``.

        Runs until the peer closes the connection, the server stops, or an
        error occurs; the client socket is closed in every case.
        """
        try:
            while self.running:
                data = client_socket.recv(1024)
                if not data:
                    # Empty read: the peer closed the connection.
                    break
                message = data.decode('utf-8')
                print(f"Received from {address}: {message}")
                # Echo back; sendall() keeps writing until the whole reply is
                # out, whereas send() may transmit only part of it.
                response = f"Echo: {message}"
                client_socket.sendall(response.encode('utf-8'))
        except Exception as e:
            print(f"Error handling client {address}: {e}")
        finally:
            client_socket.close()
            print(f"Connection closed: {address}")

    def stop(self):
        """Stop the server and close the listening socket."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
# 2. Basic TCP Client
class TcpClient:
    """Basic TCP client exchanging UTF-8 text with a server."""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.client_socket: Optional[socket.socket] = None

    def connect(self):
        """Connect to ``host``:``port``; raises ``OSError`` on failure."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
        except OSError:
            # Do not leak the descriptor when the connection attempt fails.
            sock.close()
            raise
        self.client_socket = sock
        print(f"Connected to {self.host}:{self.port}")

    def send(self, message: str):
        """Send ``message`` as UTF-8; does nothing when not connected."""
        if self.client_socket:
            # sendall() loops until every byte is written; send() may stop
            # after a partial write.
            self.client_socket.sendall(message.encode('utf-8'))

    def receive(self, buffer_size: int = 1024) -> str:
        """Receive up to ``buffer_size`` bytes, decoded as UTF-8.

        Returns an empty string when not connected.
        """
        if self.client_socket:
            data = self.client_socket.recv(buffer_size)
            return data.decode('utf-8')
        return ''

    def close(self):
        """Close connection"""
        if self.client_socket:
            self.client_socket.close()
# 3. Chat Server
class ChatServer:
    """Multi-client chat server.

    Each client is first sent ``NICK``, answers with a nickname, and every
    later message is relayed to all other connected clients.
    """

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.clients: list[socket.socket] = []
        self.nicknames: dict[socket.socket, str] = {}
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Bind, listen and accept chat clients until ``stop()`` or Ctrl-C."""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True
            print(f"Chat server listening on {self.host}:{self.port}")
            while self.running:
                try:
                    client_socket, address = self.server_socket.accept()
                except OSError:
                    # stop() closed the listening socket: normal shutdown.
                    if not self.running:
                        break
                    raise
                print(f"New connection from {address}")
                self.clients.append(client_socket)
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket,)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nChat server shutting down...")
        finally:
            self.stop()

    def broadcast(self, message: str, sender_socket: Optional[socket.socket] = None):
        """Send ``message`` to every client except ``sender_socket``.

        Clients whose socket can no longer be written to are dropped from
        the client list.
        """
        payload = message.encode('utf-8')
        # Iterate over a snapshot: removing from the list being iterated
        # would skip the client that follows each removed one.
        for client in list(self.clients):
            if client == sender_socket:
                continue
            try:
                client.sendall(payload)
            except OSError:
                if client in self.clients:
                    self.clients.remove(client)

    def _handle_client(self, client_socket: socket.socket):
        """Run the nickname handshake, then relay this client's messages."""
        joined = False
        try:
            # Request nickname
            client_socket.sendall("NICK".encode('utf-8'))
            nickname = client_socket.recv(1024).decode('utf-8')
            if not nickname:
                # The peer disconnected before choosing a nickname.
                return
            self.nicknames[client_socket] = nickname
            joined = True
            self.broadcast(f"{nickname} joined the chat!")
            while self.running:
                message = client_socket.recv(1024).decode('utf-8')
                if not message:
                    # Empty read: the peer closed the connection.
                    break
                self.broadcast(f"{nickname}: {message}", client_socket)
        except Exception as e:
            print(f"Error handling client: {e}")
        finally:
            # Always drop the nickname entry so the mapping does not grow
            # with every client that has ever connected.
            nickname = self.nicknames.pop(client_socket, 'Unknown')
            if client_socket in self.clients:
                self.clients.remove(client_socket)
                if joined:
                    self.broadcast(f"{nickname} left the chat!")
            client_socket.close()

    def stop(self):
        """Stop chat server and close every client and the listening socket."""
        self.running = False
        for client in list(self.clients):
            client.close()
        if self.server_socket:
            self.server_socket.close()
# 4. Chat Client
class ChatClient:
    """Chat client that prints incoming messages from a background thread."""

    def __init__(self, host: str = 'localhost', port: int = 9999, nickname: str = 'Anonymous'):
        self.host = host
        self.port = port
        self.nickname = nickname
        self.client_socket: Optional[socket.socket] = None
        self.running = False

    def connect(self):
        """Connect, send the nickname and start the receiver thread."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
            # Send nickname
            sock.sendall(self.nickname.encode('utf-8'))
        except OSError:
            # Do not leak the descriptor when the handshake fails.
            sock.close()
            raise
        self.client_socket = sock
        self.running = True
        # Start receive thread
        receive_thread = threading.Thread(target=self._receive_messages)
        receive_thread.daemon = True
        receive_thread.start()

    def _receive_messages(self):
        """Print messages until the connection closes or the client stops."""
        while self.running:
            try:
                data = self.client_socket.recv(1024)
            except OSError:
                # Socket closed locally or connection error: stop receiving.
                break
            if not data:
                # recv() returns b'' once the server has closed; continuing
                # to loop here would spin forever on empty reads.
                break
            message = data.decode('utf-8', errors='replace')
            print(f"\r{message}")
            print(f"{self.nickname}: ", end='', flush=True)

    def send_message(self, message: str):
        """Send ``message`` as UTF-8; does nothing when not connected."""
        if self.client_socket:
            self.client_socket.sendall(message.encode('utf-8'))

    def close(self):
        """Stop the receiver loop and close the connection."""
        self.running = False
        if self.client_socket:
            self.client_socket.close()
# 5. Message Protocol Server (JSON-based)
class ProtocolServer:
    """Server using JSON message protocol.

    Each received chunk is parsed as one JSON document and answered with one
    JSON document. Supported ``type`` values: ``ping``, ``echo``, ``add`` and
    ``get_time``.
    """

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Bind, listen and accept clients until ``stop()`` or Ctrl-C."""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True
            print(f"Protocol server listening on {self.host}:{self.port}")
            while self.running:
                try:
                    client_socket, address = self.server_socket.accept()
                except OSError:
                    # stop() closed the listening socket: normal shutdown.
                    if not self.running:
                        break
                    raise
                print(f"New connection from {address}")
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.stop()

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Answer JSON messages from one client until it disconnects."""
        try:
            while self.running:
                data = client_socket.recv(4096)
                if not data:
                    break
                try:
                    message = json.loads(data.decode('utf-8'))
                except ValueError:
                    # Covers malformed JSON and bytes that are not valid
                    # UTF-8 (both exceptions derive from ValueError).
                    response = {'status': 'error', 'message': 'Invalid JSON'}
                else:
                    response = self._process_message(message)
                client_socket.sendall(json.dumps(response).encode('utf-8'))
        except Exception as e:
            print(f"Error handling client {address}: {e}")
        finally:
            client_socket.close()

    def _process_message(self, message: dict) -> dict:
        """Build the reply for one decoded message.

        Returns an ``error`` reply (instead of raising) for a message that
        is not a JSON object, an unknown ``type``, or ``add`` operands that
        cannot be added.
        """
        if not isinstance(message, dict):
            return {'type': 'error', 'message': 'Message must be a JSON object'}

        msg_type = message.get('type')
        if msg_type == 'ping':
            return {'type': 'pong', 'timestamp': message.get('timestamp')}
        elif msg_type == 'echo':
            return {'type': 'echo_response', 'data': message.get('data')}
        elif msg_type == 'add':
            a = message.get('a', 0)
            b = message.get('b', 0)
            try:
                total = a + b
            except TypeError:
                return {'type': 'error', 'message': 'Operands cannot be added'}
            return {'type': 'result', 'operation': 'add', 'result': total}
        elif msg_type == 'get_time':
            from datetime import datetime
            return {'type': 'time', 'timestamp': datetime.now().isoformat()}
        else:
            return {'type': 'error', 'message': 'Unknown message type'}

    def stop(self):
        """Stop server and close the listening socket."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
# 6. File Transfer Server
class FileTransferServer:
    """TCP server that receives one file per connection.

    The client first sends a ``<filename>|<filesize>`` header and then the
    raw file bytes. The file is written into the current working directory
    and ``ACK`` is sent back once every announced byte has arrived.

    NOTE(review): the header has no terminator, so a single ``recv`` may
    also contain the first bytes of the file; fixing that requires changing
    the wire format on both client and server.
    """

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Bind, listen and serve clients until stopped or interrupted."""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True
            print(f"File transfer server listening on {self.host}:{self.port}")
            while self.running:
                try:
                    client_socket, address = self.server_socket.accept()
                except OSError:
                    # stop() closes the listening socket from another
                    # thread; treat that as a normal shutdown.
                    if not self.running:
                        break
                    raise
                print(f"New connection from {address}")
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            # Also runs when bind() fails, so the socket is never leaked.
            self.stop()

    @staticmethod
    def _parse_file_info(file_info: str) -> tuple:
        """Split a ``<filename>|<filesize>`` header into safe components.

        Returns:
            ``(filename, filesize)`` where ``filename`` has had every
            directory component removed.

        Raises:
            ValueError: If the header is malformed, the size is negative or
                not an integer, or no usable file name remains.
        """
        import os
        # Split on the LAST separator so names containing '|' still work.
        raw_name, separator, raw_size = file_info.rpartition('|')
        if not separator:
            raise ValueError(f"Malformed file info: {file_info!r}")
        filesize = int(raw_size)
        if filesize < 0:
            raise ValueError(f"Invalid file size: {filesize}")
        # The name comes from the network: keep only its last component so
        # a client cannot write outside the working directory. Backslashes
        # are normalised first so Windows-style paths are stripped too.
        filename = os.path.basename(raw_name.replace('\\', '/'))
        if filename in ('', '.', '..'):
            raise ValueError(f"Invalid file name: {raw_name!r}")
        return filename, filesize

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Receive one file from ``client_socket`` and acknowledge it.

        Errors are printed rather than raised; the socket is always closed.
        """
        try:
            # Receive file info
            file_info = client_socket.recv(1024).decode('utf-8')
            filename, filesize = self._parse_file_info(file_info)
            print(f"Receiving file: {filename} ({filesize} bytes)")
            # Receive file data
            received = 0
            with open(filename, 'wb') as f:
                while received < filesize:
                    # Never read past the announced size.
                    data = client_socket.recv(min(4096, filesize - received))
                    if not data:
                        break
                    f.write(data)
                    received += len(data)
                    progress = (received / filesize) * 100
                    print(f"\rProgress: {progress:.1f}%", end='')
            if received < filesize:
                # The peer disconnected early: do not report success or
                # acknowledge a truncated file.
                print(f"\nTransfer incomplete: received {received} of {filesize} bytes")
                return
            print("\nFile received successfully!")
            client_socket.sendall(b"ACK")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            client_socket.close()

    def stop(self):
        """Stop accepting connections and close the listening socket."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
# 7. File Transfer Client
class FileTransferClient:
    """Client that uploads a file to a file transfer server.

    The upload consists of a ``<filename>|<filesize>`` header followed by
    the raw file bytes; the client then waits for the server's reply.
    """

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.client_socket: Optional[socket.socket] = None

    def connect(self):
        """Connect to the server.

        Raises:
            OSError: If the connection fails. The socket is closed before
                re-raising.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
        except OSError:
            # Do not leak the descriptor when the server is unreachable.
            sock.close()
            raise
        self.client_socket = sock
        print(f"Connected to {self.host}:{self.port}")

    def send_file(self, filename: str):
        """Send the file at ``filename`` and print the server's reply.

        Prints a message and returns without sending anything when the
        file does not exist or ``connect`` has not been called.
        """
        import os
        if not os.path.exists(filename):
            print(f"File not found: {filename}")
            return
        if self.client_socket is None:
            print("Not connected to server")
            return
        filesize = os.path.getsize(filename)
        # Send file info. Only the base name is announced so the local
        # directory layout is neither leaked nor imposed on the server.
        file_info = f"{os.path.basename(filename)}|{filesize}"
        self.client_socket.sendall(file_info.encode('utf-8'))
        # Send file data
        sent = 0
        with open(filename, 'rb') as f:
            while sent < filesize:
                data = f.read(min(4096, filesize - sent))
                if not data:
                    # The file shrank after its size was read; stop
                    # instead of looping forever on empty reads.
                    break
                # sendall() retries until the whole chunk is written; a
                # plain send() may write only part of it.
                self.client_socket.sendall(data)
                sent += len(data)
                progress = (sent / filesize) * 100
                print(f"\rProgress: {progress:.1f}%", end='')
        if sent < filesize:
            print(f"\nFile changed during transfer: sent {sent} of {filesize} bytes")
            return
        print("\nFile sent successfully!")
        # Wait for acknowledgment
        ack = self.client_socket.recv(1024)
        print(f"Server response: {ack.decode('utf-8')}")

    def close(self):
        """Close the connection if one was opened."""
        if self.client_socket:
            self.client_socket.close()
# 8. UDP Socket (for comparison)
class UdpServer:
    """Basic UDP echo server.

    Every datagram is decoded as UTF-8, printed, and answered with
    ``Echo: <message>`` sent back to the sender's address.
    """

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Bind the socket and echo datagrams until stopped or interrupted."""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.server_socket.bind((self.host, self.port))
            self.running = True
            print(f"UDP server listening on {self.host}:{self.port}")
            while self.running:
                try:
                    data, address = self.server_socket.recvfrom(1024)
                except OSError:
                    # stop() closes the socket from another thread; treat
                    # that as a normal shutdown.
                    if not self.running:
                        break
                    raise
                self._handle_datagram(data, address)
        except KeyboardInterrupt:
            print("\nUDP server shutting down...")
        finally:
            # Also runs when bind() fails, so the socket is never leaked.
            self.stop()

    def _handle_datagram(self, data: bytes, address) -> None:
        """Print one datagram and send the echo reply to ``address``."""
        # Datagrams come from arbitrary senders: invalid UTF-8 must not
        # raise and take the whole server down.
        message = data.decode('utf-8', errors='replace')
        print(f"Received from {address}: {message}")
        response = f"Echo: {message}"
        self.server_socket.sendto(response.encode('utf-8'), address)

    def stop(self):
        """Stop the receive loop and close the socket."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
# Usage Examples
def demonstrate_tcp_socket():
    """Print copy-and-paste usage snippets for every socket example.

    Nothing is run against the network: the function only writes the
    example code for each server/client pair to standard output.
    """
    # Each entry is emitted with its own print() call, so an entry that
    # ends in "\n" is followed by a blank line in the output.
    usage_lines = (
        "=== Web Python TCP Socket Examples ===\n",
        "Note: Socket examples require running server and client in separate processes",
        "Choose one of the following examples:\n",
        # Basic TCP server and client
        "--- 1. Basic TCP Server ---",
        "# Server",
        "server = TcpServer('localhost', 9999)",
        "server.start()",
        "",
        "# Client",
        "client = TcpClient('localhost', 9999)",
        "client.connect()",
        "client.send('Hello, Server!')",
        "response = client.receive()",
        "print(response)",
        "client.close()\n",
        # Chat server and client
        "--- 2. Chat Server ---",
        "# Server",
        "chat_server = ChatServer('localhost', 9999)",
        "chat_server.start()",
        "",
        "# Client",
        "chat_client = ChatClient('localhost', 9999, 'Alice')",
        "chat_client.connect()",
        "# Then send messages:",
        "chat_client.send_message('Hello everyone!')\n",
        # JSON protocol server
        "--- 3. Protocol Server (JSON-based) ---",
        "# Server",
        "protocol_server = ProtocolServer('localhost', 9999)",
        "protocol_server.start()",
        "",
        "# Client sends JSON messages:",
        'client.send(json.dumps({"type": "ping", "timestamp": 123456}))',
        'client.send(json.dumps({"type": "add", "a": 5, "b": 3}))',
        'client.send(json.dumps({"type": "echo", "data": "Hello"}))\n',
        # File transfer server and client
        "--- 4. File Transfer ---",
        "# Server",
        "file_server = FileTransferServer('localhost', 9999)",
        "file_server.start()",
        "",
        "# Client",
        "file_client = FileTransferClient('localhost', 9999)",
        "file_client.connect()",
        "file_client.send_file('example.txt')",
        "file_client.close()\n",
        # UDP server and raw-socket client
        "--- 5. UDP Server (for comparison) ---",
        "# Server",
        "udp_server = UdpServer('localhost', 9999)",
        "udp_server.start()",
        "",
        "# Client",
        "sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)",
        "sock.sendto(b'Hello UDP!', ('localhost', 9999))",
        "data, addr = sock.recvfrom(1024)",
        "print(data.decode())\n",
        "\n=== All TCP Socket Examples Listed ===",
    )
    for usage_line in usage_lines:
        print(usage_line)
# Export functions and classes
# ``export { ... }`` is JavaScript syntax and a SyntaxError in Python; the
# public names of a Python module are declared through ``__all__``.
__all__ = [
    'TcpServer',
    'TcpClient',
    'ChatServer',
    'ChatClient',
    'ProtocolServer',
    'FileTransferServer',
    'FileTransferClient',
    'UdpServer',
    'demonstrate_tcp_socket',
]