🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples de fonctionnalités Web en Python
Exemples de fonctionnalités Web en Python, incluant le routage, le middleware et le service de fichiers statiques
💻 Routage python
🟡 intermediate
⭐⭐⭐
Analyser les routes URL, gérer le routage de hachage et gérer l'historique du navigateur
⏱️ 25 min
🏷️ python, web, web features
Prerequisites:
Intermediate Python, History API, URL API, Pyodide
# Web Python Routing Examples
# Client-side routing with URL parsing, hash routing, and history management
import js
import re
from typing import Dict, Callable, Any, Optional, Tuple
from urllib.parse import parse_qs, urlparse
# 1. Route Handler
class Route:
    """Pair a route pattern with the callable registered for it."""

    def __init__(self, path: str, handler: Callable):
        # Both values are stored verbatim; no validation happens here.
        self.path, self.handler = path, handler
# 2. Router
class Router:
    """Client-side router built on the browser History API.

    A route pattern is a path in which a ``:name`` token captures one
    slash-free segment and ``*`` matches any run of characters. Routes are
    tried in registration order and the first match wins.
    """

    def __init__(self):
        self.routes = []
        self.not_found_handler = None
        # Params and query of the most recently matched route.
        self.current_params = {}
        self.current_query = {}

    def add_route(self, path: str, handler: Callable):
        """
        Add route
        Args:
            path: Route path (can include :params)
            handler: Route handler function, called as handler(params, query)
        """
        self.routes.append(Route(path, handler))

    def set_not_found(self, handler: Callable):
        """
        Set 404 handler
        Args:
            handler: Zero-argument callable invoked when no route matches
        """
        self.not_found_handler = handler

    def navigate(self, path: str, params: Dict[str, str] = None):
        """
        Navigate to path
        Args:
            path: Path to navigate to
            params: Route parameters to substitute for ``:name`` placeholders
        """
        url = self._replace_params(path, params) if params else path
        # Push a new history entry. ``js.object`` is not a JavaScript global,
        # so the empty state object is built from ``Object`` instead.
        js.window.history.pushState(js.Object.new(), '', url)
        self._handle_route(url)

    def replace(self, path: str, params: Dict[str, str] = None):
        """
        Replace current route
        Args:
            path: Path to replace with
            params: Route parameters to substitute for ``:name`` placeholders
        """
        url = self._replace_params(path, params) if params else path
        # Overwrite the current history entry rather than adding a new one.
        js.window.history.replaceState(js.Object.new(), '', url)
        self._handle_route(url)

    def back(self):
        """Go back in history"""
        js.window.history.back()

    def forward(self):
        """Go forward in history"""
        js.window.history.forward()

    def go(self, delta: int):
        """
        Go to specific history entry
        Args:
            delta: Number of steps to go
        """
        js.window.history.go(delta)

    def _handle_route(self, url: str):
        """Dispatch ``url`` to the first matching handler, else the 404 handler."""
        path, query = self._parse_url(url)
        route = self._find_route(path)
        if route:
            params = self._extract_params(path, route.path)
            self.current_params = params
            self.current_query = query
            route.handler(params, query)
        elif self.not_found_handler:
            self.not_found_handler()

    def _find_route(self, path: str) -> Optional["Route"]:
        """Return the first registered route whose pattern matches, or None."""
        for route in self.routes:
            if self._match_route(path, route.path):
                return route
        return None

    @staticmethod
    def _compile_pattern(pattern: str):
        """Translate a route pattern into ``(compiled_regex, param_names)``.

        Literal text is escaped so characters such as ``.`` only match
        themselves; each ``:name`` becomes one capturing group and ``*``
        becomes a non-capturing ``.*``.
        """
        names = []
        pieces = []
        cursor = 0
        for token in re.finditer(r':([^/*]+)|\*', pattern):
            pieces.append(re.escape(pattern[cursor:token.start()]))
            if token.group(1) is None:
                pieces.append('.*')
            else:
                names.append(token.group(1))
                pieces.append('([^/]+)')
            cursor = token.end()
        pieces.append(re.escape(pattern[cursor:]))
        return re.compile(''.join(pieces)), names

    def _match_route(self, path: str, pattern: str) -> bool:
        """Check if path matches route pattern"""
        regex, _ = self._compile_pattern(pattern)
        return regex.fullmatch(path) is not None

    def _extract_params(self, path: str, pattern: str) -> Dict[str, str]:
        """Extract ``:name`` values from ``path`` according to ``pattern``."""
        regex, names = self._compile_pattern(pattern)
        found = regex.fullmatch(path)
        if found is not None:
            # Only ``:name`` tokens capture, so groups line up with names.
            return dict(zip(names, found.groups()))
        # The path does not match the pattern: fall back to positional
        # segment lookup, using '' for segments the path does not have.
        params = {}
        path_parts = path.split('/')
        for i, part in enumerate(pattern.split('/')):
            if part.startswith(':'):
                params[part[1:]] = path_parts[i] if i < len(path_parts) else ''
        return params

    def _replace_params(self, path: str, params: Dict[str, str]) -> str:
        """Substitute ``:name`` placeholders in ``path`` with values from ``params``."""
        if not params:
            return path
        # Longest names first, plus a "not followed by a word character"
        # guard, so ``:id`` never rewrites the front of ``:idx``.
        names = sorted(params, key=len, reverse=True)
        alternation = '|'.join(re.escape(name) for name in names)
        return re.sub(
            rf':({alternation})(?!\w)',
            lambda found: str(params[found.group(1)]),
            path,
        )

    def _parse_url(self, url: str) -> Tuple[str, Dict[str, str]]:
        """Split ``url`` into its path and a flat query dict (first value per key)."""
        path, _, query_string = url.partition('?')
        query = {key: values[0] for key, values in parse_qs(query_string).items()}
        return path, query

    def initialize(self):
        """Initialize router (set up event listeners)"""
        # Note: In Pyodide, you'd need to set up JavaScript event handlers
        pass
