🎯 empfohlene Sammlungen
Balanced sample collections from various categories for you to explore
Web Python Netzwerkbeispiele
Web Python Netzwerkbeispiele einschließlich HTTP-Anfragen, HTTP-Server und TCP-Socket-Programmierung
💻 HTTP-Anfragen python
🟡 intermediate
⭐⭐⭐
GET, POST, PUT, DELETE-Anfragen mit der Bibliothek requests senden, mit Fehlerbehandlung und Antwortanalyse
⏱️ 25 min
🏷️ python, web, networking, http
Prerequisites:
Intermediate Python, requests library
# Web Python HTTP Requests Examples
# Using requests library for HTTP communication with servers
# 1. Basic HTTP Request Methods
import requests
import json
from typing import Any, Dict, Optional
class HttpClient:
"""Simple HTTP client for making requests"""
def __init__(self, base_url: str = '', default_headers: Optional[Dict[str, str]] = None):
self.base_url = base_url
self.default_headers = {
'Content-Type': 'application/json',
**(default_headers or {})
}
def get(self, url: str, headers: Optional[Dict[str, str]] = None, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Send GET request"""
response = requests.get(
f"{self.base_url}{url}",
headers={**self.default_headers, **(headers or {})},
params=params
)
response.raise_for_status()
return response.json()
def post(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""Send POST request"""
response = requests.post(
f"{self.base_url}{url}",
headers={**self.default_headers, **(headers or {})},
json=data
)
response.raise_for_status()
return response.json()
def put(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""Send PUT request"""
response = requests.put(
f"{self.base_url}{url}",
headers={**self.default_headers, **(headers or {})},
json=data
)
response.raise_for_status()
return response.json()
def delete(self, url: str, headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""Send DELETE request"""
response = requests.delete(
f"{self.base_url}{url}",
headers={**self.default_headers, **(headers or {})}
)
response.raise_for_status()
return response.json()
def patch(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""Send PATCH request"""
response = requests.patch(
f"{self.base_url}{url}",
headers={**self.default_headers, **(headers or {})},
json=data
)
response.raise_for_status()
return response.json()
# 2. Advanced HTTP Features
class AdvancedHttpClient:
"""Advanced HTTP client with additional features"""
def get_with_params(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Send GET request with query parameters"""
response = requests.get(url, params=params)
response.raise_for_status()
return response.json()
def get_with_timeout(self, url: str, timeout: int = 5) -> Dict[str, Any]:
"""Send GET request with timeout"""
response = requests.get(url, timeout=timeout)
response.raise_for_status()
return response.json()
def post_with_files(self, url: str, files: Dict[str, Any], data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Upload files via POST request"""
response = requests.post(url, files=files, data=data)
response.raise_for_status()
return response.json()
def download_file(self, url: str, filename: str) -> None:
"""Download file from URL"""
response = requests.get(url, stream=True)
response.raise_for_status()
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
def download_file_with_progress(self, url: str, filename: str) -> None:
"""Download file with progress indicator"""
response = requests.get(url, stream=True)
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
progress = (downloaded / total_size) * 100
print(f"\rProgress: {progress:.1f}%", end='')
print() # New line after progress
def get_with_auth(self, url: str, username: str, password: str) -> Dict[str, Any]:
"""Send GET request with basic authentication"""
response = requests.get(url, auth=(username, password))
response.raise_for_status()
return response.json()
def get_with_bearer_token(self, url: str, token: str) -> Dict[str, Any]:
"""Send GET request with Bearer token"""
headers = {'Authorization': f'Bearer {token}'}
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
# 3. Request with Session and Cookies
class SessionClient:
"""HTTP client with session management"""
def __init__(self):
self.session = requests.Session()
def get(self, url: str, **kwargs) -> Dict[str, Any]:
"""Send GET request using session"""
response = self.session.get(url, **kwargs)
response.raise_for_status()
return response.json()
def post(self, url: str, **kwargs) -> Dict[str, Any]:
"""Send POST request using session"""
response = self.session.post(url, **kwargs)
response.raise_for_status()
return response.json()
def set_cookie(self, name: str, value: str):
"""Set cookie in session"""
self.session.cookies.set(name, value)
def get_cookies(self) -> Dict[str, str]:
"""Get all cookies from session"""
return dict(self.session.cookies)
def close(self):
"""Close session"""
self.session.close()
# 4. Retry Logic
class RetryClient:
"""HTTP client with automatic retry logic"""
def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
self.max_retries = max_retries
self.base_delay = base_delay
def get_with_retry(self, url: str, **kwargs) -> Dict[str, Any]:
"""Send GET request with retry on failure"""
import time
last_error = None
for attempt in range(self.max_retries + 1):
try:
response = requests.get(url, **kwargs)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
last_error = e
if attempt < self.max_retries:
delay = self.base_delay * (2 ** attempt)
print(f"Retry {attempt + 1}/{self.max_retries} after {delay}s")
time.sleep(delay)
raise last_error
# 5. Response Handling
def handle_response(response: requests.Response) -> Dict[str, Any]:
"""Handle different response types"""
result = {
'status_code': response.status_code,
'headers': dict(response.headers),
'success': response.ok
}
content_type = response.headers.get('Content-Type', '')
if 'application/json' in content_type:
result['data'] = response.json()
elif 'text/' in content_type:
result['data'] = response.text
else:
result['data'] = response.content
return result
def check_status_code(status_code: int) -> str:
"""Get status code description"""
status_codes = {
200: 'OK',
201: 'Created',
204: 'No Content',
400: 'Bad Request',
401: 'Unauthorized',
403: 'Forbidden',
404: 'Not Found',
500: 'Internal Server Error',
502: 'Bad Gateway',
503: 'Service Unavailable'
}
return status_codes.get(status_code, 'Unknown')
# 6. API Client Builder
class ApiClient:
"""Generic REST API client"""
def __init__(self, base_url: str, auth_token: Optional[str] = None):
self.base_url = base_url.rstrip('/')
self.headers = {'Content-Type': 'application/json'}
if auth_token:
self.headers['Authorization'] = f'Bearer {auth_token}'
def get_all(self, resource: str) -> list:
"""Get all items"""
response = requests.get(f"{self.base_url}/{resource}", headers=self.headers)
response.raise_for_status()
return response.json()
def get_by_id(self, resource: str, item_id: Any) -> Dict[str, Any]:
"""Get item by ID"""
response = requests.get(f"{self.base_url}/{resource}/{item_id}", headers=self.headers)
response.raise_for_status()
return response.json()
def create(self, resource: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Create new item"""
response = requests.post(
f"{self.base_url}/{resource}",
headers=self.headers,
json=data
)
response.raise_for_status()
return response.json()
def update(self, resource: str, item_id: Any, data: Dict[str, Any]) -> Dict[str, Any]:
"""Update item"""
response = requests.put(
f"{self.base_url}/{resource}/{item_id}",
headers=self.headers,
json=data
)
response.raise_for_status()
return response.json()
def partial_update(self, resource: str, item_id: Any, data: Dict[str, Any]) -> Dict[str, Any]:
"""Partial update item"""
response = requests.patch(
f"{self.base_url}/{resource}/{item_id}",
headers=self.headers,
json=data
)
response.raise_for_status()
return response.json()
def delete(self, resource: str, item_id: Any) -> Dict[str, Any]:
"""Delete item"""
response = requests.delete(
f"{self.base_url}/{resource}/{item_id}",
headers=self.headers
)
response.raise_for_status()
return response.json()
# 7. Async HTTP Client (using aiohttp)
async def async_get_request(url: str) -> Dict[str, Any]:
"""Async GET request using aiohttp"""
try:
import aiohttp
import asyncio
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
except ImportError:
print("aiohttp not installed. Install with: pip install aiohttp")
return {}
# 8. Request Logging
class LoggedHttpClient:
"""HTTP client with request logging"""
def __init__(self):
self.request_log = []
def get(self, url: str, **kwargs) -> Dict[str, Any]:
"""Send GET request with logging"""
import time
start_time = time.time()
response = requests.get(url, **kwargs)
duration = time.time() - start_time
log_entry = {
'method': 'GET',
'url': url,
'status_code': response.status_code,
'duration': duration,
'success': response.ok
}
self.request_log.append(log_entry)
response.raise_for_status()
return response.json()
def get_logs(self) -> list:
"""Get all request logs"""
return self.request_log
def get_stats(self) -> Dict[str, Any]:
"""Get request statistics"""
if not self.request_log:
return {}
total = len(self.request_log)
successful = sum(1 for log in self.request_log if log['success'])
avg_duration = sum(log['duration'] for log in self.request_log) / total
return {
'total_requests': total,
'successful': successful,
'failed': total - successful,
'avg_duration': avg_duration
}
# Usage Examples
def demonstrate_http_requests():
print("=== Web Python HTTP Requests Examples ===\n")
# 1. Basic HTTP methods
print("--- 1. Basic HTTP Methods ---")
client = HttpClient('https://jsonplaceholder.typicode.com')
try:
posts = client.get('/posts')
print(f"GET /posts: Retrieved {len(posts)} posts")
new_post = client.post('/posts', {
'title': 'Test Post',
'body': 'This is a test post',
'userId': 1
})
print(f"POST /posts: Created post with ID {new_post.get('id')}")
except requests.RequestException as e:
print(f"Error: {e}")
# 2. Advanced features
print("\n--- 2. Advanced HTTP Features ---")
advanced_client = AdvancedHttpClient()
try:
filtered_posts = advanced_client.get_with_params(
'https://jsonplaceholder.typicode.com/posts',
{'userId': 1, '_limit': 5}
)
print(f"Filtered posts: {len(filtered_posts)} results")
except requests.RequestException as e:
print(f"Error: {e}")
# 3. Session management
print("\n--- 3. Session Management ---")
session_client = SessionClient()
try:
session_client.set_cookie('session_id', 'abc123')
data = session_client.get('https://jsonplaceholder.typicode.com/posts/1')
print(f"Session cookies: {session_client.get_cookies()}")
except requests.RequestException as e:
print(f"Error: {e}")
# 4. Retry logic
print("\n--- 4. Retry Logic ---")
retry_client = RetryClient(max_retries=3)
try:
result = retry_client.get_with_retry('https://jsonplaceholder.typicode.com/posts/1')
print(f"Retry successful: {result.get('title')}")
except requests.RequestException as e:
print(f"Error after retries: {e}")
# 5. REST API client
print("\n--- 5. REST API Client ---")
api_client = ApiClient('https://jsonplaceholder.typicode.com')
try:
todos = api_client.get_all('todos')
print(f"GET /todos: {len(todos)} items")
todo = api_client.get_by_id('todos', 1)
print(f"GET /todos/1: {todo.get('title')}")
except requests.RequestException as e:
print(f"Error: {e}")
# 6. Request logging
print("\n--- 6. Request Logging ---")
logged_client = LoggedHttpClient()
try:
logged_client.get('https://jsonplaceholder.typicode.com/posts')
logged_client.get('https://jsonplaceholder.typicode.com/users')
stats = logged_client.get_stats()
print(f"Request stats: {stats}")
except requests.RequestException as e:
print(f"Error: {e}")
print("\n=== All HTTP Requests Examples Completed ===")
# Export functions
export { HttpClient, AdvancedHttpClient, SessionClient, RetryClient, ApiClient, LoggedHttpClient }
export { handle_response, check_status_code, async_get_request, demonstrate_http_requests }
💻 HTTP-Server python
🟡 intermediate
⭐⭐⭐
Einfachen HTTP-Server mit dem Modul http.server erstellen, mit Routing, Anfrageverarbeitung und statischem Dateidienst
⏱️ 30 min
🏷️ python, web, networking, server
Prerequisites:
Intermediate Python, http.server module
# Web Python HTTP Server Examples
# Creating HTTP servers using http.server module
# 1. Basic HTTP Server
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
from typing import Dict, Any, Optional
import os
import mimetypes
import urllib.parse
class BasicHttpRequestHandler(BaseHTTPRequestHandler):
"""Basic HTTP request handler"""
def do_GET(self):
"""Handle GET requests"""
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
response = "<html><body><h1>Hello, World!</h1></body></html>"
self.wfile.write(response.encode())
def do_POST(self):
"""Handle POST requests"""
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {'received': post_data.decode(), 'status': 'success'}
self.wfile.write(json.dumps(response).encode())
def log_message(self, format: str, *args):
"""Custom log message"""
print(f"[{self.log_date_time_string()}] {format % args}")
# 2. REST API Server
class RestApiHandler(BaseHTTPRequestHandler):
"""REST API request handler with routing"""
# Simple in-memory database
database: Dict[str, Dict[str, Any]] = {}
def do_GET(self):
"""Handle GET requests"""
parsed_path = urllib.parse.urlparse(self.path)
path = parsed_path.path
if path == '/items':
self._get_all_items()
elif path.startswith('/items/'):
item_id = path.split('/')[-1]
self._get_item(item_id)
else:
self._send_error(404, 'Not Found')
def do_POST(self):
"""Handle POST requests"""
parsed_path = urllib.parse.urlparse(self.path)
path = parsed_path.path
if path == '/items':
self._create_item()
else:
self._send_error(404, 'Not Found')
def do_PUT(self):
"""Handle PUT requests"""
parsed_path = urllib.parse.urlparse(self.path)
path = parsed_path.path
if path.startswith('/items/'):
item_id = path.split('/')[-1]
self._update_item(item_id)
else:
self._send_error(404, 'Not Found')
def do_DELETE(self):
"""Handle DELETE requests"""
parsed_path = urllib.parse.urlparse(self.path)
path = parsed_path.path
if path.startswith('/items/'):
item_id = path.split('/')[-1]
self._delete_item(item_id)
else:
self._send_error(404, 'Not Found')
def _get_all_items(self):
"""Get all items"""
self._send_json_response(list(RestApiHandler.database.values()))
def _get_item(self, item_id: str):
"""Get specific item"""
if item_id in RestApiHandler.database:
self._send_json_response(RestApiHandler.database[item_id])
else:
self._send_error(404, 'Item not found')
def _create_item(self):
"""Create new item"""
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
try:
data = json.loads(post_data.decode())
item_id = str(len(RestApiHandler.database) + 1)
data['id'] = item_id
RestApiHandler.database[item_id] = data
self._send_json_response(data, 201)
except json.JSONDecodeError:
self._send_error(400, 'Invalid JSON')
def _update_item(self, item_id: str):
"""Update item"""
if item_id not in RestApiHandler.database:
self._send_error(404, 'Item not found')
return
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
try:
data = json.loads(post_data.decode())
RestApiHandler.database[item_id].update(data)
self._send_json_response(RestApiHandler.database[item_id])
except json.JSONDecodeError:
self._send_error(400, 'Invalid JSON')
def _delete_item(self, item_id: str):
"""Delete item"""
if item_id in RestApiHandler.database:
del RestApiHandler.database[item_id]
self._send_json_response({'message': 'Item deleted'})
else:
self._send_error(404, 'Item not found')
def _send_json_response(self, data: Any, status_code: int = 200):
"""Send JSON response"""
self.send_response(status_code)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def _send_error(self, status_code: int, message: str):
"""Send error response"""
self._send_json_response({'error': message}, status_code)
def log_message(self, format: str, *args):
"""Suppress default logging"""
pass
# 3. Static File Server
class StaticFileHandler(BaseHTTPRequestHandler):
"""Handler for serving static files"""
def __init__(self, *args, root_dir: str = '.', **kwargs):
self.root_dir = root_dir
super().__init__(*args, **kwargs)
def do_GET(self):
"""Handle GET requests for static files"""
parsed_path = urllib.parse.urlparse(self.path)
file_path = os.path.join(self.root_dir, parsed_path.path.lstrip('/'))
# Default to index.html
if os.path.isdir(file_path):
file_path = os.path.join(file_path, 'index.html')
# Security check
if not os.path.abspath(file_path).startswith(os.path.abspath(self.root_dir)):
self._send_error(403, 'Forbidden')
return
if os.path.exists(file_path) and os.path.isfile(file_path):
self._serve_file(file_path)
else:
self._send_error(404, 'File not found')
def _serve_file(self, file_path: str):
"""Serve file content"""
content_type, _ = mimetypes.guess_type(file_path)
if content_type is None:
content_type = 'application/octet-stream'
self.send_response(200)
self.send_header('Content-type', content_type)
# Add CORS headers
self.send_header('Access-Control-Allow-Origin', '*')
# Add cache control
self.send_header('Cache-Control', 'max-age=3600')
file_size = os.path.getsize(file_path)
self.send_header('Content-Length', str(file_size))
self.end_headers()
with open(file_path, 'rb') as f:
self.wfile.write(f.read())
def _send_error(self, status_code: int, message: str):
"""Send error response"""
self.send_response(status_code)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(f"<html><body><h1>{status_code} - {message}</h1></body></html>".encode())
def log_message(self, format: str, *args):
"""Custom logging"""
pass
# 4. Multi-threaded Server
class ThreadedHTTPServer(HTTPServer):
"""Multi-threaded HTTP server"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def start_multi_threaded_server(port: int = 8000):
"""Start multi-threaded server"""
from threading import Thread
class ThreadedRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
"""Handle GET request"""
import threading
thread_id = threading.get_ident()
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
response = f"<html><body><h1>Hello from thread {thread_id}!</h1></body></html>"
self.wfile.write(response.encode())
def log_message(self, format: str, *args):
pass
server = ThreadedHTTPServer(('', port), ThreadedRequestHandler)
print(f"Multi-threaded server running on port {port}")
server.serve_forever()
# 5. Server with Middleware
class MiddlewareHandler(BaseHTTPRequestHandler):
"""Request handler with middleware support"""
middleware_chain = []
@classmethod
def add_middleware(cls, middleware_func):
"""Add middleware to chain"""
cls.middleware_chain.append(middleware_func)
def handle_one_request(self):
"""Override to add middleware"""
# Run middleware before handling request
for middleware in self.middleware_chain:
middleware(self)
super().handle_one_request()
def do_GET(self):
"""Handle GET request"""
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {'message': 'Hello with middleware!'}
self.wfile.write(json.dumps(response).encode())
def log_message(self, format: str, *args):
pass
# Middleware functions
def logging_middleware(handler):
"""Log all requests"""
print(f"[LOG] {handler.command} {handler.path}")
def cors_middleware(handler):
"""Add CORS headers"""
def send_response(code, message=None):
handler.send_response(code, message)
handler.send_header('Access-Control-Allow-Origin', '*')
handler.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
handler.send_header('Access-Control-Allow-Headers', 'Content-Type')
# Store original send_response
handler._original_send_response = handler.send_response
handler.send_response = send_response
def auth_middleware(handler):
"""Simple authentication middleware"""
auth_header = handler.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
handler.send_response(401)
handler.end_headers()
handler.wfile.write(b'Unauthorized')
return False
return True
# 6. WebSocket-like Server (using Server-Sent Events)
class SSEHandler(BaseHTTPRequestHandler):
"""Server-Sent Events handler"""
def do_GET(self):
"""Handle SSE connection"""
if self.path == '/events':
self.send_response(200)
self.send_header('Content-type', 'text/event-stream')
self.send_header('Cache-Control', 'no-cache')
self.send_header('Connection', 'keep-alive')
self.end_headers()
# Send events
import time
for i in range(5):
event_data = f"data: Message {i}\n\n"
self.wfile.write(event_data.encode())
self.wfile.flush()
time.sleep(1)
def log_message(self, format: str, *args):
pass
# 7. Server with Request Logging
class LoggedRequestHandler(BaseHTTPRequestHandler):
"""Request handler with detailed logging"""
def log_request(self, code: int = '-', size: int = '-'):
"""Log request details"""
user_agent = self.headers.get('User-Agent', 'Unknown')
print(f"[{self.log_date_time_string()}] {self.client_address[0]} - {self.command} {self.path} - {code} - {user_agent}")
def do_GET(self):
"""Handle GET request"""
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {
'path': self.path,
'method': self.command,
'headers': dict(self.headers)
}
self.wfile.write(json.dumps(response, indent=2).encode())
def do_POST(self):
"""Handle POST request"""
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length) if content_length > 0 else b''
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {
'path': self.path,
'method': self.command,
'data': post_data.decode()
}
self.wfile.write(json.dumps(response, indent=2).encode())
# Server Management Functions
def start_basic_server(port: int = 8000):
"""Start basic HTTP server"""
server = HTTPServer(('', port), BasicHttpRequestHandler)
print(f"Basic server running on http://localhost:{port}")
server.serve_forever()
def start_rest_api_server(port: int = 8000):
"""Start REST API server"""
server = HTTPServer(('', port), RestApiHandler)
print(f"REST API server running on http://localhost:{port}")
server.serve_forever()
def start_static_server(port: int = 8000, directory: str = '.'):
"""Start static file server"""
def handler(*args, **kwargs):
StaticFileHandler(*args, root_dir=directory, **kwargs)
server = HTTPServer(('', port), handler)
print(f"Static file server running on http://localhost:{port}")
print(f"Serving directory: {os.path.abspath(directory)}")
server.serve_forever()
def start_middleware_server(port: int = 8000):
"""Start server with middleware"""
MiddlewareHandler.add_middleware(logging_middleware)
server = HTTPServer(('', port), MiddlewareHandler)
print(f"Server with middleware running on http://localhost:{port}")
server.serve_forever()
# Usage Examples
def demonstrate_http_server():
print("=== Web Python HTTP Server Examples ===\n")
print("Note: Server examples require running the server in a separate process")
print("Choose one of the following servers to start:\n")
print("--- 1. Basic Server ---")
print("start_basic_server(8000)")
print("Serves simple HTML page\n")
print("--- 2. REST API Server ---")
print("start_rest_api_server(8000)")
print("Endpoints:")
print(" GET /items - Get all items")
print(" GET /items/{id} - Get item by ID")
print(" POST /items - Create new item")
print(" PUT /items/{id} - Update item")
print(" DELETE /items/{id} - Delete item\n")
print("--- 3. Static File Server ---")
print("start_static_server(8000, './public')")
print("Serves static files from directory\n")
print("--- 4. Multi-threaded Server ---")
print("start_multi_threaded_server(8000)")
print("Handles multiple requests concurrently\n")
print("--- 5. Server with Middleware ---")
print("start_middleware_server(8000)")
print("Includes logging and CORS middleware\n")
print("--- Example Client Usage ---")
print("# Using requests library")
print("import requests")
print("")
print("# Get all items")
print("response = requests.get('http://localhost:8000/items')")
print("print(response.json())")
print("")
print("# Create new item")
print("response = requests.post('http://localhost:8000/items',")
print(" json={'name': 'Test Item', 'value': 123})")
print("print(response.json())")
print("\n=== All HTTP Server Examples Listed ===")
# Export functions and classes
export { BasicHttpRequestHandler, RestApiHandler, StaticFileHandler, SSEHandler, LoggedRequestHandler }
export { start_basic_server, start_rest_api_server, start_static_server, start_multi_threaded_server, start_middleware_server }
export { logging_middleware, cors_middleware, auth_middleware, demonstrate_http_server }
💻 TCP-Socket python
🔴 complex
⭐⭐⭐⭐
TCP-Client und -Server mit dem Socket-Modul für bidirektionale Kommunikation erstellen
⏱️ 35 min
🏷️ python, web, networking, socket, tcp
Prerequisites:
Advanced Python, socket module, threading
# Web Python TCP Socket Examples
# Creating TCP clients and servers using socket module
# 1. Basic TCP Server
import socket
import threading
import json
from typing import Callable, Optional
class TcpServer:
"""Basic TCP server"""
def __init__(self, host: str = 'localhost', port: int = 9999):
self.host = host
self.port = port
self.server_socket: Optional[socket.socket] = None
self.running = False
def start(self):
"""Start the TCP server"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
print(f"TCP server listening on {self.host}:{self.port}")
try:
while self.running:
client_socket, address = self.server_socket.accept()
print(f"New connection from {address}")
client_thread = threading.Thread(
target=self._handle_client,
args=(client_socket, address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\nServer shutting down...")
finally:
self.stop()
def _handle_client(self, client_socket: socket.socket, address: tuple):
"""Handle client connection"""
try:
while self.running:
data = client_socket.recv(1024)
if not data:
break
message = data.decode('utf-8')
print(f"Received from {address}: {message}")
# Echo back
response = f"Echo: {message}"
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"Error handling client {address}: {e}")
finally:
client_socket.close()
print(f"Connection closed: {address}")
def stop(self):
"""Stop the server"""
self.running = False
if self.server_socket:
self.server_socket.close()
# 2. Basic TCP Client
class TcpClient:
"""Basic TCP client"""
def __init__(self, host: str = 'localhost', port: int = 9999):
self.host = host
self.port = port
self.client_socket: Optional[socket.socket] = None
def connect(self):
"""Connect to server"""
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect((self.host, self.port))
print(f"Connected to {self.host}:{self.port}")
def send(self, message: str):
"""Send message to server"""
if self.client_socket:
self.client_socket.send(message.encode('utf-8'))
def receive(self, buffer_size: int = 1024) -> str:
"""Receive message from server"""
if self.client_socket:
data = self.client_socket.recv(buffer_size)
return data.decode('utf-8')
return ''
def close(self):
"""Close connection"""
if self.client_socket:
self.client_socket.close()
# 3. Chat Server
class ChatServer:
"""Multi-client chat server"""
def __init__(self, host: str = 'localhost', port: int = 9999):
self.host = host
self.port = port
self.clients: list[socket.socket] = []
self.nicknames: dict[socket.socket, str] = {}
self.server_socket: Optional[socket.socket] = None
self.running = False
def start(self):
"""Start chat server"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
print(f"Chat server listening on {self.host}:{self.port}")
try:
while self.running:
client_socket, address = self.server_socket.accept()
print(f"New connection from {address}")
self.clients.append(client_socket)
client_thread = threading.Thread(
target=self._handle_client,
args=(client_socket,)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\nChat server shutting down...")
finally:
self.stop()
def broadcast(self, message: str, sender_socket: Optional[socket.socket] = None):
"""Broadcast message to all clients"""
for client in self.clients:
if client != sender_socket:
try:
client.send(message.encode('utf-8'))
except:
self.clients.remove(client)
def _handle_client(self, client_socket: socket.socket):
"""Handle client communication"""
try:
# Request nickname
client_socket.send("NICK".encode('utf-8'))
nickname = client_socket.recv(1024).decode('utf-8')
self.nicknames[client_socket] = nickname
self.broadcast(f"{nickname} joined the chat!")
while self.running:
message = client_socket.recv(1024).decode('utf-8')
if message:
self.broadcast(f"{nickname}: {message}", client_socket)
else:
break
except Exception as e:
print(f"Error handling client: {e}")
finally:
if client_socket in self.clients:
nickname = self.nicknames.get(client_socket, 'Unknown')
self.clients.remove(client_socket)
self.broadcast(f"{nickname} left the chat!")
client_socket.close()
def stop(self):
"""Stop chat server"""
self.running = False
for client in self.clients:
client.close()
if self.server_socket:
self.server_socket.close()
# 4. Chat Client
class ChatClient:
"""Chat client with interactive interface"""
def __init__(self, host: str = 'localhost', port: int = 9999, nickname: str = 'Anonymous'):
self.host = host
self.port = port
self.nickname = nickname
self.client_socket: Optional[socket.socket] = None
self.running = False
def connect(self):
"""Connect to chat server"""
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect((self.host, self.port))
# Send nickname
self.client_socket.send(self.nickname.encode('utf-8'))
self.running = True
# Start receive thread
receive_thread = threading.Thread(target=self._receive_messages)
receive_thread.daemon = True
receive_thread.start()
def _receive_messages(self):
"""Receive messages from server"""
while self.running:
try:
message = self.client_socket.recv(1024).decode('utf-8')
if message:
print(f"\r{message}")
print(f"{self.nickname}: ", end='', flush=True)
except:
break
def send_message(self, message: str):
"""Send message to server"""
if self.client_socket:
self.client_socket.send(message.encode('utf-8'))
def close(self):
"""Close connection"""
self.running = False
if self.client_socket:
self.client_socket.close()
# 5. Message Protocol Server (JSON-based)
class ProtocolServer:
"""Server using JSON message protocol"""
def __init__(self, host: str = 'localhost', port: int = 9999):
self.host = host
self.port = port
self.server_socket: Optional[socket.socket] = None
self.running = False
def start(self):
"""Start protocol server"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
print(f"Protocol server listening on {self.host}:{self.port}")
try:
while self.running:
client_socket, address = self.server_socket.accept()
print(f"New connection from {address}")
client_thread = threading.Thread(
target=self._handle_client,
args=(client_socket, address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\nServer shutting down...")
finally:
self.stop()
def _handle_client(self, client_socket: socket.socket, address: tuple):
"""Handle client with protocol"""
try:
while self.running:
data = client_socket.recv(4096)
if not data:
break
try:
message = json.loads(data.decode('utf-8'))
response = self._process_message(message)
client_socket.send(json.dumps(response).encode('utf-8'))
except json.JSONDecodeError:
error_response = {'status': 'error', 'message': 'Invalid JSON'}
client_socket.send(json.dumps(error_response).encode('utf-8'))
except Exception as e:
print(f"Error handling client {address}: {e}")
finally:
client_socket.close()
def _process_message(self, message: dict) -> dict:
"""Process incoming message"""
msg_type = message.get('type')
if msg_type == 'ping':
return {'type': 'pong', 'timestamp': message.get('timestamp')}
elif msg_type == 'echo':
return {'type': 'echo_response', 'data': message.get('data')}
elif msg_type == 'add':
a = message.get('a', 0)
b = message.get('b', 0)
return {'type': 'result', 'operation': 'add', 'result': a + b}
elif msg_type == 'get_time':
from datetime import datetime
return {'type': 'time', 'timestamp': datetime.now().isoformat()}
else:
return {'type': 'error', 'message': 'Unknown message type'}
def stop(self):
"""Stop server"""
self.running = False
if self.server_socket:
self.server_socket.close()
# 6. File Transfer Server
class FileTransferServer:
"""Server for file transfer"""
def __init__(self, host: str = 'localhost', port: int = 9999):
self.host = host
self.port = port
self.server_socket: Optional[socket.socket] = None
self.running = False
def start(self):
"""Start file transfer server"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
print(f"File transfer server listening on {self.host}:{self.port}")
try:
while self.running:
client_socket, address = self.server_socket.accept()
print(f"New connection from {address}")
client_thread = threading.Thread(
target=self._handle_client,
args=(client_socket, address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\nServer shutting down...")
finally:
self.stop()
def _handle_client(self, client_socket: socket.socket, address: tuple):
"""Handle file transfer"""
try:
# Receive file info
file_info = client_socket.recv(1024).decode('utf-8')
filename, filesize = file_info.split('|')
filesize = int(filesize)
print(f"Receiving file: {filename} ({filesize} bytes)")
# Receive file data
received = 0
with open(filename, 'wb') as f:
while received < filesize:
data = client_socket.recv(4096)
if not data:
break
f.write(data)
received += len(data)
progress = (received / filesize) * 100
print(f"\rProgress: {progress:.1f}%", end='')
print(f"\nFile received successfully!")
client_socket.send(b"ACK")
except Exception as e:
print(f"Error: {e}")
finally:
client_socket.close()
def stop(self):
"""Stop server"""
self.running = False
if self.server_socket:
self.server_socket.close()
# 7. File Transfer Client
class FileTransferClient:
"""Client for file transfer"""
def __init__(self, host: str = 'localhost', port: int = 9999):
self.host = host
self.port = port
self.client_socket: Optional[socket.socket] = None
def connect(self):
"""Connect to server"""
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect((self.host, self.port))
print(f"Connected to {self.host}:{self.port}")
def send_file(self, filename: str):
"""Send file to server"""
import os
if not os.path.exists(filename):
print(f"File not found: {filename}")
return
filesize = os.path.getsize(filename)
# Send file info
file_info = f"{filename}|{filesize}"
self.client_socket.send(file_info.encode('utf-8'))
# Send file data
sent = 0
with open(filename, 'rb') as f:
while sent < filesize:
data = f.read(4096)
self.client_socket.send(data)
sent += len(data)
progress = (sent / filesize) * 100
print(f"\rProgress: {progress:.1f}%", end='')
print(f"\nFile sent successfully!")
# Wait for acknowledgment
ack = self.client_socket.recv(1024)
print(f"Server response: {ack.decode('utf-8')}")
def close(self):
"""Close connection"""
if self.client_socket:
self.client_socket.close()
# 8. UDP Socket (for comparison)
class UdpServer:
"""Basic UDP server"""
def __init__(self, host: str = 'localhost', port: int = 9999):
self.host = host
self.port = port
self.server_socket: Optional[socket.socket] = None
self.running = False
def start(self):
"""Start UDP server"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server_socket.bind((self.host, self.port))
self.running = True
print(f"UDP server listening on {self.host}:{self.port}")
try:
while self.running:
data, address = self.server_socket.recvfrom(1024)
message = data.decode('utf-8')
print(f"Received from {address}: {message}")
response = f"Echo: {message}"
self.server_socket.sendto(response.encode('utf-8'), address)
except KeyboardInterrupt:
print("\nUDP server shutting down...")
finally:
self.stop()
def stop(self):
"""Stop UDP server"""
self.running = False
if self.server_socket:
self.server_socket.close()
# Usage Examples
def demonstrate_tcp_socket():
print("=== Web Python TCP Socket Examples ===\n")
print("Note: Socket examples require running server and client in separate processes")
print("Choose one of the following examples:\n")
print("--- 1. Basic TCP Server ---")
print("# Server")
print("server = TcpServer('localhost', 9999)")
print("server.start()")
print("")
print("# Client")
print("client = TcpClient('localhost', 9999)")
print("client.connect()")
print("client.send('Hello, Server!')")
print("response = client.receive()")
print("print(response)")
print("client.close()\n")
print("--- 2. Chat Server ---")
print("# Server")
print("chat_server = ChatServer('localhost', 9999)")
print("chat_server.start()")
print("")
print("# Client")
print("chat_client = ChatClient('localhost', 9999, 'Alice')")
print("chat_client.connect()")
print("# Then send messages:")
print("chat_client.send_message('Hello everyone!')\n")
print("--- 3. Protocol Server (JSON-based) ---")
print("# Server")
print("protocol_server = ProtocolServer('localhost', 9999)")
print("protocol_server.start()")
print("")
print("# Client sends JSON messages:")
print('client.send(json.dumps({"type": "ping", "timestamp": 123456}))')
print('client.send(json.dumps({"type": "add", "a": 5, "b": 3}))')
print('client.send(json.dumps({"type": "echo", "data": "Hello"}))\n')
print("--- 4. File Transfer ---")
print("# Server")
print("file_server = FileTransferServer('localhost', 9999)")
print("file_server.start()")
print("")
print("# Client")
print("file_client = FileTransferClient('localhost', 9999)")
print("file_client.connect()")
print("file_client.send_file('example.txt')")
print("file_client.close()\n")
print("--- 5. UDP Server (for comparison) ---")
print("# Server")
print("udp_server = UdpServer('localhost', 9999)")
print("udp_server.start()")
print("")
print("# Client")
print("sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)")
print("sock.sendto(b'Hello UDP!', ('localhost', 9999))")
print("data, addr = sock.recvfrom(1024)")
print("print(data.decode())\n")
print("\n=== All TCP Socket Examples Listed ===")
# Export functions and classes
export { TcpServer, TcpClient, ChatServer, ChatClient, ProtocolServer, FileTransferServer, FileTransferClient, UdpServer }
export { demonstrate_tcp_socket }