Web Networking Python Samples

Web Python networking examples including HTTP requests, HTTP server, and TCP socket programming

💻 HTTP Requests python

🟡 intermediate ⭐⭐⭐

Send GET, POST, PUT, DELETE requests using requests library with error handling and response parsing

⏱️ 25 min 🏷️ python, web, networking, http
Prerequisites: Intermediate Python, requests library
# Web Python HTTP Requests Examples
# Using requests library for HTTP communication with servers

# 1. Basic HTTP Request Methods
import requests
import json
from typing import Any, Dict, Optional

class HttpClient:
    """Simple HTTP client for making requests"""

    def __init__(self, base_url: str = '', default_headers: Optional[Dict[str, str]] = None):
        self.base_url = base_url
        self.default_headers = {
            'Content-Type': 'application/json',
            **(default_headers or {})
        }

    def get(self, url: str, headers: Optional[Dict[str, str]] = None, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send GET request"""
        response = requests.get(
            f"{self.base_url}{url}",
            headers={**self.default_headers, **(headers or {})},
            params=params
        )
        response.raise_for_status()
        return response.json()

    def post(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send POST request"""
        response = requests.post(
            f"{self.base_url}{url}",
            headers={**self.default_headers, **(headers or {})},
            json=data
        )
        response.raise_for_status()
        return response.json()

    def put(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send PUT request"""
        response = requests.put(
            f"{self.base_url}{url}",
            headers={**self.default_headers, **(headers or {})},
            json=data
        )
        response.raise_for_status()
        return response.json()

    def delete(self, url: str, headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send DELETE request"""
        response = requests.delete(
            f"{self.base_url}{url}",
            headers={**self.default_headers, **(headers or {})}
        )
        response.raise_for_status()
        return response.json()

    def patch(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send PATCH request"""
        response = requests.patch(
            f"{self.base_url}{url}",
            headers={**self.default_headers, **(headers or {})},
            json=data
        )
        response.raise_for_status()
        return response.json()

# 2. Advanced HTTP Features
class AdvancedHttpClient:
    """Advanced HTTP client with additional features"""

    def get_with_params(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Send GET request with query parameters"""
        response = requests.get(url, params=params)
        response.raise_for_status()
        return response.json()

    def get_with_timeout(self, url: str, timeout: int = 5) -> Dict[str, Any]:
        """Send GET request with timeout"""
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return response.json()

    def post_with_files(self, url: str, files: Dict[str, Any], data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Upload files via POST request"""
        response = requests.post(url, files=files, data=data)
        response.raise_for_status()
        return response.json()

    def download_file(self, url: str, filename: str) -> None:
        """Download file from URL"""
        response = requests.get(url, stream=True)
        response.raise_for_status()

        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)

    def download_file_with_progress(self, url: str, filename: str) -> None:
        """Download file with progress indicator"""
        response = requests.get(url, stream=True)
        response.raise_for_status()

        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0

        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size > 0:
                        progress = (downloaded / total_size) * 100
                        print(f"\rProgress: {progress:.1f}%", end='')
        print()  # New line after progress

    def get_with_auth(self, url: str, username: str, password: str) -> Dict[str, Any]:
        """Send GET request with basic authentication"""
        response = requests.get(url, auth=(username, password))
        response.raise_for_status()
        return response.json()

    def get_with_bearer_token(self, url: str, token: str) -> Dict[str, Any]:
        """Send GET request with Bearer token"""
        headers = {'Authorization': f'Bearer {token}'}
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.json()

# 3. Request with Session and Cookies
class SessionClient:
    """HTTP client with session management"""

    def __init__(self):
        self.session = requests.Session()

    def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """Send GET request using session"""
        response = self.session.get(url, **kwargs)
        response.raise_for_status()
        return response.json()

    def post(self, url: str, **kwargs) -> Dict[str, Any]:
        """Send POST request using session"""
        response = self.session.post(url, **kwargs)
        response.raise_for_status()
        return response.json()

    def set_cookie(self, name: str, value: str):
        """Set cookie in session"""
        self.session.cookies.set(name, value)

    def get_cookies(self) -> Dict[str, str]:
        """Get all cookies from session"""
        return dict(self.session.cookies)

    def close(self):
        """Close session"""
        self.session.close()

# 4. Retry Logic
class RetryClient:
    """HTTP client with automatic retry logic"""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def get_with_retry(self, url: str, **kwargs) -> Dict[str, Any]:
        """Send GET request with retry on failure"""
        import time

        last_error = None
        for attempt in range(self.max_retries + 1):
            try:
                response = requests.get(url, **kwargs)
                response.raise_for_status()
                return response.json()
            except requests.RequestException as e:
                last_error = e
                if attempt < self.max_retries:
                    delay = self.base_delay * (2 ** attempt)
                    print(f"Retry {attempt + 1}/{self.max_retries} after {delay}s")
                    time.sleep(delay)

        raise last_error

# 5. Response Handling
def handle_response(response: requests.Response) -> Dict[str, Any]:
    """Handle different response types"""
    result = {
        'status_code': response.status_code,
        'headers': dict(response.headers),
        'success': response.ok
    }

    content_type = response.headers.get('Content-Type', '')

    if 'application/json' in content_type:
        result['data'] = response.json()
    elif 'text/' in content_type:
        result['data'] = response.text
    else:
        result['data'] = response.content

    return result

def check_status_code(status_code: int) -> str:
    """Get status code description"""
    status_codes = {
        200: 'OK',
        201: 'Created',
        204: 'No Content',
        400: 'Bad Request',
        401: 'Unauthorized',
        403: 'Forbidden',
        404: 'Not Found',
        500: 'Internal Server Error',
        502: 'Bad Gateway',
        503: 'Service Unavailable'
    }
    return status_codes.get(status_code, 'Unknown')

# 6. API Client Builder
class ApiClient:
    """Generic REST API client"""

    def __init__(self, base_url: str, auth_token: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.headers = {'Content-Type': 'application/json'}
        if auth_token:
            self.headers['Authorization'] = f'Bearer {auth_token}'

    def get_all(self, resource: str) -> list:
        """Get all items"""
        response = requests.get(f"{self.base_url}/{resource}", headers=self.headers)
        response.raise_for_status()
        return response.json()

    def get_by_id(self, resource: str, item_id: Any) -> Dict[str, Any]:
        """Get item by ID"""
        response = requests.get(f"{self.base_url}/{resource}/{item_id}", headers=self.headers)
        response.raise_for_status()
        return response.json()

    def create(self, resource: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create new item"""
        response = requests.post(
            f"{self.base_url}/{resource}",
            headers=self.headers,
            json=data
        )
        response.raise_for_status()
        return response.json()

    def update(self, resource: str, item_id: Any, data: Dict[str, Any]) -> Dict[str, Any]:
        """Update item"""
        response = requests.put(
            f"{self.base_url}/{resource}/{item_id}",
            headers=self.headers,
            json=data
        )
        response.raise_for_status()
        return response.json()

    def partial_update(self, resource: str, item_id: Any, data: Dict[str, Any]) -> Dict[str, Any]:
        """Partial update item"""
        response = requests.patch(
            f"{self.base_url}/{resource}/{item_id}",
            headers=self.headers,
            json=data
        )
        response.raise_for_status()
        return response.json()

    def delete(self, resource: str, item_id: Any) -> Dict[str, Any]:
        """Delete item"""
        response = requests.delete(
            f"{self.base_url}/{resource}/{item_id}",
            headers=self.headers
        )
        response.raise_for_status()
        return response.json()

# 7. Async HTTP Client (using aiohttp)
async def async_get_request(url: str) -> Dict[str, Any]:
    """Async GET request using aiohttp"""
    try:
        import aiohttp
        import asyncio

        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
    except ImportError:
        print("aiohttp not installed. Install with: pip install aiohttp")
        return {}

# 8. Request Logging
class LoggedHttpClient:
    """HTTP client with request logging"""

    def __init__(self):
        self.request_log = []

    def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """Send GET request with logging"""
        import time

        start_time = time.time()
        response = requests.get(url, **kwargs)
        duration = time.time() - start_time

        log_entry = {
            'method': 'GET',
            'url': url,
            'status_code': response.status_code,
            'duration': duration,
            'success': response.ok
        }

        self.request_log.append(log_entry)
        response.raise_for_status()
        return response.json()

    def get_logs(self) -> list:
        """Get all request logs"""
        return self.request_log

    def get_stats(self) -> Dict[str, Any]:
        """Get request statistics"""
        if not self.request_log:
            return {}

        total = len(self.request_log)
        successful = sum(1 for log in self.request_log if log['success'])
        avg_duration = sum(log['duration'] for log in self.request_log) / total

        return {
            'total_requests': total,
            'successful': successful,
            'failed': total - successful,
            'avg_duration': avg_duration
        }

# Usage Examples
def demonstrate_http_requests():
    print("=== Web Python HTTP Requests Examples ===\n")

    # 1. Basic HTTP methods
    print("--- 1. Basic HTTP Methods ---")
    client = HttpClient('https://jsonplaceholder.typicode.com')

    try:
        posts = client.get('/posts')
        print(f"GET /posts: Retrieved {len(posts)} posts")

        new_post = client.post('/posts', {
            'title': 'Test Post',
            'body': 'This is a test post',
            'userId': 1
        })
        print(f"POST /posts: Created post with ID {new_post.get('id')}")
    except requests.RequestException as e:
        print(f"Error: {e}")

    # 2. Advanced features
    print("\n--- 2. Advanced HTTP Features ---")
    advanced_client = AdvancedHttpClient()

    try:
        filtered_posts = advanced_client.get_with_params(
            'https://jsonplaceholder.typicode.com/posts',
            {'userId': 1, '_limit': 5}
        )
        print(f"Filtered posts: {len(filtered_posts)} results")
    except requests.RequestException as e:
        print(f"Error: {e}")

    # 3. Session management
    print("\n--- 3. Session Management ---")
    session_client = SessionClient()

    try:
        session_client.set_cookie('session_id', 'abc123')
        data = session_client.get('https://jsonplaceholder.typicode.com/posts/1')
        print(f"Session cookies: {session_client.get_cookies()}")
    except requests.RequestException as e:
        print(f"Error: {e}")

    # 4. Retry logic
    print("\n--- 4. Retry Logic ---")
    retry_client = RetryClient(max_retries=3)

    try:
        result = retry_client.get_with_retry('https://jsonplaceholder.typicode.com/posts/1')
        print(f"Retry successful: {result.get('title')}")
    except requests.RequestException as e:
        print(f"Error after retries: {e}")

    # 5. REST API client
    print("\n--- 5. REST API Client ---")
    api_client = ApiClient('https://jsonplaceholder.typicode.com')

    try:
        todos = api_client.get_all('todos')
        print(f"GET /todos: {len(todos)} items")

        todo = api_client.get_by_id('todos', 1)
        print(f"GET /todos/1: {todo.get('title')}")
    except requests.RequestException as e:
        print(f"Error: {e}")

    # 6. Request logging
    print("\n--- 6. Request Logging ---")
    logged_client = LoggedHttpClient()

    try:
        logged_client.get('https://jsonplaceholder.typicode.com/posts')
        logged_client.get('https://jsonplaceholder.typicode.com/users')

        stats = logged_client.get_stats()
        print(f"Request stats: {stats}")
    except requests.RequestException as e:
        print(f"Error: {e}")

    print("\n=== All HTTP Requests Examples Completed ===")

# Export functions
export { HttpClient, AdvancedHttpClient, SessionClient, RetryClient, ApiClient, LoggedHttpClient }
export { handle_response, check_status_code, async_get_request, demonstrate_http_requests }

💻 HTTP Server python

🟡 intermediate ⭐⭐⭐

Create simple HTTP server using http.server module with routing, request handling, and static file serving

⏱️ 30 min 🏷️ python, web, networking, server
Prerequisites: Intermediate Python, http.server module
# Web Python HTTP Server Examples
# Creating HTTP servers using http.server module

# 1. Basic HTTP Server
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
from typing import Dict, Any, Optional
import os
import mimetypes
import urllib.parse

class BasicHttpRequestHandler(BaseHTTPRequestHandler):
    """Basic HTTP request handler"""

    def do_GET(self):
        """Handle GET requests"""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        response = "<html><body><h1>Hello, World!</h1></body></html>"
        self.wfile.write(response.encode())

    def do_POST(self):
        """Handle POST requests"""
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        response = {'received': post_data.decode(), 'status': 'success'}
        self.wfile.write(json.dumps(response).encode())

    def log_message(self, format: str, *args):
        """Custom log message"""
        print(f"[{self.log_date_time_string()}] {format % args}")

# 2. REST API Server
class RestApiHandler(BaseHTTPRequestHandler):
    """REST API request handler with routing"""

    # Simple in-memory database
    database: Dict[str, Dict[str, Any]] = {}

    def do_GET(self):
        """Handle GET requests"""
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path

        if path == '/items':
            self._get_all_items()
        elif path.startswith('/items/'):
            item_id = path.split('/')[-1]
            self._get_item(item_id)
        else:
            self._send_error(404, 'Not Found')

    def do_POST(self):
        """Handle POST requests"""
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path

        if path == '/items':
            self._create_item()
        else:
            self._send_error(404, 'Not Found')

    def do_PUT(self):
        """Handle PUT requests"""
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path

        if path.startswith('/items/'):
            item_id = path.split('/')[-1]
            self._update_item(item_id)
        else:
            self._send_error(404, 'Not Found')

    def do_DELETE(self):
        """Handle DELETE requests"""
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path

        if path.startswith('/items/'):
            item_id = path.split('/')[-1]
            self._delete_item(item_id)
        else:
            self._send_error(404, 'Not Found')

    def _get_all_items(self):
        """Get all items"""
        self._send_json_response(list(RestApiHandler.database.values()))

    def _get_item(self, item_id: str):
        """Get specific item"""
        if item_id in RestApiHandler.database:
            self._send_json_response(RestApiHandler.database[item_id])
        else:
            self._send_error(404, 'Item not found')

    def _create_item(self):
        """Create new item"""
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)

        try:
            data = json.loads(post_data.decode())
            item_id = str(len(RestApiHandler.database) + 1)
            data['id'] = item_id
            RestApiHandler.database[item_id] = data
            self._send_json_response(data, 201)
        except json.JSONDecodeError:
            self._send_error(400, 'Invalid JSON')

    def _update_item(self, item_id: str):
        """Update item"""
        if item_id not in RestApiHandler.database:
            self._send_error(404, 'Item not found')
            return

        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)

        try:
            data = json.loads(post_data.decode())
            RestApiHandler.database[item_id].update(data)
            self._send_json_response(RestApiHandler.database[item_id])
        except json.JSONDecodeError:
            self._send_error(400, 'Invalid JSON')

    def _delete_item(self, item_id: str):
        """Delete item"""
        if item_id in RestApiHandler.database:
            del RestApiHandler.database[item_id]
            self._send_json_response({'message': 'Item deleted'})
        else:
            self._send_error(404, 'Item not found')

    def _send_json_response(self, data: Any, status_code: int = 200):
        """Send JSON response"""
        self.send_response(status_code)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps(data).encode())

    def _send_error(self, status_code: int, message: str):
        """Send error response"""
        self._send_json_response({'error': message}, status_code)

    def log_message(self, format: str, *args):
        """Suppress default logging"""
        pass

# 3. Static File Server
class StaticFileHandler(BaseHTTPRequestHandler):
    """Handler for serving static files"""

    def __init__(self, *args, root_dir: str = '.', **kwargs):
        self.root_dir = root_dir
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Handle GET requests for static files"""
        parsed_path = urllib.parse.urlparse(self.path)
        file_path = os.path.join(self.root_dir, parsed_path.path.lstrip('/'))

        # Default to index.html
        if os.path.isdir(file_path):
            file_path = os.path.join(file_path, 'index.html')

        # Security check
        if not os.path.abspath(file_path).startswith(os.path.abspath(self.root_dir)):
            self._send_error(403, 'Forbidden')
            return

        if os.path.exists(file_path) and os.path.isfile(file_path):
            self._serve_file(file_path)
        else:
            self._send_error(404, 'File not found')

    def _serve_file(self, file_path: str):
        """Serve file content"""
        content_type, _ = mimetypes.guess_type(file_path)
        if content_type is None:
            content_type = 'application/octet-stream'

        self.send_response(200)
        self.send_header('Content-type', content_type)

        # Add CORS headers
        self.send_header('Access-Control-Allow-Origin', '*')

        # Add cache control
        self.send_header('Cache-Control', 'max-age=3600')

        file_size = os.path.getsize(file_path)
        self.send_header('Content-Length', str(file_size))

        self.end_headers()

        with open(file_path, 'rb') as f:
            self.wfile.write(f.read())

    def _send_error(self, status_code: int, message: str):
        """Send error response"""
        self.send_response(status_code)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(f"<html><body><h1>{status_code} - {message}</h1></body></html>".encode())

    def log_message(self, format: str, *args):
        """Custom logging"""
        pass

# 4. Multi-threaded Server
class ThreadedHTTPServer(HTTPServer):
    """Multi-threaded HTTP server"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

def start_multi_threaded_server(port: int = 8000):
    """Start multi-threaded server"""
    from threading import Thread

    class ThreadedRequestHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            """Handle GET request"""
            import threading
            thread_id = threading.get_ident()

            self.send_response(200)
            self.send_header('Content-type', 'text/html')
            self.end_headers()

            response = f"<html><body><h1>Hello from thread {thread_id}!</h1></body></html>"
            self.wfile.write(response.encode())

        def log_message(self, format: str, *args):
            pass

    server = ThreadedHTTPServer(('', port), ThreadedRequestHandler)
    print(f"Multi-threaded server running on port {port}")
    server.serve_forever()

# 5. Server with Middleware
class MiddlewareHandler(BaseHTTPRequestHandler):
    """Request handler with middleware support"""

    middleware_chain = []

    @classmethod
    def add_middleware(cls, middleware_func):
        """Add middleware to chain"""
        cls.middleware_chain.append(middleware_func)

    def handle_one_request(self):
        """Override to add middleware"""
        # Run middleware before handling request
        for middleware in self.middleware_chain:
            middleware(self)

        super().handle_one_request()

    def do_GET(self):
        """Handle GET request"""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        response = {'message': 'Hello with middleware!'}
        self.wfile.write(json.dumps(response).encode())

    def log_message(self, format: str, *args):
        pass

# Middleware functions
def logging_middleware(handler):
    """Log all requests"""
    print(f"[LOG] {handler.command} {handler.path}")

def cors_middleware(handler):
    """Add CORS headers"""
    def send_response(code, message=None):
        handler.send_response(code, message)
        handler.send_header('Access-Control-Allow-Origin', '*')
        handler.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
        handler.send_header('Access-Control-Allow-Headers', 'Content-Type')

    # Store original send_response
    handler._original_send_response = handler.send_response
    handler.send_response = send_response

def auth_middleware(handler):
    """Simple authentication middleware"""
    auth_header = handler.headers.get('Authorization')

    if not auth_header or not auth_header.startswith('Bearer '):
        handler.send_response(401)
        handler.end_headers()
        handler.wfile.write(b'Unauthorized')
        return False

    return True

# 6. WebSocket-like Server (using Server-Sent Events)
class SSEHandler(BaseHTTPRequestHandler):
    """Server-Sent Events handler"""

    def do_GET(self):
        """Handle SSE connection"""
        if self.path == '/events':
            self.send_response(200)
            self.send_header('Content-type', 'text/event-stream')
            self.send_header('Cache-Control', 'no-cache')
            self.send_header('Connection', 'keep-alive')
            self.end_headers()

            # Send events
            import time
            for i in range(5):
                event_data = f"data: Message {i}\n\n"
                self.wfile.write(event_data.encode())
                self.wfile.flush()
                time.sleep(1)

    def log_message(self, format: str, *args):
        pass

# 7. Server with Request Logging
class LoggedRequestHandler(BaseHTTPRequestHandler):
    """Request handler with detailed logging"""

    def log_request(self, code: int = '-', size: int = '-'):
        """Log request details"""
        user_agent = self.headers.get('User-Agent', 'Unknown')
        print(f"[{self.log_date_time_string()}] {self.client_address[0]} - {self.command} {self.path} - {code} - {user_agent}")

    def do_GET(self):
        """Handle GET request"""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        response = {
            'path': self.path,
            'method': self.command,
            'headers': dict(self.headers)
        }
        self.wfile.write(json.dumps(response, indent=2).encode())

    def do_POST(self):
        """Handle POST request"""
        content_length = int(self.headers.get('Content-Length', 0))
        post_data = self.rfile.read(content_length) if content_length > 0 else b''

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        response = {
            'path': self.path,
            'method': self.command,
            'data': post_data.decode()
        }
        self.wfile.write(json.dumps(response, indent=2).encode())

# Server Management Functions
def start_basic_server(port: int = 8000):
    """Start basic HTTP server"""
    server = HTTPServer(('', port), BasicHttpRequestHandler)
    print(f"Basic server running on http://localhost:{port}")
    server.serve_forever()

def start_rest_api_server(port: int = 8000):
    """Start REST API server"""
    server = HTTPServer(('', port), RestApiHandler)
    print(f"REST API server running on http://localhost:{port}")
    server.serve_forever()

def start_static_server(port: int = 8000, directory: str = '.'):
    """Start static file server"""
    def handler(*args, **kwargs):
        StaticFileHandler(*args, root_dir=directory, **kwargs)

    server = HTTPServer(('', port), handler)
    print(f"Static file server running on http://localhost:{port}")
    print(f"Serving directory: {os.path.abspath(directory)}")
    server.serve_forever()

def start_middleware_server(port: int = 8000):
    """Start server with middleware"""
    MiddlewareHandler.add_middleware(logging_middleware)

    server = HTTPServer(('', port), MiddlewareHandler)
    print(f"Server with middleware running on http://localhost:{port}")
    server.serve_forever()

# Usage Examples
def demonstrate_http_server():
    print("=== Web Python HTTP Server Examples ===\n")
    print("Note: Server examples require running the server in a separate process")
    print("Choose one of the following servers to start:\n")

    print("--- 1. Basic Server ---")
    print("start_basic_server(8000)")
    print("Serves simple HTML page\n")

    print("--- 2. REST API Server ---")
    print("start_rest_api_server(8000)")
    print("Endpoints:")
    print("  GET    /items      - Get all items")
    print("  GET    /items/{id} - Get item by ID")
    print("  POST   /items      - Create new item")
    print("  PUT    /items/{id} - Update item")
    print("  DELETE /items/{id} - Delete item\n")

    print("--- 3. Static File Server ---")
    print("start_static_server(8000, './public')")
    print("Serves static files from directory\n")

    print("--- 4. Multi-threaded Server ---")
    print("start_multi_threaded_server(8000)")
    print("Handles multiple requests concurrently\n")

    print("--- 5. Server with Middleware ---")
    print("start_middleware_server(8000)")
    print("Includes logging and CORS middleware\n")

    print("--- Example Client Usage ---")
    print("# Using requests library")
    print("import requests")
    print("")
    print("# Get all items")
    print("response = requests.get('http://localhost:8000/items')")
    print("print(response.json())")
    print("")
    print("# Create new item")
    print("response = requests.post('http://localhost:8000/items',")
    print("    json={'name': 'Test Item', 'value': 123})")
    print("print(response.json())")

    print("\n=== All HTTP Server Examples Listed ===")

# Export functions and classes
export { BasicHttpRequestHandler, RestApiHandler, StaticFileHandler, SSEHandler, LoggedRequestHandler }
export { start_basic_server, start_rest_api_server, start_static_server, start_multi_threaded_server, start_middleware_server }
export { logging_middleware, cors_middleware, auth_middleware, demonstrate_http_server }

💻 TCP Socket python

🔴 complex ⭐⭐⭐⭐

Create TCP client and server using socket module for bidirectional communication

⏱️ 35 min 🏷️ python, web, networking, socket, tcp
Prerequisites: Advanced Python, socket module, threading
# Web Python TCP Socket Examples
# Creating TCP clients and servers using socket module

# 1. Basic TCP Server
import socket
import threading
import json
from typing import Callable, Optional

class TcpServer:
    """Basic TCP server"""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Start the TCP server"""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        self.running = True

        print(f"TCP server listening on {self.host}:{self.port}")

        try:
            while self.running:
                client_socket, address = self.server_socket.accept()
                print(f"New connection from {address}")
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.stop()

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Handle client connection"""
        try:
            while self.running:
                data = client_socket.recv(1024)
                if not data:
                    break

                message = data.decode('utf-8')
                print(f"Received from {address}: {message}")

                # Echo back
                response = f"Echo: {message}"
                client_socket.send(response.encode('utf-8'))
        except Exception as e:
            print(f"Error handling client {address}: {e}")
        finally:
            client_socket.close()
            print(f"Connection closed: {address}")

    def stop(self):
        """Stop the server"""
        self.running = False
        if self.server_socket:
            self.server_socket.close()

# 2. Basic TCP Client
class TcpClient:
    """Basic TCP client"""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.client_socket: Optional[socket.socket] = None

    def connect(self):
        """Connect to server"""
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client_socket.connect((self.host, self.port))
        print(f"Connected to {self.host}:{self.port}")

    def send(self, message: str):
        """Send message to server"""
        if self.client_socket:
            self.client_socket.send(message.encode('utf-8'))

    def receive(self, buffer_size: int = 1024) -> str:
        """Receive message from server"""
        if self.client_socket:
            data = self.client_socket.recv(buffer_size)
            return data.decode('utf-8')
        return ''

    def close(self):
        """Close connection"""
        if self.client_socket:
            self.client_socket.close()

# 3. Chat Server
class ChatServer:
    """Multi-client chat server"""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.clients: list[socket.socket] = []
        self.nicknames: dict[socket.socket, str] = {}
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Start chat server"""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        self.running = True

        print(f"Chat server listening on {self.host}:{self.port}")

        try:
            while self.running:
                client_socket, address = self.server_socket.accept()
                print(f"New connection from {address}")
                self.clients.append(client_socket)

                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket,)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nChat server shutting down...")
        finally:
            self.stop()

    def broadcast(self, message: str, sender_socket: Optional[socket.socket] = None):
        """Broadcast message to all clients"""
        for client in self.clients:
            if client != sender_socket:
                try:
                    client.send(message.encode('utf-8'))
                except:
                    self.clients.remove(client)

    def _handle_client(self, client_socket: socket.socket):
        """Handle client communication"""
        try:
            # Request nickname
            client_socket.send("NICK".encode('utf-8'))
            nickname = client_socket.recv(1024).decode('utf-8')
            self.nicknames[client_socket] = nickname

            self.broadcast(f"{nickname} joined the chat!")

            while self.running:
                message = client_socket.recv(1024).decode('utf-8')
                if message:
                    self.broadcast(f"{nickname}: {message}", client_socket)
                else:
                    break
        except Exception as e:
            print(f"Error handling client: {e}")
        finally:
            if client_socket in self.clients:
                nickname = self.nicknames.get(client_socket, 'Unknown')
                self.clients.remove(client_socket)
                self.broadcast(f"{nickname} left the chat!")
                client_socket.close()

    def stop(self):
        """Stop chat server"""
        self.running = False
        for client in self.clients:
            client.close()
        if self.server_socket:
            self.server_socket.close()

# 4. Chat Client
class ChatClient:
    """Chat client with interactive interface"""

    def __init__(self, host: str = 'localhost', port: int = 9999, nickname: str = 'Anonymous'):
        self.host = host
        self.port = port
        self.nickname = nickname
        self.client_socket: Optional[socket.socket] = None
        self.running = False

    def connect(self):
        """Connect to chat server"""
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client_socket.connect((self.host, self.port))

        # Send nickname
        self.client_socket.send(self.nickname.encode('utf-8'))
        self.running = True

        # Start receive thread
        receive_thread = threading.Thread(target=self._receive_messages)
        receive_thread.daemon = True
        receive_thread.start()

    def _receive_messages(self):
        """Receive messages from server"""
        while self.running:
            try:
                message = self.client_socket.recv(1024).decode('utf-8')
                if message:
                    print(f"\r{message}")
                    print(f"{self.nickname}: ", end='', flush=True)
            except:
                break

    def send_message(self, message: str):
        """Send message to server"""
        if self.client_socket:
            self.client_socket.send(message.encode('utf-8'))

    def close(self):
        """Close connection"""
        self.running = False
        if self.client_socket:
            self.client_socket.close()

# 5. Message Protocol Server (JSON-based)
class ProtocolServer:
    """Server using JSON message protocol"""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Start protocol server"""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        self.running = True

        print(f"Protocol server listening on {self.host}:{self.port}")

        try:
            while self.running:
                client_socket, address = self.server_socket.accept()
                print(f"New connection from {address}")

                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.stop()

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Handle client with protocol"""
        try:
            while self.running:
                data = client_socket.recv(4096)
                if not data:
                    break

                try:
                    message = json.loads(data.decode('utf-8'))
                    response = self._process_message(message)
                    client_socket.send(json.dumps(response).encode('utf-8'))
                except json.JSONDecodeError:
                    error_response = {'status': 'error', 'message': 'Invalid JSON'}
                    client_socket.send(json.dumps(error_response).encode('utf-8'))
        except Exception as e:
            print(f"Error handling client {address}: {e}")
        finally:
            client_socket.close()

    def _process_message(self, message: dict) -> dict:
        """Process incoming message"""
        msg_type = message.get('type')

        if msg_type == 'ping':
            return {'type': 'pong', 'timestamp': message.get('timestamp')}
        elif msg_type == 'echo':
            return {'type': 'echo_response', 'data': message.get('data')}
        elif msg_type == 'add':
            a = message.get('a', 0)
            b = message.get('b', 0)
            return {'type': 'result', 'operation': 'add', 'result': a + b}
        elif msg_type == 'get_time':
            from datetime import datetime
            return {'type': 'time', 'timestamp': datetime.now().isoformat()}
        else:
            return {'type': 'error', 'message': 'Unknown message type'}

    def stop(self):
        """Stop server"""
        self.running = False
        if self.server_socket:
            self.server_socket.close()

# 6. File Transfer Server
class FileTransferServer:
    """Server for file transfer"""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Start file transfer server"""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        self.running = True

        print(f"File transfer server listening on {self.host}:{self.port}")

        try:
            while self.running:
                client_socket, address = self.server_socket.accept()
                print(f"New connection from {address}")

                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.stop()

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Handle file transfer"""
        try:
            # Receive file info
            file_info = client_socket.recv(1024).decode('utf-8')
            filename, filesize = file_info.split('|')
            filesize = int(filesize)

            print(f"Receiving file: {filename} ({filesize} bytes)")

            # Receive file data
            received = 0
            with open(filename, 'wb') as f:
                while received < filesize:
                    data = client_socket.recv(4096)
                    if not data:
                        break
                    f.write(data)
                    received += len(data)
                    progress = (received / filesize) * 100
                    print(f"\rProgress: {progress:.1f}%", end='')

            print(f"\nFile received successfully!")
            client_socket.send(b"ACK")

        except Exception as e:
            print(f"Error: {e}")
        finally:
            client_socket.close()

    def stop(self):
        """Stop server"""
        self.running = False
        if self.server_socket:
            self.server_socket.close()

# 7. File Transfer Client
class FileTransferClient:
    """Client for file transfer"""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.client_socket: Optional[socket.socket] = None

    def connect(self):
        """Connect to server"""
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client_socket.connect((self.host, self.port))
        print(f"Connected to {self.host}:{self.port}")

    def send_file(self, filename: str):
        """Send file to server"""
        import os

        if not os.path.exists(filename):
            print(f"File not found: {filename}")
            return

        filesize = os.path.getsize(filename)

        # Send file info
        file_info = f"{filename}|{filesize}"
        self.client_socket.send(file_info.encode('utf-8'))

        # Send file data
        sent = 0
        with open(filename, 'rb') as f:
            while sent < filesize:
                data = f.read(4096)
                self.client_socket.send(data)
                sent += len(data)
                progress = (sent / filesize) * 100
                print(f"\rProgress: {progress:.1f}%", end='')

        print(f"\nFile sent successfully!")

        # Wait for acknowledgment
        ack = self.client_socket.recv(1024)
        print(f"Server response: {ack.decode('utf-8')}")

    def close(self):
        """Close connection"""
        if self.client_socket:
            self.client_socket.close()

# 8. UDP Socket (for comparison)
class UdpServer:
    """Basic UDP server"""

    def __init__(self, host: str = 'localhost', port: int = 9999):
        self.host = host
        self.port = port
        self.server_socket: Optional[socket.socket] = None
        self.running = False

    def start(self):
        """Start UDP server"""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.server_socket.bind((self.host, self.port))
        self.running = True

        print(f"UDP server listening on {self.host}:{self.port}")

        try:
            while self.running:
                data, address = self.server_socket.recvfrom(1024)
                message = data.decode('utf-8')
                print(f"Received from {address}: {message}")

                response = f"Echo: {message}"
                self.server_socket.sendto(response.encode('utf-8'), address)
        except KeyboardInterrupt:
            print("\nUDP server shutting down...")
        finally:
            self.stop()

    def stop(self):
        """Stop UDP server"""
        self.running = False
        if self.server_socket:
            self.server_socket.close()

# Usage Examples
def demonstrate_tcp_socket():
    print("=== Web Python TCP Socket Examples ===\n")
    print("Note: Socket examples require running server and client in separate processes")
    print("Choose one of the following examples:\n")

    print("--- 1. Basic TCP Server ---")
    print("# Server")
    print("server = TcpServer('localhost', 9999)")
    print("server.start()")
    print("")
    print("# Client")
    print("client = TcpClient('localhost', 9999)")
    print("client.connect()")
    print("client.send('Hello, Server!')")
    print("response = client.receive()")
    print("print(response)")
    print("client.close()\n")

    print("--- 2. Chat Server ---")
    print("# Server")
    print("chat_server = ChatServer('localhost', 9999)")
    print("chat_server.start()")
    print("")
    print("# Client")
    print("chat_client = ChatClient('localhost', 9999, 'Alice')")
    print("chat_client.connect()")
    print("# Then send messages:")
    print("chat_client.send_message('Hello everyone!')\n")

    print("--- 3. Protocol Server (JSON-based) ---")
    print("# Server")
    print("protocol_server = ProtocolServer('localhost', 9999)")
    print("protocol_server.start()")
    print("")
    print("# Client sends JSON messages:")
    print('client.send(json.dumps({"type": "ping", "timestamp": 123456}))')
    print('client.send(json.dumps({"type": "add", "a": 5, "b": 3}))')
    print('client.send(json.dumps({"type": "echo", "data": "Hello"}))\n')

    print("--- 4. File Transfer ---")
    print("# Server")
    print("file_server = FileTransferServer('localhost', 9999)")
    print("file_server.start()")
    print("")
    print("# Client")
    print("file_client = FileTransferClient('localhost', 9999)")
    print("file_client.connect()")
    print("file_client.send_file('example.txt')")
    print("file_client.close()\n")

    print("--- 5. UDP Server (for comparison) ---")
    print("# Server")
    print("udp_server = UdpServer('localhost', 9999)")
    print("udp_server.start()")
    print("")
    print("# Client")
    print("sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)")
    print("sock.sendto(b'Hello UDP!', ('localhost', 9999))")
    print("data, addr = sock.recvfrom(1024)")
    print("print(data.decode())\n")

    print("\n=== All TCP Socket Examples Listed ===")

# Export functions and classes
export { TcpServer, TcpClient, ChatServer, ChatClient, ProtocolServer, FileTransferServer, FileTransferClient, UdpServer }
export { demonstrate_tcp_socket }