# 3. Hash Router
class HashRouter:
    """Router driven by the URL fragment (``location.hash``).

    Patterns are stored without a leading slash. A ``:name`` token captures
    one slash-free segment and ``*`` matches any run of characters.
    """

    def __init__(self):
        self.routes = []
        self.not_found_handler = None

    def add_route(self, path: str, handler: Callable):
        """Register ``handler`` for ``path`` (one leading slash is dropped)."""
        hash_path = path[1:] if path.startswith('/') else path
        self.routes.append(Route(hash_path, handler))

    def set_not_found(self, handler: Callable):
        """Set the zero-argument handler used when no route matches."""
        self.not_found_handler = handler

    def navigate(self, hash_path: str, params: Dict[str, str] = None):
        """Set the location hash, substituting ``:name`` placeholders first."""
        url = self._replace_params(hash_path, params) if params else hash_path
        js.window.location.hash = url

    def get_hash(self) -> str:
        """Return the current hash without its first character, or '/' if empty."""
        hash_value = js.window.location.hash
        return hash_value[1:] if hash_value else '/'

    def _replace_params(self, path: str, params: Dict[str, str]) -> str:
        """Substitute ``:name`` placeholders in ``path`` with values from ``params``."""
        if not params:
            return path
        # Longest names first, plus a "not followed by a word character"
        # guard, so ``:id`` never rewrites the front of ``:idx``.
        names = sorted(params, key=len, reverse=True)
        alternation = '|'.join(re.escape(name) for name in names)
        return re.sub(
            rf':({alternation})(?!\w)',
            lambda found: str(params[found.group(1)]),
            path,
        )

    def _handle_hash_change(self):
        """Dispatch the current hash to the first matching handler."""
        path, _, query_string = self.get_hash().partition('?')
        query = {key: values[0] for key, values in parse_qs(query_string).items()}
        # add_route stores patterns without their leading slash, so the live
        # hash is normalised the same way; '/' then matches the root route.
        if path.startswith('/'):
            path = path[1:]
        route = self._find_route(path)
        if route:
            params = self._extract_params(path, route.path)
            route.handler(params, query)
        elif self.not_found_handler:
            self.not_found_handler()

    def _find_route(self, path: str) -> Optional["Route"]:
        """Return the first registered route whose pattern matches, or None."""
        for route in self.routes:
            if self._match_route(path, route.path):
                return route
        return None

    @staticmethod
    def _compile_pattern(pattern: str):
        """Translate a route pattern into ``(compiled_regex, param_names)``.

        Literal text is escaped; each ``:name`` becomes one capturing group
        and ``*`` becomes a non-capturing ``.*``.
        """
        names = []
        pieces = []
        cursor = 0
        for token in re.finditer(r':([^/*]+)|\*', pattern):
            pieces.append(re.escape(pattern[cursor:token.start()]))
            if token.group(1) is None:
                pieces.append('.*')
            else:
                names.append(token.group(1))
                pieces.append('([^/]+)')
            cursor = token.end()
        pieces.append(re.escape(pattern[cursor:]))
        return re.compile(''.join(pieces)), names

    def _match_route(self, path: str, pattern: str) -> bool:
        """Check if path matches route pattern"""
        regex, _ = self._compile_pattern(pattern)
        return regex.fullmatch(path) is not None

    def _extract_params(self, path: str, pattern: str) -> Dict[str, str]:
        """Extract ``:name`` values from ``path`` according to ``pattern``."""
        regex, names = self._compile_pattern(pattern)
        found = regex.fullmatch(path)
        if found is not None:
            # Only ``:name`` tokens capture, so groups line up with names.
            return dict(zip(names, found.groups()))
        # The path does not match the pattern: fall back to positional
        # segment lookup, using '' for segments the path does not have.
        params = {}
        path_parts = path.split('/')
        for i, part in enumerate(pattern.split('/')):
            if part.startswith(':'):
                params[part[1:]] = path_parts[i] if i < len(path_parts) else ''
        return params
# 4. Query Params Manager
class QueryParamsManager:
    """Read and rewrite the query string of the current page URL.

    Every method works on ``js.window.location`` through ``URLSearchParams``;
    writes go through ``history.replaceState``.
    """

    def get(self, param: str) -> Optional[str]:
        """
        Get query param
        Args:
            param: Parameter name
        Returns:
            Parameter value or None
        """
        params = js.URLSearchParams.new(js.window.location.search)
        return params.get(param)

    def get_all(self) -> Dict[str, str]:
        """
        Get all params
        Returns:
            Dictionary mapping each parameter name to the value returned by
            ``URLSearchParams.get`` for it
        """
        params = js.URLSearchParams.new(js.window.location.search)
        return {key: params.get(key) for key in list(params.keys())}

    def set(self, param: str, value: str):
        """
        Set query param
        Args:
            param: Parameter name
            value: Parameter value
        """
        params = js.URLSearchParams.new(js.window.location.search)
        params.set(param, value)
        self._update(str(params))

    def delete(self, param: str):
        """
        Delete query param
        Args:
            param: Parameter name
        """
        params = js.URLSearchParams.new(js.window.location.search)
        params.delete(param)
        self._update(str(params))

    def clear(self):
        """Clear all params"""
        self._update('')

    def _update(self, query_string: str):
        """Replace the current history entry with the URL carrying ``query_string``."""
        url = js.URL.new(js.window.location.href)
        url.search = query_string
        # ``js.object`` is not a JavaScript global, so the empty state object
        # is built from ``Object`` instead.
        js.window.history.replaceState(js.Object.new(), '', str(url))
# 5. Route Guard
class RouteGuard:
    """Registry of per-route predicates consulted before navigation."""

    def __init__(self):
        # Maps a route path to its guard callable; one guard per route.
        self.guards = {}

    def add_guard(self, route: str, guard: Callable[[], bool]):
        """
        Register a guard, replacing any guard already stored for the route
        Args:
            route: Route path
            guard: Zero-argument callable deciding whether navigation is allowed
        """
        self.guards[route] = guard

    async def can_navigate(self, route: str) -> bool:
        """
        Evaluate the guard registered for a route
        Args:
            route: Route path
        Returns:
            The guard's result, or True when the route has no guard
        """
        checker = self.guards.get(route)
        if not checker:
            return True
        return checker()

    def add_auth_guard(self, route: str, is_authenticated: Callable[[], bool]):
        """Register an authentication check as the route's guard"""
        self.add_guard(route, is_authenticated)

    def add_permission_guard(self, route: str, has_permission: Callable[[], bool]):
        """Register a permission check as the route's guard"""
        self.add_guard(route, has_permission)
