Exemples de Réseau Web Python

Exemples de réseau Web Python incluant les requêtes HTTP, le serveur HTTP et la programmation de sockets TCP

Key Facts

Category
Python
Items
3
Format Families
sample

Sample Overview

Exemples de réseau Web Python incluant les requêtes HTTP, le serveur HTTP et la programmation de sockets TCP. This sample set belongs to Python and can be used to test related workflows inside Elysia Tools.

💻 Requêtes HTTP python

🟡 intermediate ⭐⭐⭐

Envoyer des requêtes GET, POST, PUT, DELETE en utilisant la bibliothèque requests avec gestion des erreurs et analyse des réponses

⏱️ 25 min 🏷️ python, web, networking, http
Prerequisites: Intermediate Python, requests library
# Web Python HTTP Requests Examples
# Using requests library for HTTP communication with servers

# 1. Basic HTTP Request Methods
import requests
import json
from typing import Any, Dict, Optional

class HttpClient:
    """Small JSON-oriented HTTP client built on ``requests``.

    Every verb method joins ``base_url`` and ``url``, merges the default
    headers with the per-call headers, raises ``requests.HTTPError`` on a
    4xx/5xx status and returns the decoded JSON body.  A successful reply
    without a body (for example ``204 No Content`` after a DELETE) yields
    ``{}`` instead of failing inside the JSON decoder.
    """

    def __init__(self, base_url: str = '', default_headers: Optional[Dict[str, str]] = None,
                 timeout: Optional[float] = None):
        """Store the connection settings.

        Args:
            base_url: Prefix prepended verbatim to every request URL.
            default_headers: Headers merged over the built-in JSON
                ``Content-Type`` header for every request.
            timeout: Seconds passed to ``requests`` as ``timeout``;
                ``None`` (the default) waits indefinitely.
        """
        self.base_url = base_url
        self.default_headers = {
            'Content-Type': 'application/json',
            **(default_headers or {})
        }
        self.timeout = timeout

    @staticmethod
    def _decode(response) -> Dict[str, Any]:
        """Return the JSON body of ``response``, or ``{}`` when it has no body."""
        if response.status_code == 204 or not response.content:
            return {}
        return response.json()

    def _send(self, method: str, url: str, headers: Optional[Dict[str, str]] = None,
              **kwargs) -> Dict[str, Any]:
        """Issue one request and return its decoded body.

        Raises:
            requests.RequestException: On transport errors or a 4xx/5xx status.
        """
        response = requests.request(
            method,
            f"{self.base_url}{url}",
            headers={**self.default_headers, **(headers or {})},
            # getattr keeps subclasses that override __init__ working.
            timeout=getattr(self, 'timeout', None),
            **kwargs
        )
        response.raise_for_status()
        return self._decode(response)

    def get(self, url: str, headers: Optional[Dict[str, str]] = None, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send GET request"""
        return self._send('GET', url, headers, params=params)

    def post(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send POST request with ``data`` serialized as the JSON body"""
        return self._send('POST', url, headers, json=data)

    def put(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send PUT request with ``data`` serialized as the JSON body"""
        return self._send('PUT', url, headers, json=data)

    def delete(self, url: str, headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send DELETE request; returns ``{}`` when the server sends no body"""
        return self._send('DELETE', url, headers)

    def patch(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send PATCH request with ``data`` serialized as the JSON body"""
        return self._send('PATCH', url, headers, json=data)

# 2. Advanced HTTP Features
class AdvancedHttpClient:
    """Collection of one-shot ``requests`` helpers for common scenarios.

    Every method raises ``requests.HTTPError`` (via ``raise_for_status``)
    when the server answers with a 4xx/5xx status.
    """

    def get_with_params(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Send GET request with query parameters and return the JSON body"""
        response = requests.get(url, params=params)
        response.raise_for_status()
        return response.json()

    def get_with_timeout(self, url: str, timeout: int = 5) -> Dict[str, Any]:
        """Send GET request that gives up after ``timeout`` seconds"""
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return response.json()

    def post_with_files(self, url: str, files: Dict[str, Any], data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Upload files (multipart) via POST request, with optional form fields"""
        response = requests.post(url, files=files, data=data)
        response.raise_for_status()
        return response.json()

    def download_file(self, url: str, filename: str) -> None:
        """Stream the body of ``url`` into ``filename`` in 8 KiB chunks"""
        # A streamed response keeps its connection checked out until it is
        # closed; the context manager releases it even if writing fails.
        with requests.get(url, stream=True) as response:
            response.raise_for_status()

            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

    def download_file_with_progress(self, url: str, filename: str) -> None:
        """Download file and print a percentage when the size is known"""
        with requests.get(url, stream=True) as response:
            response.raise_for_status()

            # Without a Content-Length header no percentage can be computed.
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0

            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_size > 0:
                            progress = (downloaded / total_size) * 100
                            print(f"\rProgress: {progress:.1f}%", end='')
            print()  # New line after progress

    def get_with_auth(self, url: str, username: str, password: str) -> Dict[str, Any]:
        """Send GET request with HTTP basic authentication"""
        response = requests.get(url, auth=(username, password))
        response.raise_for_status()
        return response.json()

    def get_with_bearer_token(self, url: str, token: str) -> Dict[str, Any]:
        """Send GET request with an ``Authorization: Bearer`` header"""
        headers = {'Authorization': f'Bearer {token}'}
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.json()

# 3. Request with Session and Cookies
class SessionClient:
    """Thin wrapper around one ``requests.Session``.

    Cookies and pooled connections persist across calls because every
    request goes through the same session object.
    """

    def __init__(self):
        self.session = requests.Session()

    def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """GET ``url`` through the session; extra kwargs go to ``Session.get``"""
        reply = self.session.get(url, **kwargs)
        reply.raise_for_status()
        body = reply.json()
        return body

    def post(self, url: str, **kwargs) -> Dict[str, Any]:
        """POST to ``url`` through the session; extra kwargs go to ``Session.post``"""
        reply = self.session.post(url, **kwargs)
        reply.raise_for_status()
        body = reply.json()
        return body

    def set_cookie(self, name: str, value: str):
        """Store a cookie in the session's cookie jar"""
        jar = self.session.cookies
        jar.set(name, value)

    def get_cookies(self) -> Dict[str, str]:
        """Return a plain-dict snapshot of the session's cookies"""
        jar = self.session.cookies
        return dict(jar)

    def close(self):
        """Close the underlying session"""
        self.session.close()

# 4. Retry Logic
class RetryClient:
    """HTTP client that repeats failed GET requests with exponential backoff"""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        # max_retries counts the extra attempts made after the first one;
        # base_delay is the pause before the first retry, doubled each time.
        self.base_delay = base_delay
        self.max_retries = max_retries

    def get_with_retry(self, url: str, **kwargs) -> Dict[str, Any]:
        """GET ``url``, retrying on any ``requests.RequestException``.

        Returns the decoded JSON body of the first successful attempt and
        re-raises the last captured exception once all attempts failed.
        """
        from time import sleep

        failure = None
        attempt = 0
        while attempt <= self.max_retries:
            try:
                reply = requests.get(url, **kwargs)
                reply.raise_for_status()
                return reply.json()
            except requests.RequestException as exc:
                failure = exc

            # Only reached after a failed attempt; back off unless it was the last.
            if attempt < self.max_retries:
                delay = self.base_delay * (2 ** attempt)
                print(f"Retry {attempt + 1}/{self.max_retries} after {delay}s")
                sleep(delay)
            attempt += 1

        raise failure

# 5. Response Handling
def handle_response(response: requests.Response) -> Dict[str, Any]:
    """Summarize a response as a dict, decoding the body by Content-Type.

    ``data`` holds parsed JSON for ``application/json``, text for any
    ``text/*`` type, and raw bytes for everything else.
    """
    mime = response.headers.get('Content-Type', '')

    summary = {
        'status_code': response.status_code,
        'headers': dict(response.headers),
        'success': response.ok
    }

    if 'application/json' in mime:
        payload = response.json()
    elif 'text/' in mime:
        payload = response.text
    else:
        payload = response.content

    summary['data'] = payload
    return summary

def check_status_code(status_code: int) -> str:
    """Map a handful of common HTTP status codes to their reason phrase.

    Codes missing from the table produce ``'Unknown'``.
    """
    reason_by_code = dict([
        (200, 'OK'),
        (201, 'Created'),
        (204, 'No Content'),
        (400, 'Bad Request'),
        (401, 'Unauthorized'),
        (403, 'Forbidden'),
        (404, 'Not Found'),
        (500, 'Internal Server Error'),
        (502, 'Bad Gateway'),
        (503, 'Service Unavailable'),
    ])
    try:
        return reason_by_code[status_code]
    except KeyError:
        return 'Unknown'

# 6. API Client Builder
class ApiClient:
    """Generic REST client for ``<base_url>/<resource>[/<id>]`` endpoints.

    All methods raise ``requests.HTTPError`` on a 4xx/5xx status.  Methods
    that return a dict return ``{}`` when the server answers successfully
    without a body (commonly ``204 No Content`` for DELETE or PUT).
    """

    def __init__(self, base_url: str, auth_token: Optional[str] = None):
        """Normalize ``base_url`` and prepare the default headers.

        Args:
            base_url: API root; trailing slashes are stripped.
            auth_token: When given, sent as ``Authorization: Bearer <token>``.
        """
        self.base_url = base_url.rstrip('/')
        self.headers = {'Content-Type': 'application/json'}
        if auth_token:
            self.headers['Authorization'] = f'Bearer {auth_token}'

    @staticmethod
    def _parse_body(response) -> Dict[str, Any]:
        """Return the JSON body of ``response``, or ``{}`` when it has none."""
        if response.status_code == 204 or not response.content:
            return {}
        return response.json()

    def get_all(self, resource: str) -> list:
        """Get all items of ``resource``"""
        response = requests.get(f"{self.base_url}/{resource}", headers=self.headers)
        response.raise_for_status()
        return response.json()

    def get_by_id(self, resource: str, item_id: Any) -> Dict[str, Any]:
        """Get item by ID"""
        response = requests.get(f"{self.base_url}/{resource}/{item_id}", headers=self.headers)
        response.raise_for_status()
        return self._parse_body(response)

    def create(self, resource: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create new item from ``data`` (sent as JSON)"""
        response = requests.post(
            f"{self.base_url}/{resource}",
            headers=self.headers,
            json=data
        )
        response.raise_for_status()
        return self._parse_body(response)

    def update(self, resource: str, item_id: Any, data: Dict[str, Any]) -> Dict[str, Any]:
        """Replace item ``item_id`` with ``data`` (PUT)"""
        response = requests.put(
            f"{self.base_url}/{resource}/{item_id}",
            headers=self.headers,
            json=data
        )
        response.raise_for_status()
        return self._parse_body(response)

    def partial_update(self, resource: str, item_id: Any, data: Dict[str, Any]) -> Dict[str, Any]:
        """Apply the fields in ``data`` to item ``item_id`` (PATCH)"""
        response = requests.patch(
            f"{self.base_url}/{resource}/{item_id}",
            headers=self.headers,
            json=data
        )
        response.raise_for_status()
        return self._parse_body(response)

    def delete(self, resource: str, item_id: Any) -> Dict[str, Any]:
        """Delete item; returns ``{}`` when the server sends no body"""
        response = requests.delete(
            f"{self.base_url}/{resource}/{item_id}",
            headers=self.headers
        )
        response.raise_for_status()
        return self._parse_body(response)

# 7. Async HTTP Client (using aiohttp)
async def async_get_request(url: str) -> Dict[str, Any]:
    """Async GET request using aiohttp.

    Returns the decoded JSON body.  When ``aiohttp`` is not installed a
    hint is printed and ``{}`` is returned.  HTTP and transport errors
    raised by aiohttp propagate to the caller.
    """
    # Only the import is guarded, so an ImportError raised while performing
    # the request is not misreported as "aiohttp not installed".
    try:
        import aiohttp
    except ImportError:
        print("aiohttp not installed. Install with: pip install aiohttp")
        return {}

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()

# 8. Request Logging
class LoggedHttpClient:
    """HTTP client that records one log entry per GET request.

    Each entry is a dict with the keys ``method``, ``url``,
    ``status_code``, ``duration`` (seconds) and ``success``.
    """

    def __init__(self):
        self.request_log = []

    def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """Send GET request with logging.

        The attempt is logged whether or not it succeeds.  If no response
        was received at all (connection error, timeout, ...) the entry has
        ``status_code`` ``None`` and ``success`` ``False``, and the
        exception is re-raised.

        Raises:
            requests.RequestException: On transport errors or a 4xx/5xx status.
        """
        import time

        # perf_counter is monotonic, so system clock adjustments cannot
        # produce negative or inflated durations.
        start_time = time.perf_counter()
        try:
            response = requests.get(url, **kwargs)
        except requests.RequestException:
            self.request_log.append({
                'method': 'GET',
                'url': url,
                'status_code': None,
                'duration': time.perf_counter() - start_time,
                'success': False
            })
            raise
        duration = time.perf_counter() - start_time

        log_entry = {
            'method': 'GET',
            'url': url,
            'status_code': response.status_code,
            'duration': duration,
            'success': response.ok
        }

        # Log before raise_for_status so error responses are recorded too.
        self.request_log.append(log_entry)
        response.raise_for_status()
        return response.json()

    def get_logs(self) -> list:
        """Get all request logs (the live list, not a copy)"""
        return self.request_log

    def get_stats(self) -> Dict[str, Any]:
        """Get request statistics; ``{}`` when nothing has been logged"""
        if not self.request_log:
            return {}

        total = len(self.request_log)
        successful = sum(1 for log in self.request_log if log['success'])
        avg_duration = sum(log['duration'] for log in self.request_log) / total

        return {
            'total_requests': total,
            'successful': successful,
            'failed': total - successful,
            'avg_duration': avg_duration
        }

# Usage Examples
def demonstrate_http_requests():
    """Run each client against jsonplaceholder.typicode.com and print results.

    Requires network access.  Every section catches
    ``requests.RequestException`` so one failure does not stop the others.
    """
    print("=== Web Python HTTP Requests Examples ===\n")

    # 1. Basic HTTP methods
    print("--- 1. Basic HTTP Methods ---")
    client = HttpClient('https://jsonplaceholder.typicode.com')

    try:
        posts = client.get('/posts')
        print(f"GET /posts: Retrieved {len(posts)} posts")

        new_post = client.post('/posts', {
            'title': 'Test Post',
            'body': 'This is a test post',
            'userId': 1
        })
        print(f"POST /posts: Created post with ID {new_post.get('id')}")
    except requests.RequestException as e:
        print(f"Error: {e}")

    # 2. Advanced features
    print("\n--- 2. Advanced HTTP Features ---")
    advanced_client = AdvancedHttpClient()

    try:
        filtered_posts = advanced_client.get_with_params(
            'https://jsonplaceholder.typicode.com/posts',
            {'userId': 1, '_limit': 5}
        )
        print(f"Filtered posts: {len(filtered_posts)} results")
    except requests.RequestException as e:
        print(f"Error: {e}")

    # 3. Session management
    print("\n--- 3. Session Management ---")
    session_client = SessionClient()

    try:
        session_client.set_cookie('session_id', 'abc123')
        session_client.get('https://jsonplaceholder.typicode.com/posts/1')
        print(f"Session cookies: {session_client.get_cookies()}")
    except requests.RequestException as e:
        print(f"Error: {e}")
    finally:
        # Release the pooled connections held by the session.
        session_client.close()

    # 4. Retry logic
    print("\n--- 4. Retry Logic ---")
    retry_client = RetryClient(max_retries=3)

    try:
        result = retry_client.get_with_retry('https://jsonplaceholder.typicode.com/posts/1')
        print(f"Retry successful: {result.get('title')}")
    except requests.RequestException as e:
        print(f"Error after retries: {e}")

    # 5. REST API client
    print("\n--- 5. REST API Client ---")
    api_client = ApiClient('https://jsonplaceholder.typicode.com')

    try:
        todos = api_client.get_all('todos')
        print(f"GET /todos: {len(todos)} items")

        todo = api_client.get_by_id('todos', 1)
        print(f"GET /todos/1: {todo.get('title')}")
    except requests.RequestException as e:
        print(f"Error: {e}")

    # 6. Request logging
    print("\n--- 6. Request Logging ---")
    logged_client = LoggedHttpClient()

    try:
        logged_client.get('https://jsonplaceholder.typicode.com/posts')
        logged_client.get('https://jsonplaceholder.typicode.com/users')

        stats = logged_client.get_stats()
        print(f"Request stats: {stats}")
    except requests.RequestException as e:
        print(f"Error: {e}")

    print("\n=== All HTTP Requests Examples Completed ===")

# Export functions
# Public API of this example (Python has no `export` statement):
#   classes:   HttpClient, AdvancedHttpClient, SessionClient, RetryClient, ApiClient, LoggedHttpClient
#   functions: handle_response, check_status_code, async_get_request, demonstrate_http_requests

💻 Serveur HTTP python

🟡 intermediate ⭐⭐⭐

Créer un serveur HTTP simple en utilisant le module http.server avec routage, gestion des requêtes et service de fichiers statiques

⏱️ 30 min 🏷️ python, web, networking, server
Prerequisites: Intermediate Python, http.server module
# Web Python HTTP Server Examples
# Creating HTTP servers using http.server module

# 1. Basic HTTP Server
import json
import mimetypes
import os
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from typing import Dict, Any, Optional

class BasicHttpRequestHandler(BaseHTTPRequestHandler):
    """Basic HTTP request handler: static greeting on GET, echo on POST"""

    def do_GET(self):
        """Handle GET requests with a fixed HTML page"""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        response = "<html><body><h1>Hello, World!</h1></body></html>"
        self.wfile.write(response.encode())

    def do_POST(self):
        """Handle POST requests by echoing the body back inside a JSON object.

        A missing ``Content-Length`` header is treated as an empty body; a
        non-numeric or negative value is answered with ``400``.
        """
        raw_length = self.headers.get('Content-Length')
        try:
            content_length = int(raw_length) if raw_length is not None else 0
        except ValueError:
            content_length = -1
        if content_length < 0:
            # read() with a negative size would block until the peer closes.
            self.send_error(400, 'Invalid Content-Length')
            return
        post_data = self.rfile.read(content_length)

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        # errors='replace' keeps non-UTF-8 bodies from crashing the handler.
        response = {'received': post_data.decode('utf-8', errors='replace'), 'status': 'success'}
        self.wfile.write(json.dumps(response).encode())

    def log_message(self, format: str, *args):
        """Print log lines to stdout, prefixed with the server timestamp"""
        print(f"[{self.log_date_time_string()}] {format % args}")

# 2. REST API Server
class RestApiHandler(BaseHTTPRequestHandler):
    """REST API request handler with routing.

    Routes:
        GET    /items       list all items
        GET    /items/<id>  fetch one item
        POST   /items       create an item from a JSON object body
        PUT    /items/<id>  merge a JSON object body into an item
        DELETE /items/<id>  remove an item

    All responses, including errors, are JSON.
    """

    # Simple in-memory database, shared by every handler instance
    database: Dict[str, Dict[str, Any]] = {}

    # Next candidate id.  A counter is used rather than len(database) + 1
    # so that ids are never reused after a delete.
    _next_id: int = 1

    def do_GET(self):
        """Handle GET requests"""
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path

        if path == '/items':
            self._get_all_items()
        elif path.startswith('/items/'):
            item_id = path.split('/')[-1]
            self._get_item(item_id)
        else:
            self._send_error(404, 'Not Found')

    def do_POST(self):
        """Handle POST requests"""
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path

        if path == '/items':
            self._create_item()
        else:
            self._send_error(404, 'Not Found')

    def do_PUT(self):
        """Handle PUT requests"""
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path

        if path.startswith('/items/'):
            item_id = path.split('/')[-1]
            self._update_item(item_id)
        else:
            self._send_error(404, 'Not Found')

    def do_DELETE(self):
        """Handle DELETE requests"""
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path

        if path.startswith('/items/'):
            item_id = path.split('/')[-1]
            self._delete_item(item_id)
        else:
            self._send_error(404, 'Not Found')

    def _read_json_object(self) -> Optional[Dict[str, Any]]:
        """Read the request body and decode it as a JSON object.

        Returns the decoded dict, or ``None`` after a ``400`` response has
        already been sent (bad Content-Length, invalid JSON, or a JSON
        value that is not an object).
        """
        try:
            # A missing header is treated as an empty body.
            content_length = int(self.headers.get('Content-Length') or 0)
        except ValueError:
            content_length = -1
        if content_length < 0:
            self._send_error(400, 'Invalid Content-Length')
            return None

        raw_body = self.rfile.read(content_length)
        try:
            data = json.loads(raw_body.decode())
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError both derive from ValueError.
            self._send_error(400, 'Invalid JSON')
            return None

        if not isinstance(data, dict):
            self._send_error(400, 'JSON body must be an object')
            return None
        return data

    def _get_all_items(self):
        """Get all items"""
        self._send_json_response(list(RestApiHandler.database.values()))

    def _get_item(self, item_id: str):
        """Get specific item"""
        if item_id in RestApiHandler.database:
            self._send_json_response(RestApiHandler.database[item_id])
        else:
            self._send_error(404, 'Item not found')

    def _create_item(self):
        """Create new item and answer ``201`` with the stored object"""
        data = self._read_json_object()
        if data is None:
            return

        # Skip ids already present (e.g. a pre-populated database).
        while str(RestApiHandler._next_id) in RestApiHandler.database:
            RestApiHandler._next_id += 1
        item_id = str(RestApiHandler._next_id)
        RestApiHandler._next_id += 1

        data['id'] = item_id
        RestApiHandler.database[item_id] = data
        self._send_json_response(data, 201)

    def _update_item(self, item_id: str):
        """Merge the request body into an existing item"""
        if item_id not in RestApiHandler.database:
            self._send_error(404, 'Item not found')
            return

        data = self._read_json_object()
        if data is None:
            return

        item = RestApiHandler.database[item_id]
        item.update(data)
        # Keep the stored id in sync with the key the item is filed under.
        item['id'] = item_id
        self._send_json_response(item)

    def _delete_item(self, item_id: str):
        """Delete item"""
        if item_id in RestApiHandler.database:
            del RestApiHandler.database[item_id]
            self._send_json_response({'message': 'Item deleted'})
        else:
            self._send_error(404, 'Item not found')

    def _send_json_response(self, data: Any, status_code: int = 200):
        """Send ``data`` serialized as JSON with the given status"""
        self.send_response(status_code)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(data).encode())

    def _send_error(self, status_code: int, message: str):
        """Send error response as ``{"error": message}``"""
        self._send_json_response({'error': message}, status_code)

    def log_message(self, format: str, *args):
        """Suppress default logging"""
        pass

# 3. Static File Server
class StaticFileHandler(BaseHTTPRequestHandler):
    """Handler for serving static files from ``root_dir``"""

    def __init__(self, *args, root_dir: str = '.', **kwargs):
        # root_dir must be set first: the base __init__ already handles
        # the request before it returns.
        self.root_dir = root_dir
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Handle GET requests for static files.

        The URL path is percent-decoded and resolved relative to
        ``root_dir``; a directory maps to its ``index.html``.  Anything
        that resolves outside ``root_dir`` is answered with ``403``.
        """
        parsed_path = urllib.parse.urlparse(self.path)
        relative = urllib.parse.unquote(parsed_path.path).lstrip('/')
        if '\x00' in relative:
            # os.path functions raise ValueError on embedded NUL bytes.
            self._send_error(400, 'Bad Request')
            return

        root = os.path.realpath(self.root_dir)
        candidate = os.path.join(root, relative)

        # Default to index.html
        if os.path.isdir(candidate):
            candidate = os.path.join(candidate, 'index.html')

        # Security check.  Compare whole path components on the resolved
        # path: a plain startswith() would let "/srv/pub-secret" pass for
        # root "/srv/pub", and realpath() follows symlinks and "..".
        file_path = os.path.realpath(candidate)
        try:
            inside_root = os.path.commonpath([root, file_path]) == root
        except ValueError:
            # Raised e.g. for paths on different drives.
            inside_root = False
        if not inside_root:
            self._send_error(403, 'Forbidden')
            return

        if os.path.isfile(file_path):
            self._serve_file(file_path)
        else:
            self._send_error(404, 'File not found')

    def _serve_file(self, file_path: str):
        """Send ``file_path`` with headers, streaming the body in chunks"""
        content_type, _ = mimetypes.guess_type(file_path)
        if content_type is None:
            content_type = 'application/octet-stream'

        # Open before sending any header so an unreadable file produces an
        # error response instead of a 200 with a missing body.
        try:
            f = open(file_path, 'rb')
        except OSError:
            self._send_error(404, 'File not found')
            return

        with f:
            self.send_response(200)
            self.send_header('Content-type', content_type)

            # Add CORS headers
            self.send_header('Access-Control-Allow-Origin', '*')

            # Add cache control
            self.send_header('Cache-Control', 'max-age=3600')

            # Size of the file that is actually open.
            file_size = os.fstat(f.fileno()).st_size
            self.send_header('Content-Length', str(file_size))

            self.end_headers()

            # Copy in fixed-size chunks so large files are not held in memory.
            while True:
                chunk = f.read(64 * 1024)
                if not chunk:
                    break
                self.wfile.write(chunk)

    def _send_error(self, status_code: int, message: str):
        """Send a minimal HTML error page"""
        self.send_response(status_code)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(f"<html><body><h1>{status_code} - {message}</h1></body></html>".encode())

    def log_message(self, format: str, *args):
        """Suppress default logging"""
        pass

# 4. Multi-threaded Server
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Multi-threaded HTTP server.

    ``ThreadingMixIn`` makes the server handle each request in its own
    thread; a plain ``HTTPServer`` serves requests one at a time.
    """

    # Worker threads must not keep the process alive on shutdown.
    daemon_threads = True

def start_multi_threaded_server(port: int = 8000):
    """Start a ``ThreadedHTTPServer`` on ``port`` and block serving requests.

    Every GET is answered with an HTML page naming the identifier of the
    thread that handled it.  The listening socket is closed when
    ``serve_forever`` exits, for example on ``KeyboardInterrupt``.
    """
    import threading

    class ThreadedRequestHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            """Handle GET request"""
            thread_id = threading.get_ident()

            self.send_response(200)
            self.send_header('Content-type', 'text/html')
            self.end_headers()

            response = f"<html><body><h1>Hello from thread {thread_id}!</h1></body></html>"
            self.wfile.write(response.encode())

        def log_message(self, format: str, *args):
            """Suppress default logging"""
            pass

    server = ThreadedHTTPServer(('', port), ThreadedRequestHandler)
    print(f"Multi-threaded server running on port {port}")
    try:
        server.serve_forever()
    finally:
        server.server_close()

# 5. Server with Middleware
class MiddlewareHandler(BaseHTTPRequestHandler):
    """Request handler with middleware support.

    Middleware are callables taking the handler.  They run after the
    request line and headers have been parsed, so ``handler.command``,
    ``handler.path`` and ``handler.headers`` are available to them, and
    before the ``do_*`` method is dispatched.  A middleware that returns
    ``False`` stops the request: it is expected to have sent its own
    response, and no ``do_*`` method is called.
    """

    # Class-level list: shared by all instances (and by subclasses that do
    # not define their own list).
    middleware_chain = []

    @classmethod
    def add_middleware(cls, middleware_func):
        """Add middleware to chain"""
        cls.middleware_chain.append(middleware_func)

    def parse_request(self):
        """Parse the request, then run the middleware chain.

        Hooking in here rather than before ``handle_one_request`` ensures
        the request attributes exist when middleware read them.  Returning
        ``False`` makes ``handle_one_request`` skip the ``do_*`` dispatch.
        """
        if not super().parse_request():
            return False

        # Iterate over a copy so a middleware may register others safely.
        for middleware in list(self.middleware_chain):
            if middleware(self) is False:
                # The body of the rejected request may be unread; do not
                # reuse the connection.
                self.close_connection = True
                return False
        return True

    def do_GET(self):
        """Handle GET request"""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        response = {'message': 'Hello with middleware!'}
        self.wfile.write(json.dumps(response).encode())

    def log_message(self, format: str, *args):
        """Suppress default logging"""
        pass

# Middleware functions
def logging_middleware(handler):
    """Print the HTTP method and path of the request being handled"""
    verb, target = handler.command, handler.path
    print(f"[LOG] {verb} {target}")

def cors_middleware(handler):
    """Wrap ``handler.send_response`` so every response carries CORS headers.

    The original bound method is kept on ``handler._original_send_response``
    and is the one the wrapper delegates to.  Applying the middleware to
    the same handler again is a no-op, so headers are never duplicated.
    """
    if getattr(handler, '_original_send_response', None) is not None:
        return

    # Capture the real method BEFORE replacing the attribute; looking up
    # handler.send_response inside the wrapper would find the wrapper
    # itself and recurse forever.
    original_send_response = handler.send_response

    def send_response(code, message=None):
        original_send_response(code, message)
        handler.send_header('Access-Control-Allow-Origin', '*')
        handler.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
        handler.send_header('Access-Control-Allow-Headers', 'Content-Type')

    # Store original send_response
    handler._original_send_response = original_send_response
    handler.send_response = send_response

def auth_middleware(handler):
    """Simple authentication middleware.

    Accepts a request whose ``Authorization`` header is ``Bearer <token>``
    with a non-empty token; the token value itself is not verified.

    Returns:
        ``True`` when the header is acceptable.  Otherwise a ``401``
        response is written to the handler and ``False`` is returned.
    """
    scheme = 'Bearer '
    auth_header = handler.headers.get('Authorization')

    if auth_header and auth_header.startswith(scheme) and auth_header[len(scheme):].strip():
        return True

    body = b'Unauthorized'
    handler.send_response(401)
    # A 401 response has to name the expected authentication scheme.
    handler.send_header('WWW-Authenticate', 'Bearer')
    handler.send_header('Content-Type', 'text/plain')
    handler.send_header('Content-Length', str(len(body)))
    handler.end_headers()
    handler.wfile.write(body)
    return False

# 6. WebSocket-like Server (using Server-Sent Events)
class SSEHandler(BaseHTTPRequestHandler):
    """Server-Sent Events handler.

    ``GET /events`` streams five ``data: Message <i>`` events one second
    apart; every other path is answered with ``404``.
    """

    def do_GET(self):
        """Handle SSE connection"""
        if self.path != '/events':
            # Without this branch the client would get an empty reply.
            self.send_error(404, 'Not Found')
            return

        self.send_response(200)
        self.send_header('Content-type', 'text/event-stream')
        self.send_header('Cache-Control', 'no-cache')
        self.send_header('Connection', 'keep-alive')
        self.end_headers()

        # Send events
        import time
        try:
            for i in range(5):
                event_data = f"data: Message {i}\n\n"
                self.wfile.write(event_data.encode())
                self.wfile.flush()
                time.sleep(1)
        except (BrokenPipeError, ConnectionResetError):
            # The client disconnected mid-stream; nothing left to send.
            pass
        finally:
            # The stream has no Content-Length, so closing the connection
            # is the only way to signal its end to the client.
            self.close_connection = True

    def log_message(self, format: str, *args):
        """Suppress default logging"""
        pass

# 7. Server with Request Logging
class LoggedRequestHandler(BaseHTTPRequestHandler):
    """Request handler with detailed logging.

    GET echoes the request path, method and headers as JSON; POST echoes
    the path, method and body.
    """

    def log_request(self, code='-', size='-'):
        """Log request details.

        This is also called for error responses produced while the request
        line is still being parsed, when ``headers`` and ``path`` have not
        been set and ``command`` is ``None``; those fields are then
        logged as ``-`` / ``Unknown`` instead of raising.
        """
        headers = getattr(self, 'headers', None)
        user_agent = headers.get('User-Agent', 'Unknown') if headers is not None else 'Unknown'
        command = getattr(self, 'command', None) or '-'
        path = getattr(self, 'path', '-')
        print(f"[{self.log_date_time_string()}] {self.client_address[0]} - {command} {path} - {code} - {user_agent}")

    def do_GET(self):
        """Handle GET request"""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        response = {
            'path': self.path,
            'method': self.command,
            'headers': dict(self.headers)
        }
        self.wfile.write(json.dumps(response, indent=2).encode())

    def do_POST(self):
        """Handle POST request; a bad ``Content-Length`` is answered with 400"""
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except ValueError:
            content_length = -1
        if content_length < 0:
            # read() with a negative size would block until the peer closes.
            self.send_error(400, 'Invalid Content-Length')
            return
        post_data = self.rfile.read(content_length) if content_length > 0 else b''

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        response = {
            'path': self.path,
            'method': self.command,
            # errors='replace' keeps non-UTF-8 bodies from crashing the handler.
            'data': post_data.decode('utf-8', errors='replace')
        }
        self.wfile.write(json.dumps(response, indent=2).encode())

# Server Management Functions
def start_basic_server(port: int = 8000):
    """Serve ``BasicHttpRequestHandler`` on all interfaces; blocks forever"""
    bind_address = ('', port)
    httpd = HTTPServer(bind_address, BasicHttpRequestHandler)
    print(f"Basic server running on http://localhost:{port}")
    httpd.serve_forever()

def start_rest_api_server(port: int = 8000):
    """Serve ``RestApiHandler`` on all interfaces; blocks forever"""
    bind_address = ('', port)
    httpd = HTTPServer(bind_address, RestApiHandler)
    print(f"REST API server running on http://localhost:{port}")
    httpd.serve_forever()

def start_static_server(port: int = 8000, directory: str = '.'):
    """Serve files below ``directory`` on all interfaces; blocks forever"""
    def make_handler(*positional, **named):
        # The server only calls this factory; constructing the handler is
        # what processes the request, so nothing needs to be returned.
        named['root_dir'] = directory
        StaticFileHandler(*positional, **named)

    bind_address = ('', port)
    httpd = HTTPServer(bind_address, make_handler)
    print(f"Static file server running on http://localhost:{port}")
    served_from = os.path.abspath(directory)
    print(f"Serving directory: {served_from}")
    httpd.serve_forever()

def start_middleware_server(port: int = 8000):
    """Start server with the logging middleware installed; blocks forever.

    The middleware chain lives on the handler class, so registration is
    guarded: calling this function again does not log each request twice.
    """
    if logging_middleware not in MiddlewareHandler.middleware_chain:
        MiddlewareHandler.add_middleware(logging_middleware)

    server = HTTPServer(('', port), MiddlewareHandler)
    print(f"Server with middleware running on http://localhost:{port}")
    server.serve_forever()

# Usage Examples
def demonstrate_http_server():
    """Print an overview of the available example servers and client usage.

    Nothing is started here; the text only lists which ``start_*``
    function to call for each server.
    """
    # One entry per printed line; an embedded "\n" produces a blank line.
    overview = (
        "=== Web Python HTTP Server Examples ===\n",
        "Note: Server examples require running the server in a separate process",
        "Choose one of the following servers to start:\n",
        "--- 1. Basic Server ---",
        "start_basic_server(8000)",
        "Serves simple HTML page\n",
        "--- 2. REST API Server ---",
        "start_rest_api_server(8000)",
        "Endpoints:",
        "  GET    /items      - Get all items",
        "  GET    /items/{id} - Get item by ID",
        "  POST   /items      - Create new item",
        "  PUT    /items/{id} - Update item",
        "  DELETE /items/{id} - Delete item\n",
        "--- 3. Static File Server ---",
        "start_static_server(8000, './public')",
        "Serves static files from directory\n",
        "--- 4. Multi-threaded Server ---",
        "start_multi_threaded_server(8000)",
        "Handles multiple requests concurrently\n",
        "--- 5. Server with Middleware ---",
        "start_middleware_server(8000)",
        "Includes logging and CORS middleware\n",
        "--- Example Client Usage ---",
        "# Using requests library",
        "import requests",
        "",
        "# Get all items",
        "response = requests.get('http://localhost:8000/items')",
        "print(response.json())",
        "",
        "# Create new item",
        "response = requests.post('http://localhost:8000/items',",
        "    json={'name': 'Test Item', 'value': 123})",
        "print(response.json())",
        "\n=== All HTTP Server Examples Listed ===",
    )
    for line in overview:
        print(line)

# Export functions and classes
# Public API of this example (Python has no `export` statement):
#   handlers:   BasicHttpRequestHandler, RestApiHandler, StaticFileHandler, SSEHandler, LoggedRequestHandler
#   starters:   start_basic_server, start_rest_api_server, start_static_server, start_multi_threaded_server, start_middleware_server
#   middleware: logging_middleware, cors_middleware, auth_middleware; demo: demonstrate_http_server

💻 Socket TCP python

🔴 complex ⭐⭐⭐⭐

Créer un client et un serveur TCP en utilisant le module socket pour communication bidirectionnelle

⏱️ 35 min 🏷️ python, web, networking, socket, tcp
Prerequisites: Advanced Python, socket module, threading
# Web Python TCP Socket Examples
# Creating TCP clients and servers using socket module

# 1. Basic TCP Server
import socket
import threading
import json
from typing import Callable, Optional

class TcpServer:
    """Threaded TCP echo server.

    Listens on ``host:port`` and hands every accepted connection to its own
    daemon thread, which answers each received chunk with ``"Echo: <chunk>"``.
    """

    def __init__(self, host: str = 'localhost', port: int = 9999):
        """Store the listen address; no socket is created until start()."""
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Bind, listen and accept connections until stopped.

        Blocks the calling thread. A KeyboardInterrupt ends the loop
        quietly; any other exception (for example a failed bind) propagates
        after the listening socket has been closed.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Setup happens inside the try so a failing bind()/listen() still
        # reaches stop() and the socket is not leaked.
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True

            print(f"TCP server listening on {self.host}:{self.port}")

            while self.running:
                client_socket, address = self.server_socket.accept()
                print(f"New connection from {address}")
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.stop()

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Echo every chunk received on one connection until it closes.

        Args:
            client_socket: Connected socket returned by accept(); it is
                always closed before this method returns.
            address: Peer address, used only in log output.
        """
        try:
            while self.running:
                data = client_socket.recv(1024)
                if not data:
                    # Empty read means the peer closed its side.
                    break

                # A 1024-byte read can cut a multi-byte character in half and
                # clients may send arbitrary bytes; replace undecodable bytes
                # instead of dropping the connection.
                message = data.decode('utf-8', errors='replace')
                print(f"Received from {address}: {message}")

                # Echo back; sendall() keeps writing until the whole reply
                # is out, whereas send() may stop after a partial write.
                response = f"Echo: {message}"
                client_socket.sendall(response.encode('utf-8'))
        except Exception as e:
            print(f"Error handling client {address}: {e}")
        finally:
            client_socket.close()
            print(f"Connection closed: {address}")

    def stop(self):
        """Clear the running flag and close the listening socket, if any."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()

# 2. Basic TCP Client
class TcpClient:
    """Minimal blocking TCP client that exchanges UTF-8 text."""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        """Store the server address; no connection is made until connect()."""
        self.host = host
        self.port = port
        self.client_socket: Optional[socket.socket] = None

    def connect(self):
        """Open a connection to ``host:port``.

        Raises:
            OSError: If the connection cannot be established. The
                half-created socket is closed first so it does not leak.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
        except OSError:
            sock.close()
            raise
        self.client_socket = sock
        print(f"Connected to {self.host}:{self.port}")

    def send(self, message: str):
        """Send ``message`` as UTF-8; does nothing when not connected."""
        if self.client_socket:
            # sendall() loops until every byte is written; send() may stop
            # after a partial write.
            self.client_socket.sendall(message.encode('utf-8'))

    def receive(self, buffer_size: int = 1024) -> str:
        """Read up to ``buffer_size`` bytes and decode them as UTF-8.

        Returns:
            The decoded text, or ``''`` when not connected or when the
            server has closed the connection.
        """
        if self.client_socket:
            data = self.client_socket.recv(buffer_size)
            return data.decode('utf-8')
        return ''

    def close(self):
        """Close the connection, if any."""
        if self.client_socket:
            self.client_socket.close()
            # A closed socket object is still truthy; dropping the reference
            # makes later send()/receive() calls no-ops instead of OSErrors.
            self.client_socket = None

# 3. Chat Server
class ChatServer:
    """Multi-client chat server.

    Each connection is served on its own daemon thread. A new client is
    prompted with ``NICK``; the first data it sends is taken as its nickname
    and everything after that is relayed to the other clients.
    """

    def __init__(self, host: str = 'localhost', port: int = 9999):
        """Store the listen address; no socket is created until start()."""
        self.host = host
        self.port = port
        self.clients: list[socket.socket] = []
        self.nicknames: dict[socket.socket, str] = {}
        self.server_socket: Optional[socket.socket] = None
        self.running = False
        # clients/nicknames are touched by the accept loop and by every
        # per-client thread, so all access goes through this lock.
        self._lock = threading.Lock()

    def start(self):
        """Bind, listen and accept chat clients until stopped.

        Blocks the calling thread. A KeyboardInterrupt ends the loop
        quietly; other exceptions propagate after stop() has run.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Setup is inside the try so a failing bind() still reaches stop().
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True

            print(f"Chat server listening on {self.host}:{self.port}")

            while self.running:
                client_socket, address = self.server_socket.accept()
                print(f"New connection from {address}")
                with self._lock:
                    self.clients.append(client_socket)

                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket,)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nChat server shutting down...")
        finally:
            self.stop()

    def broadcast(self, message: str, sender_socket: Optional[socket.socket] = None):
        """Send ``message`` to every connected client except the sender.

        Args:
            message: Text to deliver, encoded as UTF-8.
            sender_socket: Client to skip, or ``None`` to reach everyone.

        Clients whose socket raises OSError are dropped from ``self.clients``.
        """
        payload = message.encode('utf-8')

        # Work on a snapshot: removing from the list while iterating it
        # would skip the client that follows each dead one.
        with self._lock:
            recipients = [c for c in self.clients if c is not sender_socket]

        unreachable = []
        for client in recipients:
            try:
                client.sendall(payload)
            except OSError:
                unreachable.append(client)

        if unreachable:
            with self._lock:
                for client in unreachable:
                    if client in self.clients:
                        self.clients.remove(client)

    def _handle_client(self, client_socket: socket.socket):
        """Run the nickname handshake and relay loop for one client.

        The socket is always closed and unregistered on exit. A departure is
        announced only for clients that completed the nickname handshake.
        """
        try:
            # Request nickname
            client_socket.sendall("NICK".encode('utf-8'))
            nickname = client_socket.recv(1024).decode('utf-8', errors='replace')
            if not nickname:
                # Peer disconnected before identifying itself.
                return
            with self._lock:
                self.nicknames[client_socket] = nickname

            self.broadcast(f"{nickname} joined the chat!")

            while self.running:
                message = client_socket.recv(1024).decode('utf-8', errors='replace')
                if not message:
                    break
                self.broadcast(f"{nickname}: {message}", client_socket)
        except Exception as e:
            print(f"Error handling client: {e}")
        finally:
            with self._lock:
                if client_socket in self.clients:
                    self.clients.remove(client_socket)
                # pop() so the nickname table does not grow forever.
                departed = self.nicknames.pop(client_socket, None)
            # Close unconditionally: broadcast() may already have pruned the
            # socket from the list, which must not leave it open.
            client_socket.close()
            if departed:
                self.broadcast(f"{departed} left the chat!")

    def stop(self):
        """Stop accepting, disconnect every client and close the listener."""
        self.running = False
        with self._lock:
            open_clients = list(self.clients)
            self.clients.clear()
            self.nicknames.clear()
        for client in open_clients:
            client.close()
        if self.server_socket:
            self.server_socket.close()

# 4. Chat Client
class ChatClient:
    """Chat client that prints incoming messages from a background thread."""

    def __init__(self, host: str = 'localhost', port: int = 9999, nickname: str = 'Anonymous'):
        """Store the server address and nickname; connect() does the I/O."""
        self.host = host
        self.port = port
        self.nickname = nickname
        self.client_socket: Optional[socket.socket] = None
        self.running = False

    def connect(self):
        """Connect, send the nickname and start the receiver thread.

        Raises:
            OSError: If the connection cannot be established. The
                half-created socket is closed first so it does not leak.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
            # Send nickname
            sock.sendall(self.nickname.encode('utf-8'))
        except OSError:
            sock.close()
            raise
        self.client_socket = sock
        self.running = True

        # Start receive thread
        receive_thread = threading.Thread(target=self._receive_messages)
        receive_thread.daemon = True
        receive_thread.start()

    def _receive_messages(self):
        """Print incoming messages until the connection ends.

        The loop stops when ``running`` is cleared, when the socket raises
        OSError (for example after close()), or when the server closes the
        connection. ``running`` is False once this method returns.
        """
        sock = self.client_socket
        while self.running and sock is not None:
            try:
                data = sock.recv(1024)
            except OSError:
                break
            if not data:
                # recv() returns b'' forever once the peer has closed;
                # without this exit the loop would spin at full CPU.
                break
            message = data.decode('utf-8', errors='replace')
            print(f"\r{message}")
            # Re-draw the input prompt that the incoming line overwrote.
            print(f"{self.nickname}: ", end='', flush=True)
        self.running = False

    def send_message(self, message: str):
        """Send ``message`` as UTF-8; does nothing when not connected."""
        if self.client_socket:
            # sendall() writes the whole message; send() may stop early.
            self.client_socket.sendall(message.encode('utf-8'))

    def close(self):
        """Stop the receiver loop and close the connection, if any."""
        self.running = False
        if self.client_socket:
            self.client_socket.close()

# 5. Message Protocol Server (JSON-based)
class ProtocolServer:
    """TCP server that speaks a small JSON request/response protocol.

    Supported request types are ``ping``, ``echo``, ``add`` and ``get_time``.
    Each recv() is parsed as one complete JSON document; there is no message
    framing, so requests larger than 4096 bytes or several requests arriving
    in one read are reported as invalid JSON.
    """

    def __init__(self, host: str = 'localhost', port: int = 9999):
        """Store the listen address; no socket is created until start()."""
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Bind, listen and accept connections until stopped.

        Blocks the calling thread. A KeyboardInterrupt ends the loop
        quietly; other exceptions propagate after stop() has run.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Setup is inside the try so a failing bind() still reaches stop().
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True

            print(f"Protocol server listening on {self.host}:{self.port}")

            while self.running:
                client_socket, address = self.server_socket.accept()
                print(f"New connection from {address}")

                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.stop()

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Answer JSON requests on one connection until it closes.

        Args:
            client_socket: Connected socket; always closed on return.
            address: Peer address, used only in log output.
        """
        try:
            while self.running:
                data = client_socket.recv(4096)
                if not data:
                    break

                try:
                    message = json.loads(data.decode('utf-8'))
                except (UnicodeDecodeError, json.JSONDecodeError):
                    # Bytes that are not UTF-8 are just as invalid as
                    # malformed JSON and get the same reply instead of
                    # tearing the connection down.
                    response = {'status': 'error', 'message': 'Invalid JSON'}
                else:
                    response = self._process_message(message)
                client_socket.sendall(json.dumps(response).encode('utf-8'))
        except Exception as e:
            print(f"Error handling client {address}: {e}")
        finally:
            client_socket.close()

    def _process_message(self, message: dict) -> dict:
        """Build the reply for one decoded request.

        Args:
            message: Decoded JSON value. Anything other than an object
                yields an error reply.

        Returns:
            The reply dict; ``{'type': 'error', ...}`` for unknown or
            malformed requests.
        """
        if not isinstance(message, dict):
            # Valid JSON such as a list or number has no 'type' field.
            return {'type': 'error', 'message': 'Message must be a JSON object'}

        msg_type = message.get('type')

        if msg_type == 'ping':
            return {'type': 'pong', 'timestamp': message.get('timestamp')}
        elif msg_type == 'echo':
            return {'type': 'echo_response', 'data': message.get('data')}
        elif msg_type == 'add':
            a = message.get('a', 0)
            b = message.get('b', 0)
            try:
                total = a + b
            except TypeError:
                # e.g. a number plus a string: report it to the client
                # rather than letting the exception end the connection.
                return {'type': 'error', 'message': 'Operands cannot be added'}
            return {'type': 'result', 'operation': 'add', 'result': total}
        elif msg_type == 'get_time':
            from datetime import datetime
            return {'type': 'time', 'timestamp': datetime.now().isoformat()}
        else:
            return {'type': 'error', 'message': 'Unknown message type'}

    def stop(self):
        """Clear the running flag and close the listening socket, if any."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()

# 6. File Transfer Server
class FileTransferServer:
    """TCP server that receives one file per connection.

    Wire format: a ``"<filename>|<size>"`` UTF-8 header followed by exactly
    ``size`` bytes of file data. The server answers ``ACK`` once the whole
    file has been written. The header is read with a single recv(), so the
    protocol relies on the header arriving separately from the file data.
    """

    def __init__(self, host: str = 'localhost', port: int = 9999):
        """Store the listen address; no socket is created until start()."""
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Bind, listen and accept uploads until stopped.

        Blocks the calling thread. A KeyboardInterrupt ends the loop
        quietly; other exceptions propagate after stop() has run.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Setup is inside the try so a failing bind() still reaches stop().
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True

            print(f"File transfer server listening on {self.host}:{self.port}")

            while self.running:
                client_socket, address = self.server_socket.accept()
                print(f"New connection from {address}")

                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.stop()

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Receive one file from a client and store it in the working directory.

        Args:
            client_socket: Connected socket; always closed on return.
            address: Peer address (currently unused beyond the signature).

        Only the final path component of the announced name is used, so a
        client cannot write outside the current working directory. ``ACK``
        is sent only when all announced bytes have arrived.
        """
        import os

        try:
            # Receive file info
            file_info = client_socket.recv(1024).decode('utf-8')
            # Split on the LAST '|' so names that contain '|' still parse.
            announced_name, size_text = file_info.rsplit('|', 1)
            filesize = int(size_text)
            if filesize < 0:
                raise ValueError(f"invalid file size: {filesize}")

            # The name comes from the network: strip any directory part
            # (either separator style) to block path traversal.
            filename = os.path.basename(announced_name.replace('\\', '/'))
            if filename in ('', '.', '..'):
                raise ValueError(f"invalid file name: {announced_name!r}")

            print(f"Receiving file: {filename} ({filesize} bytes)")

            # Receive file data
            received = 0
            with open(filename, 'wb') as f:
                while received < filesize:
                    # Never read past the announced size.
                    data = client_socket.recv(min(4096, filesize - received))
                    if not data:
                        break
                    f.write(data)
                    received += len(data)
                    progress = (received / filesize) * 100
                    print(f"\rProgress: {progress:.1f}%", end='')

            if received < filesize:
                # Peer hung up early: do not claim success or acknowledge.
                print(f"\nTransfer incomplete: received {received} of {filesize} bytes")
                return

            print("\nFile received successfully!")
            client_socket.sendall(b"ACK")

        except Exception as e:
            print(f"Error: {e}")
        finally:
            client_socket.close()

    def stop(self):
        """Clear the running flag and close the listening socket, if any."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()

# 7. File Transfer Client
class FileTransferClient:
    """Client that uploads a file using a ``"<filename>|<size>"`` header.

    The header is followed by the raw file bytes; the client then waits for
    a short acknowledgment from the server.
    """

    def __init__(self, host: str = 'localhost', port: int = 9999):
        """Store the server address; no connection is made until connect()."""
        self.host = host
        self.port = port
        self.client_socket: Optional[socket.socket] = None

    def connect(self):
        """Open a connection to ``host:port``.

        Raises:
            OSError: If the connection cannot be established. The
                half-created socket is closed first so it does not leak.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
        except OSError:
            sock.close()
            raise
        self.client_socket = sock
        print(f"Connected to {self.host}:{self.port}")

    def send_file(self, filename: str):
        """Upload ``filename`` and print the server's acknowledgment.

        Prints a message and returns without sending when the file does not
        exist or when connect() has not been called.

        Args:
            filename: Path of the file to send; it is transmitted verbatim
                in the header.
        """
        import os

        if not os.path.exists(filename):
            print(f"File not found: {filename}")
            return

        if self.client_socket is None:
            print("Not connected to a server")
            return

        filesize = os.path.getsize(filename)

        # Send file info
        file_info = f"{filename}|{filesize}"
        self.client_socket.sendall(file_info.encode('utf-8'))

        # Send file data
        sent = 0
        with open(filename, 'rb') as f:
            while sent < filesize:
                # Cap the read so a file that grew since getsize() cannot
                # push more bytes than the header announced.
                data = f.read(min(4096, filesize - sent))
                if not data:
                    # File shrank since getsize(); stop instead of looping
                    # forever on empty reads.
                    break
                # sendall() guarantees the whole chunk is written, so
                # counting len(data) as sent is accurate.
                self.client_socket.sendall(data)
                sent += len(data)
                progress = (sent / filesize) * 100
                print(f"\rProgress: {progress:.1f}%", end='')

        if sent < filesize:
            # The server is still expecting bytes and will never ACK.
            print(f"\nTransfer incomplete: sent {sent} of {filesize} bytes")
            return

        print("\nFile sent successfully!")

        # Wait for acknowledgment
        ack = self.client_socket.recv(1024)
        print(f"Server response: {ack.decode('utf-8')}")

    def close(self):
        """Close the connection, if any."""
        if self.client_socket:
            self.client_socket.close()
            # Drop the reference so later calls see "not connected".
            self.client_socket = None

# 8. UDP Socket (for comparison)
class UdpServer:
    """UDP echo server: answers each datagram with ``"Echo: <text>"``."""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        """Store the listen address; no socket is created until start()."""
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Bind and answer datagrams until stopped.

        Blocks the calling thread. A KeyboardInterrupt ends the loop
        quietly; other exceptions propagate after stop() has run.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Setup is inside the try so a failing bind() still reaches stop().
        try:
            self.server_socket.bind((self.host, self.port))
            self.running = True

            print(f"UDP server listening on {self.host}:{self.port}")

            while self.running:
                data, address = self.server_socket.recvfrom(1024)
                self.server_socket.sendto(self._make_reply(data, address), address)
        except KeyboardInterrupt:
            print("\nUDP server shutting down...")
        finally:
            self.stop()

    def _make_reply(self, data: bytes, address: tuple) -> bytes:
        """Log one received datagram and return the encoded echo reply.

        Undecodable bytes are replaced rather than raised, so a single
        malformed datagram cannot take the whole server down.
        """
        message = data.decode('utf-8', errors='replace')
        print(f"Received from {address}: {message}")

        response = f"Echo: {message}"
        return response.encode('utf-8')

    def stop(self):
        """Clear the running flag and close the socket, if any."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()

# Usage Examples
def demonstrate_tcp_socket():
    """Print a usage walkthrough for the socket examples in this module.

    Nothing is started or connected here: the function only writes sample
    server/client snippets to stdout, because the real examples need the
    server and the client to run in separate processes.
    """
    walkthrough = (
        # Banner
        "=== Web Python TCP Socket Examples ===\n",
        "Note: Socket examples require running server and client in separate processes",
        "Choose one of the following examples:\n",

        # Echo server and client
        "--- 1. Basic TCP Server ---",
        "# Server",
        "server = TcpServer('localhost', 9999)",
        "server.start()",
        "",
        "# Client",
        "client = TcpClient('localhost', 9999)",
        "client.connect()",
        "client.send('Hello, Server!')",
        "response = client.receive()",
        "print(response)",
        "client.close()\n",

        # Chat
        "--- 2. Chat Server ---",
        "# Server",
        "chat_server = ChatServer('localhost', 9999)",
        "chat_server.start()",
        "",
        "# Client",
        "chat_client = ChatClient('localhost', 9999, 'Alice')",
        "chat_client.connect()",
        "# Then send messages:",
        "chat_client.send_message('Hello everyone!')\n",

        # JSON protocol
        "--- 3. Protocol Server (JSON-based) ---",
        "# Server",
        "protocol_server = ProtocolServer('localhost', 9999)",
        "protocol_server.start()",
        "",
        "# Client sends JSON messages:",
        'client.send(json.dumps({"type": "ping", "timestamp": 123456}))',
        'client.send(json.dumps({"type": "add", "a": 5, "b": 3}))',
        'client.send(json.dumps({"type": "echo", "data": "Hello"}))\n',

        # File transfer
        "--- 4. File Transfer ---",
        "# Server",
        "file_server = FileTransferServer('localhost', 9999)",
        "file_server.start()",
        "",
        "# Client",
        "file_client = FileTransferClient('localhost', 9999)",
        "file_client.connect()",
        "file_client.send_file('example.txt')",
        "file_client.close()\n",

        # UDP
        "--- 5. UDP Server (for comparison) ---",
        "# Server",
        "udp_server = UdpServer('localhost', 9999)",
        "udp_server.start()",
        "",
        "# Client",
        "sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)",
        "sock.sendto(b'Hello UDP!', ('localhost', 9999))",
        "data, addr = sock.recvfrom(1024)",
        "print(data.decode())\n",

        # Footer
        "\n=== All TCP Socket Examples Listed ===",
    )

    # One newline between entries plus print()'s trailing newline gives the
    # same text as printing every entry with its own print() call.
    print("\n".join(walkthrough))

# Export functions and classes
# export { TcpServer, TcpClient, ChatServer, ChatClient, ProtocolServer, FileTransferServer, FileTransferClient, UdpServer }
# export { demonstrate_tcp_socket }