# 6. Navigation Manager
class NavigationManager:
    """Keeps a private list of visited paths alongside the wrapped router."""

    def __init__(self, router: Router):
        self.router = router
        self.history = []
        # Position of the current entry in ``history``; -1 means "none yet".
        self.current_index = -1

    def navigate(self, path: str, params: Dict[str, str] = None):
        """Record ``path`` in the private history, then navigate the router."""
        keep = self.current_index + 1
        if keep < len(self.history):
            # Navigating from the middle of the list drops the forward entries.
            self.history = self.history[:keep]
        self.history.append(path)
        self.current_index = keep
        self.router.navigate(path, params)

    def back(self):
        """Step back one entry, if there is one, and ask the browser to go back."""
        if self.current_index <= 0:
            return
        self.current_index -= 1
        js.window.history.back()

    def forward(self):
        """Step forward one entry, if there is one, and ask the browser to go forward."""
        if self.current_index >= len(self.history) - 1:
            return
        self.current_index += 1
        js.window.history.forward()

    def can_go_back(self) -> bool:
        """Return True when an earlier entry exists."""
        return self.current_index > 0

    def can_go_forward(self) -> bool:
        """Return True when a later entry exists."""
        return self.current_index < len(self.history) - 1

    def get_current_position(self) -> int:
        """Return the index of the current entry."""
        return self.current_index

    def get_history_length(self) -> int:
        """Return how many entries are recorded."""
        return len(self.history)
# Usage Examples
async def demonstrate_routing():
    """Walk through each routing helper in turn, printing what happens.

    Must run inside a browser (Pyodide): the routers and the query manager
    touch ``js.window``.
    """
    print("=== Web Python Routing Examples ===\n")

    # 1. Basic router
    print("--- 1. Basic Router ---")
    router = Router()

    def home_handler(params, query):
        print("Home page")

    def about_handler(params, query):
        print("About page")

    def user_handler(params, query):
        print(f"User page: {params.get('id', 'N/A')}")

    def comment_handler(params, query):
        print(f"Comment: {params.get('commentId', 'N/A')} on post {params.get('postId', 'N/A')}")

    router.add_route('/', home_handler)
    router.add_route('/about', about_handler)
    router.add_route('/users/:id', user_handler)
    router.add_route('/posts/:postId/comments/:commentId', comment_handler)

    # 2. Navigation
    print("\n--- 2. Navigation ---")
    router.navigate('/')
    router.navigate('/about')
    router.navigate('/users/123')

    # 3. Query params
    print("\n--- 3. Query Params ---")
    router.navigate('/search?query=python&page=1')

    # 4. Hash router
    print("\n--- 4. Hash Router ---")
    hash_router = HashRouter()

    def hash_home_handler(params, query):
        print("Hash home")

    def hash_profile_handler(params, query):
        print(f"Hash profile: {params.get('userId', 'N/A')}")

    hash_router.add_route('/', hash_home_handler)
    hash_router.add_route('profile/:userId', hash_profile_handler)
    hash_router.navigate('profile/456')

    # 5. Query params manager
    print("\n--- 5. Query Params Manager ---")
    query_manager = QueryParamsManager()
    query_manager.set('tab', 'profile')
    query_manager.set('section', 'details')
    all_params = query_manager.get_all()
    print(f"All params: {all_params}")

    # 6. Route guard
    print("\n--- 6. Route Guard ---")
    guard = RouteGuard()

    def is_authenticated():
        authenticated = False  # Simulate
        print(f"Auth check: {authenticated}")
        return authenticated

    guard.add_auth_guard('/admin', is_authenticated)
    # can_navigate is a coroutine function: without ``await`` the guard never
    # runs and the line below would print a coroutine object.
    can_access = await guard.can_navigate('/admin')
    print(f"Can access /admin: {can_access}")

    # 7. Navigation manager
    print("\n--- 7. Navigation Manager ---")
    nav_manager = NavigationManager(router)
    nav_manager.navigate('/page1')
    nav_manager.navigate('/page2')
    print(f"Can go back: {nav_manager.can_go_back()}")
    print(f"History length: {nav_manager.get_history_length()}")

    print("\n=== All Routing Examples Completed ===")
# Export functions
# Name -> object table of everything the routing example exposes.
export = {}
export['Router'] = Router
export['HashRouter'] = HashRouter
export['QueryParamsManager'] = QueryParamsManager
export['RouteGuard'] = RouteGuard
export['NavigationManager'] = NavigationManager
export['demonstrate_routing'] = demonstrate_routing
💻 Middleware python
🟡 intermediate
⭐⭐⭐⭐
Implémenter une chaîne de middleware requête/réponse pour traiter et modifier des données
⏱️ 30 min
🏷️ python, web, web features
Prerequisites:
Intermediate Python, Async/await, Pyodide
# Web Python Middleware Examples
# Request/response processing middleware chain
import asyncio
from typing import Dict, Any, Callable, Optional, List
# 1. Context
class Context:
    """Mutable bag of per-request data handed to each middleware."""

    def __init__(self):
        # Request/response start unset; whoever builds the context fills them.
        self.request = self.response = None
        # Three independent dicts: free-form state, headers, and metadata.
        self.state, self.headers, self.metadata = {}, {}, {}
# 2. Middleware Function Type
# Type alias for a middleware callable taking (context, next).
# NOTE(review): the middlewares defined in this file are coroutine functions,
# so the declared ``None`` return presumably means "resolves to None" — confirm.
Middleware = Callable[[Context, Callable], None]
# 3. Middleware Pipeline
class MiddlewarePipeline:
    """Ordered chain of async middlewares executed onion-style.

    Each middleware is awaited as ``middleware(context, next)`` and decides
    whether to continue by awaiting ``next()``.
    """

    def __init__(self):
        self.middlewares = []

    def use(self, middleware: "Middleware") -> "MiddlewarePipeline":
        """
        Add middleware
        Args:
            middleware: Middleware function
        Returns:
            Self for chaining
        """
        self.middlewares.append(middleware)
        return self

    async def execute(self, context: "Context"):
        """
        Execute pipeline
        Args:
            context: Request context passed to every middleware
        """
        async def dispatch(position: int):
            if position >= len(self.middlewares):
                return

            # The continuation is bound to this middleware's own position.
            # With a single shared cursor, a second next() call (or a call
            # made after a later middleware short-circuited) would run some
            # unrelated middleware further down the chain.
            async def proceed():
                await dispatch(position + 1)

            await self.middlewares[position](context, proceed)

        await dispatch(0)

    def clear(self):
        """Clear all middlewares"""
        self.middlewares = []

    def count(self) -> int:
        """Get middleware count"""
        return len(self.middlewares)
# 4. Common Middlewares
class CommonMiddlewares:
    """Factories for reusable middlewares.

    Every factory returns a coroutine function ``(context, next)``. Clock
    reads go through ``_perf_ms`` / ``_epoch_ms``: they use the browser
    clocks when the Pyodide ``js`` module provides them and fall back to the
    ``time`` module otherwise, so windows and expiries keep working instead
    of being computed from a constant 0.
    """

    @staticmethod
    def _perf_ms() -> float:
        """Return a millisecond reading suitable for measuring durations."""
        try:
            import js
            return js.performance.now()
        except (ImportError, AttributeError):
            import time
            return time.perf_counter() * 1000

    @staticmethod
    def _epoch_ms() -> float:
        """Return the current wall-clock time in milliseconds since the epoch."""
        try:
            import js
            return js.Date.now()
        except (ImportError, AttributeError):
            import time
            return time.time() * 1000

    @staticmethod
    def logging() -> "Middleware":
        """Logging middleware: prints the request, then status and duration."""
        async def logging_middleware(context: "Context", next: Callable):
            start = CommonMiddlewares._perf_ms()
            print('[Request]', {
                'url': getattr(context.request, 'url', 'N/A') if context.request else 'N/A',
                'method': getattr(context.request, 'method', 'N/A') if context.request else 'N/A',
                'headers': context.headers
            })
            await next()
            duration = CommonMiddlewares._perf_ms() - start
            print('[Response]', {
                'status': getattr(context.response, 'status', 'N/A') if context.response else 'N/A',
                'duration': f'{duration:.2f}ms'
            })
        return logging_middleware

    @staticmethod
    def timing() -> "Middleware":
        """Timing middleware: stores the downstream duration (ms) in metadata."""
        async def timing_middleware(context: "Context", next: Callable):
            start = CommonMiddlewares._perf_ms()
            await next()
            context.metadata['duration'] = CommonMiddlewares._perf_ms() - start
        return timing_middleware

    @staticmethod
    def error_handler(error_handler_func: Callable) -> "Middleware":
        """Error handling middleware: routes downstream exceptions to a callback."""
        async def error_middleware(context: "Context", next: Callable):
            try:
                await next()
            except Exception as error:
                error_handler_func(error)
        return error_middleware

    @staticmethod
    def headers(add_headers: Dict[str, str]) -> "Middleware":
        """Header manipulation middleware: copies ``add_headers`` onto the context."""
        async def headers_middleware(context: "Context", next: Callable):
            context.headers.update(add_headers)
            await next()
        return headers_middleware

    @staticmethod
    def authentication(auth_check: Callable) -> "Middleware":
        """Authentication middleware.

        Raises:
            Exception: 'Unauthorized' when ``auth_check(context)`` is falsy.
        """
        async def auth_middleware(context: "Context", next: Callable):
            if not auth_check(context):
                raise Exception('Unauthorized')
            context.metadata['authenticated'] = True
            await next()
        return auth_middleware

    @staticmethod
    def rate_limit(max_requests: int, window_ms: int) -> "Middleware":
        """Rate limiting middleware (sliding window keyed by ``request.ip``).

        Raises:
            Exception: 'Rate limit exceeded' once ``max_requests`` requests
                fall inside the last ``window_ms`` milliseconds.
        """
        requests = {}

        async def rate_limit_middleware(context: "Context", next: Callable):
            key = getattr(context.request, 'ip', 'unknown') if context.request else 'unknown'
            now = CommonMiddlewares._epoch_ms()
            # Keep only the timestamps that are still inside the window.
            window_start = now - window_ms
            valid_requests = [r for r in requests.get(key, []) if r > window_start]
            if len(valid_requests) >= max_requests:
                raise Exception('Rate limit exceeded')
            valid_requests.append(now)
            requests[key] = valid_requests
            context.metadata['rate_limit'] = {
                'remaining': max_requests - len(valid_requests),
                'reset': now + window_ms
            }
            await next()
        return rate_limit_middleware

    @staticmethod
    def cors(options: Dict[str, Any] = None) -> "Middleware":
        """CORS middleware.

        ``options['origin']`` may be ``'*'`` / empty (allow any), a single
        origin string (sent as-is), or a collection of allowed origins (the
        request's ``Origin`` is echoed back only when it is listed).
        """
        if options is None:
            options = {}

        async def cors_middleware(context: "Context", next: Callable):
            origin = getattr(context.request, 'headers', {}).get('Origin', '*') if context.request else '*'
            allowed_origins = options.get('origin', '*')
            if allowed_origins == '*' or not allowed_origins:
                context.headers['Access-Control-Allow-Origin'] = '*'
            elif isinstance(allowed_origins, str):
                # A single configured origin used to be ignored entirely.
                context.headers['Access-Control-Allow-Origin'] = allowed_origins
            elif isinstance(allowed_origins, (list, tuple, set, frozenset)) and origin in allowed_origins:
                context.headers['Access-Control-Allow-Origin'] = origin
            if options.get('methods'):
                context.headers['Access-Control-Allow-Methods'] = ', '.join(options['methods'])
            if options.get('headers'):
                context.headers['Access-Control-Allow-Headers'] = ', '.join(options['headers'])
            if options.get('credentials'):
                context.headers['Access-Control-Allow-Credentials'] = 'true'
            await next()
        return cors_middleware

    @staticmethod
    def compression() -> "Middleware":
        """Compression middleware (simulated: only headers/metadata are set)."""
        async def compression_middleware(context: "Context", next: Callable):
            await next()
            accept_encoding = getattr(context.request, 'headers', {}).get('Accept-Encoding', '') if context.request else ''
            # Compare whole tokens (ignoring ";q=" parameters) and advertise
            # an encoding the client actually listed; gzip wins when both are.
            accepted = {
                token.split(';', 1)[0].strip().lower()
                for token in accept_encoding.split(',')
            }
            for encoding in ('gzip', 'br'):
                if encoding in accepted:
                    context.headers['Content-Encoding'] = encoding
                    context.metadata['compressed'] = True
                    break
        return compression_middleware

    @staticmethod
    def validation(validator: Callable) -> "Middleware":
        """Validation middleware.

        Raises:
            Exception: 'Validation failed' when ``validator(context)`` is falsy.
        """
        async def validation_middleware(context: "Context", next: Callable):
            if not validator(context):
                raise Exception('Validation failed')
            await next()
        return validation_middleware

    @staticmethod
    def cache(cache_key_func: Callable, ttl: int = 60000) -> "Middleware":
        """Cache middleware: replays a stored response for ``ttl`` milliseconds."""
        store = {}

        async def cache_middleware(context: "Context", next: Callable):
            key = cache_key_func(context)
            if key in store:
                cached = store[key]
                if cached['expiry'] > CommonMiddlewares._epoch_ms():
                    # Fresh hit: short-circuit without running downstream.
                    context.response = cached['data']
                    context.metadata['cached'] = True
                    return
            await next()
            if context.response:
                # Expiry counts from when the response became available.
                store[key] = {
                    'data': context.response,
                    'expiry': CommonMiddlewares._epoch_ms() + ttl
                }
        return cache_middleware
# 5. Middleware Composer
class MiddlewareComposer:
    """Thin fluent wrapper around a private MiddlewarePipeline."""

    def __init__(self):
        self.pipeline = MiddlewarePipeline()

    def use(self, middleware: Middleware) -> 'MiddlewareComposer':
        """Append one middleware to the pipeline and return self."""
        self.pipeline.use(middleware)
        return self

    def use_all(self, middlewares: List[Middleware]) -> 'MiddlewareComposer':
        """Append every middleware, in the given order, and return self."""
        register = self.pipeline.use
        for entry in middlewares:
            register(entry)
        return self

    async def execute(self, context: Context):
        """Run the pipeline against ``context``."""
        await self.pipeline.execute(context)

    @staticmethod
    def from_list(middlewares: List[Middleware]) -> 'MiddlewareComposer':
        """Build a new composer pre-loaded with ``middlewares``."""
        return MiddlewareComposer().use_all(middlewares)
# 6. Mock Request/Response
class MockRequest:
    """Plain data holder standing in for an incoming request."""

    def __init__(self, url: str, method: str = 'GET', headers: Dict = None, body: Any = None, ip: str = '127.0.0.1'):
        self.url, self.method = url, method
        # A falsy ``headers`` argument (None or empty) becomes a fresh dict.
        self.headers = headers if headers else {}
        self.body, self.ip = body, ip
class MockResponse:
    """Plain data holder standing in for an outgoing response (fluent setters)."""

    def __init__(self):
        self.status, self.headers, self.body = 200, {}, None

    def set_status(self, code: int) -> 'MockResponse':
        """Store the status code and return self for chaining."""
        self.status = code
        return self

    def set_header(self, key: str, value: str) -> 'MockResponse':
        """Store one header and return self for chaining."""
        self.headers.update({key: value})
        return self

    def send(self, data: Any) -> 'MockResponse':
        """Store the body and return self for chaining."""
        self.body = data
        return self
# 7. Application Builder
class ApplicationBuilder:
    """Minimal application: a middleware composer plus a request entry point."""

    def __init__(self):
        self.composer = MiddlewareComposer()

    def use(self, middleware: Middleware) -> 'ApplicationBuilder':
        """Register a middleware and return self for chaining."""
        self.composer.use(middleware)
        return self

    async def handle(self, request: MockRequest) -> MockResponse:
        """Run ``request`` through the middlewares and return the context's response.

        The context starts with a fresh MockResponse; a middleware may replace it.
        """
        ctx = Context()
        ctx.request, ctx.response = request, MockResponse()
        await self.composer.execute(ctx)
        return ctx.response
# Usage Examples
async def demonstrate_middleware():
    """Run each middleware example in sequence, printing the results."""
    print("=== Web Python Middleware Examples ===\n")

    # 1. Basic middleware: two wrappers showing the before/after ordering
    print("--- 1. Basic Middleware ---")

    async def first_wrapper(context, next):
        print("Middleware 1: Before")
        await next()
        print("Middleware 1: After")

    async def second_wrapper(context, next):
        print("Middleware 2: Before")
        await next()
        print("Middleware 2: After")

    basic_pipeline = MiddlewarePipeline().use(first_wrapper).use(second_wrapper)
    basic_context = Context()
    basic_context.request = MockRequest('/test')
    await basic_pipeline.execute(basic_context)

    # 2. Common middlewares: logging, timing and fixed headers
    print("\n--- 2. Common Middlewares ---")
    extra_headers = {
        'X-Powered-By': 'Python-Middleware',
        'X-Response-Time': '0ms'
    }
    app = (
        ApplicationBuilder()
        .use(CommonMiddlewares.logging())
        .use(CommonMiddlewares.timing())
        .use(CommonMiddlewares.headers(extra_headers))
    )
    response = await app.handle(
        MockRequest('/api/users', 'GET', {'Accept': 'application/json'})
    )
    print(f"Response status: {response.status}")

    # 3. Error handling: the first middleware catches what the second raises
    print("\n--- 3. Error Handling ---")

    def error_handler(error):
        print(f"Error caught: {str(error)}")

    async def failing_middleware(context, next):
        if context.request and context.request.url == '/error':
            raise Exception('Intentional error')
        await next()

    error_app = (
        ApplicationBuilder()
        .use(CommonMiddlewares.error_handler(error_handler))
        .use(failing_middleware)
    )
    await error_app.handle(MockRequest('/error'))

    # 4. Authentication: bearer-token check in front of a handler
    print("\n--- 4. Authentication ---")

    def auth_check(context):
        if not context.request:
            return '' == 'Bearer valid-token'
        return context.request.headers.get('Authorization', '') == 'Bearer valid-token'

    async def auth_handler(context, next):
        print("Authenticated successfully")
        context.response = MockResponse().set_status(200).send({'message': 'Success'})
        await next()

    auth_app = (
        ApplicationBuilder()
        .use(CommonMiddlewares.authentication(auth_check))
        .use(auth_handler)
    )
    auth_response = await auth_app.handle(
        MockRequest('/protected', 'GET', {'Authorization': 'Bearer valid-token'})
    )
    print(f"Auth response status: {auth_response.status}")

    # 5. Rate limiting: the same request is sent four times against a limit of 3
    print("\n--- 5. Rate Limiting ---")

    async def response_handler(context, next):
        context.response = MockResponse().set_status(200).send({'message': 'OK'})
        await next()

    rate_limit_app = (
        ApplicationBuilder()
        .use(CommonMiddlewares.rate_limit(max_requests=3, window_ms=10000))
        .use(response_handler)
    )
    rl_request = MockRequest('/api/test')
    for i in range(4):
        try:
            rl_response = await rate_limit_app.handle(rl_request)
            print(f"Request {i + 1}: {rl_response.status}")
        except Exception:
            print(f"Request {i + 1}: Rate limited")

    print("\n=== All Middleware Examples Completed ===")
# Export functions
# Name -> object table of everything the middleware example exposes.
export = {}
export['MiddlewarePipeline'] = MiddlewarePipeline
export['CommonMiddlewares'] = CommonMiddlewares
export['MiddlewareComposer'] = MiddlewareComposer
export['ApplicationBuilder'] = ApplicationBuilder
export['MockRequest'] = MockRequest
export['MockResponse'] = MockResponse
export['Context'] = Context
export['demonstrate_middleware'] = demonstrate_middleware
💻 Fichiers Statiques python
🟡 intermediate
⭐⭐⭐
Servir des fichiers statiques avec mise en cache, compression et détection de type MIME
⏱️ 25 min
🏷️ python, web, web features
Prerequisites:
Intermediate Python, Fetch API, Pyodide
# Web Python Static File Serving Examples
# Serving static files with various strategies
import js
from typing import Dict, Any, Optional, List, Set
import hashlib
# 1. MIME Type Detector
class MIMETypeDetector:
    """Guess a MIME type from a file name or from leading magic bytes."""

    # Extension (lower-case, with dot) -> MIME type.
    MIME_TYPES = {
        '.html': 'text/html',
        '.htm': 'text/html',
        '.css': 'text/css',
        '.js': 'text/javascript',
        '.json': 'application/json',
        '.xml': 'application/xml',
        '.txt': 'text/plain',
        '.pdf': 'application/pdf',
        '.zip': 'application/zip',
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.svg': 'image/svg+xml',
        '.ico': 'image/x-icon',
        '.woff': 'font/woff',
        '.woff2': 'font/woff2',
        '.ttf': 'font/ttf',
        '.eot': 'application/vnd.ms-fontobject',
        '.mp3': 'audio/mpeg',
        '.mp4': 'video/mp4',
        '.webm': 'video/webm',
        '.ogg': 'video/ogg'
    }

    # Leading-byte signatures checked by get_by_signature, in order.
    SIGNATURES = (
        (b'\x89PNG', 'image/png'),    # 89 50 4E 47
        (b'\xff\xd8\xff', 'image/jpeg'),  # FF D8 FF
        (b'GIF8', 'image/gif'),       # 47 49 46 38
    )

    @classmethod
    def get_by_extension(cls, filename: str) -> str:
        """
        Get MIME type by extension
        Args:
            filename: File name, path or URL (a query string or fragment is ignored)
        Returns:
            MIME type string; 'application/octet-stream' when unknown
        """
        # Drop "?query" / "#fragment", then look only at the last path
        # segment so a dot in a directory name is not taken for an extension.
        name = filename.split('?', 1)[0].split('#', 1)[0]
        name = name.rsplit('/', 1)[-1]
        dot = name.rfind('.')
        if dot == -1:
            return 'application/octet-stream'
        return cls.MIME_TYPES.get(name[dot:].lower(), 'application/octet-stream')

    @classmethod
    async def get_by_signature(cls, file: Any) -> str:
        """
        Get MIME type by file signature (magic bytes)
        Args:
            file: File content; see _read_file_header for what can be read
        Returns:
            MIME type string; 'application/octet-stream' when the header
            cannot be read or matches no known signature
        """
        try:
            header = bytes(await cls._read_file_header(file, 8))
        except Exception:
            # Best effort: an unreadable header means "unknown type". Only
            # Exception is caught so task cancellation still propagates.
            return 'application/octet-stream'
        for magic, mime in cls.SIGNATURES:
            if header.startswith(magic):
                return mime
        return 'application/octet-stream'

    @staticmethod
    async def _read_file_header(file: Any, bytes_count: int = 8) -> List[int]:
        """Return up to ``bytes_count`` leading bytes of ``file`` as ints.

        Bytes-like input is read directly. Any other object yields
        ``bytes_count`` zeros (simplified - in real Pyodide you'd read from
        the Blob here).
        """
        if isinstance(file, (bytes, bytearray, memoryview)):
            return list(bytes(file[:bytes_count]))
        return [0] * bytes_count
# 2. Static File Server
class StaticFileServer:
    """Fetch files below a base URL and keep them in a short-lived cache."""

    def __init__(self, base_url: str = '/static'):
        self.base_url = base_url
        # file_path -> {'data', 'mime', 'timestamp'}
        self.cache = {}
        self.cache_timeout = 60000  # 1 minute, in milliseconds

    def _build_url(self, file_path: str) -> str:
        """Join ``base_url`` and ``file_path`` with exactly one slash between them."""
        if not self.base_url:
            return file_path
        return f"{self.base_url.rstrip('/')}/{file_path.lstrip('/')}"

    async def serve(self, file_path: str) -> Dict[str, Any]:
        """
        Serve file
        Args:
            file_path: Path to file, relative to the base URL
        Returns:
            dict with data, mime, and status; a failed fetch additionally
            carries the exception text under 'error'
        """
        # Serve from cache while the entry is younger than cache_timeout.
        cached = self.cache.get(file_path)
        if cached is not None and js.Date.now() - cached['timestamp'] < self.cache_timeout:
            return {'data': cached['data'], 'mime': cached['mime'], 'status': 200}

        url = self._build_url(file_path)
        try:
            response = await js.fetch(url)
            if not response.ok:
                return {'data': None, 'mime': 'text/plain', 'status': response.status}
            # Prefer the server's header; fall back to the file extension.
            mime = response.headers.get('Content-Type') or MIMETypeDetector.get_by_extension(file_path)
            # Ignore parameters such as "; charset=utf-8" and letter case
            # when choosing how to read the body.
            content_type = mime.split(';', 1)[0].strip().lower()
            if 'application/json' in content_type:
                data = await response.json()
            elif 'text/' in content_type:
                data = await response.text()
            else:
                data = await response.blob()
            self.cache[file_path] = {
                'data': data,
                'mime': mime,
                'timestamp': js.Date.now()
            }
            return {'data': data, 'mime': mime, 'status': 200}
        except Exception as error:
            # Report the failure as a 500 result instead of raising, but keep
            # the reason so callers can log it.
            return {
                'data': None,
                'mime': 'text/plain',
                'status': 500,
                'error': str(error)
            }

    async def serve_with_fallback(self, file_path: str, fallback_path: str) -> Dict[str, Any]:
        """Serve ``file_path``; on a 404 serve ``fallback_path`` instead."""
        result = await self.serve(file_path)
        if result['status'] == 404:
            return await self.serve(fallback_path)
        return result

    def clear_cache(self):
        """Clear cache"""
        self.cache.clear()

    def clear_cache_entry(self, file_path: str):
        """Clear specific cache entry (a missing entry is ignored)"""
        self.cache.pop(file_path, None)

    def get_cache_size(self) -> int:
        """Return the number of cached entries"""
        return len(self.cache)
# 3. File Cache Manager
class FileCacheManager:
    """In-memory store of file contents with ETag / Last-Modified metadata."""

    def __init__(self):
        # file_path -> {'content', 'mime_type', 'etag', 'last_modified'}
        self.cache = {}

    def set(self, file_path: str, content: Any, mime_type: str, etag: str = None, last_modified: str = None):
        """Add or overwrite a cache entry.

        Args:
            file_path: Cache key
            content: File content
            mime_type: MIME type stored with the entry
            etag: ETag to store; generated from the content when omitted
            last_modified: HTTP date to store; the current UTC time when omitted
        """
        if not last_modified:
            # Standard-library HTTP date ("Tue, 02 Jan 2024 03:04:05 GMT"),
            # so caching needs no browser object.
            from email.utils import formatdate
            last_modified = formatdate(usegmt=True)
        self.cache[file_path] = {
            'content': content,
            'mime_type': mime_type,
            'etag': etag or self._generate_etag(content),
            'last_modified': last_modified
        }

    def get(self, file_path: str) -> Optional[Dict[str, Any]]:
        """Return the entry for ``file_path``, or None when it is not cached"""
        return self.cache.get(file_path)

    def has(self, file_path: str) -> bool:
        """Check if file is cached"""
        return file_path in self.cache

    def remove(self, file_path: str):
        """Remove from cache (a missing entry is ignored)"""
        self.cache.pop(file_path, None)

    def clear(self):
        """Clear all cache"""
        self.cache.clear()

    def get_stats(self) -> Dict[str, Any]:
        """Return the entry count (as both 'size' and 'entries') and the cached keys"""
        return {
            'size': len(self.cache),
            'entries': len(self.cache),
            'keys': list(self.cache.keys())
        }

    def _generate_etag(self, content: Any) -> str:
        """Return a quoted MD5 hex digest of ``content`` for use as an ETag.

        Bytes are hashed as-is and text is hashed as UTF-8, so the same
        payload yields the same tag in either form. Any other object is
        hashed through its ``str()`` form.
        """
        if isinstance(content, (bytes, bytearray)):
            raw = bytes(content)
        elif isinstance(content, str):
            raw = content.encode()
        else:
            raw = str(content).encode()
        # MD5 serves as a content fingerprint here, not as a security measure.
        return f'"{hashlib.md5(raw).hexdigest()}"'
# 4. Asset Preloader
class AssetPreloader:
    """Fetches assets ahead of time and remembers which URLs worked."""

    def __init__(self):
        # URLs are remembered for the lifetime of the instance (or until
        # clear()); a URL in either set is not fetched again.
        self.loaded_assets: Set[str] = set()
        self.failed_assets: Set[str] = set()

    async def preload(self, url: str) -> Dict[str, Any]:
        """
        Preload single asset
        Args:
            url: Asset URL
        Returns:
            {'success': True}, or {'success': False, 'error': <exception>}
        """
        already_loaded = url in self.loaded_assets
        if already_loaded:
            return {'success': True}
        already_failed = url in self.failed_assets
        if already_failed:
            return {'success': False, 'error': Exception('Previously failed to load')}
        try:
            reply = await js.fetch(url)
            if not reply.ok:
                raise Exception(f'HTTP {reply.status}')
            self.loaded_assets.add(url)
            outcome = {'success': True}
            return outcome
        except Exception as failure:
            # Any failure (network error or non-OK status) marks the URL as failed.
            self.failed_assets.add(url)
            outcome = {'success': False, 'error': failure}
            return outcome

    async def preload_multiple(self, urls: List[str]) -> Dict[str, Any]:
        """
        Preload multiple assets, one after another
        Args:
            urls: List of URLs
        Returns:
            dict with 'successful' (URLs) and 'failed' ({'url', 'error'} dicts)
        """
        report = {'successful': [], 'failed': []}
        for address in urls:
            outcome = await self.preload(address)
            if not outcome['success']:
                report['failed'].append({'url': address, 'error': outcome.get('error')})
                continue
            report['successful'].append(address)
        return report

    def get_loaded_assets(self) -> List[str]:
        """Return the successfully loaded URLs as a list"""
        return [*self.loaded_assets]

    def get_failed_assets(self) -> List[str]:
        """Return the failed URLs as a list"""
        return [*self.failed_assets]

    def clear(self):
        """Forget every loaded and failed URL"""
        for tracked in (self.loaded_assets, self.failed_assets):
            tracked.clear()
# 5. CDN Manager
class CDNManager:
    """Builds asset URLs against a list of CDN base URLs."""

    def __init__(self, cdn_urls: List[str]):
        self.cdn_urls = cdn_urls
        # Index into cdn_urls of the CDN currently in use.
        self.current_cdn_index = 0

    def get_cdn_url(self, asset_path: str) -> str:
        """Return ``asset_path`` appended to the currently selected CDN base"""
        active = self.cdn_urls[self.current_cdn_index]
        return active + asset_path

    def rotate_cdn(self):
        """Select the next CDN, wrapping around after the last one"""
        following = self.current_cdn_index + 1
        self.current_cdn_index = following % len(self.cdn_urls)

    def get_all_cdn_urls(self, asset_path: str) -> List[str]:
        """Return ``asset_path`` appended to every CDN base, in list order"""
        candidates = []
        for base in self.cdn_urls:
            candidates.append(f"{base}{asset_path}")
        return candidates

    async def try_all_cdns(self, asset_path: str) -> Dict[str, Any]:
        """Fetch the asset from each CDN in order until one responds OK.

        Returns:
            {'success': True, 'cdn_url', 'data'} for the first OK response,
            otherwise {'success': False}
        """
        for candidate in self.get_all_cdn_urls(asset_path):
            try:
                reply = await js.fetch(candidate)
                if not reply.ok:
                    # A non-OK status just moves on to the next CDN.
                    continue
                payload = await reply.blob()
                return {'success': True, 'cdn_url': candidate, 'data': payload}
            except Exception as error:
                print(f"CDN failed: {candidate}, {error}")
        return {'success': False}
# 6. Asset Bundle Manager
class AssetBundleManager:
    """Named groups of asset URLs that can be preloaded together."""

    def __init__(self):
        # bundle name -> list of asset URLs
        self.bundles = {}

    def register_bundle(self, name: str, assets: List[str]):
        """Store ``assets`` under ``name``, replacing any earlier bundle"""
        self.bundles.update({name: assets})

    def get_bundle(self, name: str) -> Optional[List[str]]:
        """Return the bundle's assets, or None when the name is unknown"""
        return self.bundles.get(name)

    async def preload_bundle(self, name: str, preloader: AssetPreloader) -> Dict[str, Any]:
        """Preload every asset of a bundle through ``preloader``.

        An unknown or empty bundle yields empty 'successful'/'failed' lists
        without touching the preloader.
        """
        wanted = self.get_bundle(name)
        if wanted:
            return await preloader.preload_multiple(wanted)
        return {'successful': [], 'failed': []}
# Usage Examples
async def demonstrate_static_files():
    """Run each static-file example in sequence, printing the results."""
    print("=== Web Python Static File Serving Examples ===\n")

    # 1. MIME type detection
    print("--- 1. MIME Type Detection ---")
    print(f"HTML: {MIMETypeDetector.get_by_extension('index.html')}")
    print(f"JavaScript: {MIMETypeDetector.get_by_extension('app.js')}")
    print(f"PNG: {MIMETypeDetector.get_by_extension('image.png')}")

    # 2. Static file server (URLs are only printed, nothing is fetched)
    print("\n--- 2. Static File Server ---")
    server = StaticFileServer('/assets')
    print(f"Would serve: {server.base_url}/styles/main.css")
    print(f"Would serve: {server.base_url}/js/app.js")

    # 3. File cache manager
    print("\n--- 3. File Cache Manager ---")
    cache_manager = FileCacheManager()
    cache_manager.set('/index.html', '<html>...</html>', 'text/html', '"abc123"')
    cache_manager.set('/app.js', 'console.log("Hello");', 'text/javascript')
    print(f"Cache stats: {cache_manager.get_stats()}")
    cached = cache_manager.get('/index.html')
    print(f"Cached file: {cached['mime_type'] if cached else 'N/A'}")

    # 4. Asset preloader
    print("\n--- 4. Asset Preloader ---")
    # Note: These would be real URLs in production
    image_urls = [
        '/assets/image1.png',
        '/assets/image2.png',
        '/assets/image3.png'
    ]
    preload_results = await AssetPreloader().preload_multiple(image_urls)
    print(f"Preloaded: {preload_results['successful']}")
    print(f"Failed: {preload_results['failed']}")

    # 5. CDN manager
    print("\n--- 5. CDN Manager ---")
    cdn_hosts = [
        'https://cdn1.example.com',
        'https://cdn2.example.com',
        'https://cdn3.example.com'
    ]
    cdn_manager = CDNManager(cdn_hosts)
    print(f"CDN URL: {cdn_manager.get_cdn_url('/assets/app.js')}")
    cdn_manager.rotate_cdn()
    print(f"Rotated CDN URL: {cdn_manager.get_cdn_url('/assets/app.js')}")

    # 6. Asset bundle manager
    print("\n--- 6. Asset Bundle Manager ---")
    bundle_manager = AssetBundleManager()
    bundle_definitions = (
        ('vendor', [
            '/assets/js/vendor/react.js',
            '/assets/js/vendor/lodash.js'
        ]),
        ('app', [
            '/assets/js/app.js',
            '/assets/js/utils.js'
        ]),
    )
    for bundle_name, bundle_assets in bundle_definitions:
        bundle_manager.register_bundle(bundle_name, bundle_assets)
    bundle = bundle_manager.get_bundle('vendor')
    print(f"Vendor bundle: {bundle}")

    print("\n=== All Static File Serving Examples Completed ===")
# Export functions
# Name -> object table of everything the static-file example exposes.
export = {}
export['MIMETypeDetector'] = MIMETypeDetector
export['StaticFileServer'] = StaticFileServer
export['FileCacheManager'] = FileCacheManager
export['AssetPreloader'] = AssetPreloader
export['CDNManager'] = CDNManager
export['AssetBundleManager'] = AssetBundleManager
export['demonstrate_static_files'] = demonstrate_static_files