🎯 Рекомендуемые коллекции
Сбалансированные подборки примеров кода из различных категорий для изучения
Примеры Web Функций Web Python
Примеры веб-функций Web Python, включая маршрутизацию, middleware и обслуживание статических файлов
💻 Маршрутизация python
🟡 intermediate
⭐⭐⭐
Разбор URL маршрутов, обработка hash-маршрутизации и управление историей браузера
⏱️ 25 min
🏷️ python, web, web features
Prerequisites:
Intermediate Python, History API, URL API, Pyodide
# Web Python Routing Examples
# Client-side routing with URL parsing, hash routing, and history management
import js
import re
from typing import Dict, Callable, Any, Optional, Tuple
from urllib.parse import parse_qs, urlparse
# 1. Route Handler
class Route:
    """Associate a route path pattern with the callable that handles it."""

    def __init__(self, path: str, handler: Callable):
        # Both values are stored untouched; no validation happens here.
        self.path, self.handler = path, handler
# 2. Router
class Router:
    """Client-side router backed by the browser History API.

    Route patterns may contain ``:name`` placeholders, each matching exactly
    one path segment, and ``*``, which matches any run of characters.
    Routes are tried in registration order and the first match wins.
    """

    # Pattern tokens that are not literal text: ``:name`` and ``*``.
    _TOKEN_RE = re.compile(r':\w+|\*')
    # ``:name`` placeholders, capturing the name for substitution.
    _PLACEHOLDER_RE = re.compile(r':(\w+)')

    def __init__(self):
        self.routes = []
        self.not_found_handler = None
        # Params / query of the most recently matched route.
        self.current_params = {}
        self.current_query = {}

    def add_route(self, path: str, handler: Callable):
        """
        Add route
        Args:
            path: Route path (can include :params)
            handler: Route handler function, called as handler(params, query)
        """
        self.routes.append(Route(path, handler))

    def set_not_found(self, handler: Callable):
        """
        Set 404 handler
        Args:
            handler: Zero-argument handler for not found routes
        """
        self.not_found_handler = handler

    def navigate(self, path: str, params: Optional[Dict[str, str]] = None):
        """
        Navigate to path, pushing a new history entry
        Args:
            path: Path to navigate to
            params: Route parameters to substitute for :name placeholders
        """
        url = self._replace_params(path, params) if params else path
        # Update browser URL. ``js.Object.new()`` builds an empty JS object
        # for the history state (``js.object`` is not a browser global).
        js.window.history.pushState(js.Object.new(), '', url)
        self._handle_route(url)

    def replace(self, path: str, params: Optional[Dict[str, str]] = None):
        """
        Replace current route without adding a history entry
        Args:
            path: Path to replace with
            params: Route parameters to substitute for :name placeholders
        """
        url = self._replace_params(path, params) if params else path
        js.window.history.replaceState(js.Object.new(), '', url)
        self._handle_route(url)

    def back(self):
        """Go back in history"""
        js.window.history.back()

    def forward(self):
        """Go forward in history"""
        js.window.history.forward()

    def go(self, delta: int):
        """
        Go to specific history entry
        Args:
            delta: Number of steps to go
        """
        js.window.history.go(delta)

    def _handle_route(self, url: str):
        """Dispatch ``url`` to the first matching route, or the 404 handler."""
        path, query = self._parse_url(url)
        route = self._find_route(path)
        if route is not None:
            params = self._extract_params(path, route.path)
            self.current_params = params
            self.current_query = query
            route.handler(params, query)
        elif self.not_found_handler:
            self.not_found_handler()

    def _find_route(self, path: str) -> Optional["Route"]:
        """Return the first registered route matching ``path``, else None."""
        for route in self.routes:
            if self._match_route(path, route.path):
                return route
        return None

    def _match_route(self, path: str, pattern: str) -> bool:
        """Check if path matches route pattern"""
        # Build the regex token by token so that the whole ``:name``
        # placeholder (not just its colon) becomes a one-segment wildcard,
        # and literal text is escaped rather than interpreted as regex.
        pieces = []
        cursor = 0
        for token in self._TOKEN_RE.finditer(pattern):
            pieces.append(re.escape(pattern[cursor:token.start()]))
            pieces.append('.*' if token.group() == '*' else '[^/]+')
            cursor = token.end()
        pieces.append(re.escape(pattern[cursor:]))
        return re.fullmatch(''.join(pieces), path) is not None

    def _extract_params(self, path: str, pattern: str) -> Dict[str, str]:
        """Extract ``:name`` values from ``path`` by segment position."""
        path_parts = path.split('/')
        params = {}
        for index, segment in enumerate(pattern.split('/')):
            if segment.startswith(':'):
                # A missing path segment yields an empty string.
                params[segment[1:]] = path_parts[index] if index < len(path_parts) else ''
        return params

    def _replace_params(self, path: str, params: Dict[str, str]) -> str:
        """Substitute ``:name`` placeholders in ``path`` from ``params``.

        Whole placeholder names are matched, so ``:id`` is never replaced
        inside ``:idx``. Placeholders without a value are left untouched and
        values are converted with ``str``.
        """
        def substitute(match):
            name = match.group(1)
            return str(params[name]) if name in params else match.group(0)

        return self._PLACEHOLDER_RE.sub(substitute, path)

    def _parse_url(self, url: str) -> Tuple[str, Dict[str, str]]:
        """Split ``url`` into its path and a flat query dict.

        Repeated query keys keep only their first value.
        """
        path, _, query_string = url.partition('?')
        query = {key: values[0] for key, values in parse_qs(query_string).items()}
        return path, query

    def initialize(self):
        """Initialize router (set up event listeners)"""
        # Note: In Pyodide, you'd need to set up JavaScript event handlers
        pass
# 3. Hash Router
class HashRouter:
    """Router driven by the URL fragment (``location.hash``).

    Patterns and hash paths are both compared without a leading slash, so
    ``#/profile/1`` and ``#profile/1`` reach the same route and an empty
    hash reaches a route registered as ``/``.
    """

    # Pattern tokens that are not literal text: ``:name`` and ``*``.
    _TOKEN_RE = re.compile(r':\w+|\*')
    # ``:name`` placeholders, capturing the name for substitution.
    _PLACEHOLDER_RE = re.compile(r':(\w+)')

    def __init__(self):
        self.routes = []
        self.not_found_handler = None

    @staticmethod
    def _normalize(path: str) -> str:
        """Drop a single leading slash so patterns and hashes compare equal."""
        return path[1:] if path.startswith('/') else path

    def add_route(self, path: str, handler: Callable):
        """Add route; handler is called as handler(params, query)."""
        self.routes.append(Route(self._normalize(path), handler))

    def set_not_found(self, handler: Callable):
        """Set 404 handler (called with no arguments)."""
        self.not_found_handler = handler

    def navigate(self, hash_path: str, params: Optional[Dict[str, str]] = None):
        """Navigate by assigning ``location.hash``.

        Args:
            hash_path: Hash path, optionally containing :name placeholders
            params: Values to substitute for the placeholders
        """
        url = self._replace_params(hash_path, params) if params else hash_path
        js.window.location.hash = url

    def get_hash(self) -> str:
        """Get current hash without the leading ``#`` (``/`` when empty)."""
        hash_value = js.window.location.hash
        return hash_value[1:] if hash_value else '/'

    def _replace_params(self, path: str, params: Dict[str, str]) -> str:
        """Substitute whole ``:name`` placeholders; unknown ones are kept."""
        def substitute(match):
            name = match.group(1)
            return str(params[name]) if name in params else match.group(0)

        return self._PLACEHOLDER_RE.sub(substitute, path)

    def _handle_hash_change(self):
        """Dispatch the current hash to its route, or to the 404 handler."""
        raw_path, _, query_string = self.get_hash().partition('?')
        # Repeated query keys keep only their first value.
        query = {key: values[0] for key, values in parse_qs(query_string).items()}
        # Registered patterns had their leading slash removed, so the live
        # hash must be normalised the same way before matching.
        path = self._normalize(raw_path)
        route = self._find_route(path)
        if route is not None:
            params = self._extract_params(path, route.path)
            route.handler(params, query)
        elif self.not_found_handler:
            self.not_found_handler()

    def _find_route(self, path: str) -> Optional["Route"]:
        """Return the first registered route matching ``path``, else None."""
        for route in self.routes:
            if self._match_route(path, route.path):
                return route
        return None

    def _match_route(self, path: str, pattern: str) -> bool:
        """Check if path matches route pattern"""
        # Replace each complete ``:name`` token (not just the colon) with a
        # one-segment wildcard and escape all literal text.
        pieces = []
        cursor = 0
        for token in self._TOKEN_RE.finditer(pattern):
            pieces.append(re.escape(pattern[cursor:token.start()]))
            pieces.append('.*' if token.group() == '*' else '[^/]+')
            cursor = token.end()
        pieces.append(re.escape(pattern[cursor:]))
        return re.fullmatch(''.join(pieces), path) is not None

    def _extract_params(self, path: str, pattern: str) -> Dict[str, str]:
        """Extract ``:name`` values from ``path`` by segment position."""
        path_parts = path.split('/')
        params = {}
        for index, segment in enumerate(pattern.split('/')):
            if segment.startswith(':'):
                params[segment[1:]] = path_parts[index] if index < len(path_parts) else ''
        return params
# 4. Query Params Manager
class QueryParamsManager:
    """Read and edit the query string of the current page URL.

    Every method re-reads ``window.location.search``; writes go through
    ``history.replaceState`` so no new history entry is created.
    """

    @staticmethod
    def _current_params():
        """Return a fresh URLSearchParams for the current location."""
        return js.URLSearchParams.new(js.window.location.search)

    def get(self, param: str) -> Optional[str]:
        """
        Get query param
        Args:
            param: Parameter name
        Returns:
            Parameter value or None
        """
        return self._current_params().get(param)

    def get_all(self) -> Dict[str, str]:
        """
        Get all params
        Returns:
            Dictionary of all parameters (first value for repeated keys)
        """
        params = self._current_params()
        return {key: params.get(key) for key in list(params.keys())}

    def set(self, param: str, value: str):
        """
        Set query param
        Args:
            param: Parameter name
            value: Parameter value
        """
        params = self._current_params()
        params.set(param, value)
        self._update(str(params))

    def delete(self, param: str):
        """
        Delete query param
        Args:
            param: Parameter name
        """
        params = self._current_params()
        params.delete(param)
        self._update(str(params))

    def clear(self):
        """Clear all params"""
        self._update('')

    def _update(self, query_string: str):
        """Rewrite the current URL's query string in place."""
        url = js.URL.new(js.window.location.href)
        url.search = query_string
        # ``js.Object.new()`` creates the empty JS state object; there is no
        # ``object`` global for ``js.object()`` to resolve to.
        js.window.history.replaceState(js.Object.new(), '', str(url))
# 5. Route Guard
class RouteGuard:
    """Registry of per-route guard callables that allow or veto navigation."""

    def __init__(self):
        # Maps route path -> guard callable.
        self.guards = {}

    def add_guard(self, route: str, guard: Callable[[], bool]):
        """
        Add guard for route (replaces any guard already set for it)
        Args:
            route: Route path
            guard: Zero-argument guard; may be a plain or an async function
        """
        self.guards[route] = guard

    async def can_navigate(self, route: str) -> bool:
        """
        Check if route is allowed
        Args:
            route: Route path
        Returns:
            True if navigation is allowed (always True for unguarded routes)
        """
        import inspect

        guard = self.guards.get(route)
        if not guard:
            return True
        verdict = guard()
        # An async guard returns an awaitable, which is always truthy;
        # resolve it so the real decision is used.
        if inspect.isawaitable(verdict):
            verdict = await verdict
        return bool(verdict)

    def add_auth_guard(self, route: str, is_authenticated: Callable[[], bool]):
        """Add authentication guard"""
        self.add_guard(route, is_authenticated)

    def add_permission_guard(self, route: str, has_permission: Callable[[], bool]):
        """Add permission guard"""
        self.add_guard(route, has_permission)
# 6. Navigation Manager
class NavigationManager:
    """Track visited paths alongside a Router, like a browser history stack."""

    def __init__(self, router: Router):
        self.router = router
        self.history = []
        # Index of the current entry in ``history``; -1 means nothing yet.
        self.current_index = -1

    def navigate(self, path: str, params: Dict[str, str] = None):
        """Navigate with history tracking"""
        # Visiting a new path from the middle of the stack discards
        # everything that was ahead of the current position.
        if self.can_go_forward():
            self.history = self.history[:self.current_index + 1]
        self.history.append(path)
        self.current_index += 1
        self.router.navigate(path, params)

    def back(self):
        """Go back in custom history"""
        if not self.can_go_back():
            return
        self.current_index -= 1
        js.window.history.back()

    def forward(self):
        """Go forward in custom history"""
        if not self.can_go_forward():
            return
        self.current_index += 1
        js.window.history.forward()

    def can_go_back(self) -> bool:
        """Check if can go back"""
        return self.current_index > 0

    def can_go_forward(self) -> bool:
        """Check if can go forward"""
        last_index = len(self.history) - 1
        return self.current_index < last_index

    def get_current_position(self) -> int:
        """Get current position in history"""
        return self.current_index

    def get_history_length(self) -> int:
        """Get history length"""
        return len(self.history)
# Usage Examples
async def demonstrate_routing():
    """Walk through each routing helper, printing what it does.

    Must run inside a browser/Pyodide environment: the routers and the
    query manager call into the ``js`` module.
    """
    print("=== Web Python Routing Examples ===\n")

    # 1. Basic router
    print("--- 1. Basic Router ---")
    router = Router()

    def home_handler(params, query):
        print("Home page")

    def about_handler(params, query):
        print("About page")

    def user_handler(params, query):
        print(f"User page: {params.get('id', 'N/A')}")

    def comment_handler(params, query):
        print(f"Comment: {params.get('commentId', 'N/A')} on post {params.get('postId', 'N/A')}")

    router.add_route('/', home_handler)
    router.add_route('/about', about_handler)
    router.add_route('/users/:id', user_handler)
    router.add_route('/posts/:postId/comments/:commentId', comment_handler)

    # 2. Navigation
    print("\n--- 2. Navigation ---")
    router.navigate('/')
    router.navigate('/about')
    router.navigate('/users/123')

    # 3. Query params
    print("\n--- 3. Query Params ---")
    router.navigate('/search?query=python&page=1')

    # 4. Hash router
    print("\n--- 4. Hash Router ---")
    hash_router = HashRouter()

    def hash_home_handler(params, query):
        print("Hash home")

    def hash_profile_handler(params, query):
        print(f"Hash profile: {params.get('userId', 'N/A')}")

    hash_router.add_route('/', hash_home_handler)
    hash_router.add_route('profile/:userId', hash_profile_handler)
    hash_router.navigate('profile/456')

    # 5. Query params manager
    print("\n--- 5. Query Params Manager ---")
    query_manager = QueryParamsManager()
    query_manager.set('tab', 'profile')
    query_manager.set('section', 'details')
    all_params = query_manager.get_all()
    print(f"All params: {all_params}")

    # 6. Route guard
    print("\n--- 6. Route Guard ---")
    guard = RouteGuard()

    def is_authenticated():
        authenticated = False  # Simulate
        print(f"Auth check: {authenticated}")
        return authenticated

    guard.add_auth_guard('/admin', is_authenticated)
    # can_navigate is a coroutine function: without ``await`` the guard is
    # never run and a coroutine object is printed instead of the result.
    can_access = await guard.can_navigate('/admin')
    print(f"Can access /admin: {can_access}")

    # 7. Navigation manager
    print("\n--- 7. Navigation Manager ---")
    nav_manager = NavigationManager(router)
    nav_manager.navigate('/page1')
    nav_manager.navigate('/page2')
    print(f"Can go back: {nav_manager.can_go_back()}")
    print(f"History length: {nav_manager.get_history_length()}")

    print("\n=== All Routing Examples Completed ===")
# Export functions
# Public names of the routing example, keyed by their own ``__name__``.
export = {
    exported.__name__: exported
    for exported in (
        Router,
        HashRouter,
        QueryParamsManager,
        RouteGuard,
        NavigationManager,
        demonstrate_routing,
    )
}
💻 Middleware python
🟡 intermediate
⭐⭐⭐⭐
Реализация цепочки middleware запроса/ответа для обработки и изменения данных
⏱️ 30 min
🏷️ python, web, web features
Prerequisites:
Intermediate Python, Async/await, Pyodide
# Web Python Middleware Examples
# Request/response processing middleware chain
import asyncio
from typing import Dict, Any, Callable, Optional, List
# 1. Context
class Context:
    """Mutable per-request container handed to every middleware."""

    def __init__(self):
        # Request and response start unset; callers assign them.
        self.request = self.response = None
        # Three independent dicts for middleware-owned data.
        self.state, self.headers, self.metadata = {}, {}, {}
# 2. Middleware Function Type
# Shape of a middleware: called as ``middleware(context, next)``.
# NOTE(review): the middlewares in this file are ``async def`` functions that
# the pipeline awaits, so ``None`` here describes the awaited result rather
# than the coroutine object the call itself returns.
Middleware = Callable[[Context, Callable], None]
# 3. Middleware Pipeline
class MiddlewarePipeline:
    """Ordered chain of async middlewares run against a shared context."""

    def __init__(self):
        self.middlewares = []

    def use(self, middleware: 'Middleware') -> 'MiddlewarePipeline':
        """
        Add middleware
        Args:
            middleware: Async callable invoked as middleware(context, next)
        Returns:
            Self for chaining
        """
        self.middlewares.append(middleware)
        return self

    async def execute(self, context: 'Context'):
        """
        Execute pipeline
        Args:
            context: Request context passed to every middleware
        """
        async def dispatch(position: int):
            # Running past the last middleware simply ends the chain.
            if position >= len(self.middlewares):
                return

            async def call_next():
                # Bound to this middleware's own position: calling it again
                # re-enters the same downstream middleware instead of
                # skipping ahead past one that chose to short-circuit.
                await dispatch(position + 1)

            await self.middlewares[position](context, call_next)

        await dispatch(0)

    def clear(self):
        """Clear all middlewares"""
        self.middlewares = []

    def count(self) -> int:
        """Get middleware count"""
        return len(self.middlewares)
# 4. Common Middlewares
class CommonMiddlewares:
    """Factories for commonly needed middlewares.

    Each factory returns an async ``middleware(context, call_next)``
    callable. Timestamps come from the browser (``js``) when it is
    available and from the Python standard library otherwise.
    """

    @staticmethod
    def _perf_now() -> float:
        """Return a high-resolution timestamp in milliseconds."""
        try:
            import js
            return js.performance.now()
        except (ImportError, AttributeError):
            # Not running under Pyodide, or no Performance API present.
            import time
            return time.perf_counter() * 1000.0

    @staticmethod
    def _epoch_ms() -> float:
        """Return the current wall-clock time in milliseconds since epoch."""
        try:
            import js
            return js.Date.now()
        except (ImportError, AttributeError):
            import time
            return time.time() * 1000.0

    @staticmethod
    def logging() -> 'Middleware':
        """Logging middleware: prints the request, then status and duration."""
        async def logging_middleware(context: 'Context', call_next: Callable):
            request = context.request
            start = CommonMiddlewares._perf_now()
            print('[Request]', {
                'url': getattr(request, 'url', 'N/A') if request else 'N/A',
                'method': getattr(request, 'method', 'N/A') if request else 'N/A',
                'headers': context.headers
            })
            await call_next()
            duration = CommonMiddlewares._perf_now() - start
            # Read the response only now: downstream code may replace it.
            response = context.response
            print('[Response]', {
                'status': getattr(response, 'status', 'N/A') if response else 'N/A',
                'duration': f'{duration:.2f}ms'
            })
        return logging_middleware

    @staticmethod
    def timing() -> 'Middleware':
        """Timing middleware: stores elapsed ms in metadata['duration']."""
        async def timing_middleware(context: 'Context', call_next: Callable):
            start = CommonMiddlewares._perf_now()
            await call_next()
            context.metadata['duration'] = CommonMiddlewares._perf_now() - start
        return timing_middleware

    @staticmethod
    def error_handler(error_handler_func: Callable) -> 'Middleware':
        """Error handling middleware.

        Args:
            error_handler_func: Called with any Exception raised downstream;
                the exception is then considered handled.
        """
        async def error_middleware(context: 'Context', call_next: Callable):
            try:
                await call_next()
            except Exception as error:
                error_handler_func(error)
        return error_middleware

    @staticmethod
    def headers(add_headers: Dict[str, str]) -> 'Middleware':
        """Header manipulation middleware: copies add_headers into context."""
        async def headers_middleware(context: 'Context', call_next: Callable):
            context.headers.update(add_headers)
            await call_next()
        return headers_middleware

    @staticmethod
    def authentication(auth_check: Callable) -> 'Middleware':
        """Authentication middleware.

        Args:
            auth_check: Called with the context; falsy means unauthorized.
        Raises (from the middleware):
            Exception: 'Unauthorized' when auth_check fails.
        """
        async def auth_middleware(context: 'Context', call_next: Callable):
            if not auth_check(context):
                raise Exception('Unauthorized')
            context.metadata['authenticated'] = True
            await call_next()
        return auth_middleware

    @staticmethod
    def rate_limit(max_requests: int, window_ms: int) -> 'Middleware':
        """Rate limiting middleware (sliding window per request ``ip``).

        Args:
            max_requests: Requests allowed per window
            window_ms: Window length in milliseconds
        Raises (from the middleware):
            Exception: 'Rate limit exceeded' once the window is full.
        """
        requests = {}

        async def rate_limit_middleware(context: 'Context', call_next: Callable):
            key = getattr(context.request, 'ip', 'unknown') if context.request else 'unknown'
            now = CommonMiddlewares._epoch_ms()
            window_start = now - window_ms
            # Keep only timestamps still inside the window, and store the
            # pruned list even when rejecting so old entries do not pile up.
            valid_requests = [stamp for stamp in requests.get(key, []) if stamp > window_start]
            requests[key] = valid_requests
            if len(valid_requests) >= max_requests:
                raise Exception('Rate limit exceeded')
            valid_requests.append(now)
            context.metadata['rate_limit'] = {
                'remaining': max_requests - len(valid_requests),
                'reset': now + window_ms
            }
            await call_next()
        return rate_limit_middleware

    @staticmethod
    def cors(options: Optional[Dict[str, Any]] = None) -> 'Middleware':
        """CORS middleware.

        Args:
            options: Optional dict with keys 'origin' ('*', one origin
                string, or a collection of origins), 'methods', 'headers'
                (lists of strings) and 'credentials' (truthy to allow).
        """
        if options is None:
            options = {}

        async def cors_middleware(context: 'Context', call_next: Callable):
            request_headers = getattr(context.request, 'headers', {}) if context.request else {}
            origin = request_headers.get('Origin', '*')
            allowed_origins = options.get('origin', '*')
            if allowed_origins == '*' or not allowed_origins:
                context.headers['Access-Control-Allow-Origin'] = '*'
            elif isinstance(allowed_origins, str):
                # A single configured origin is echoed only on exact match.
                if origin == allowed_origins:
                    context.headers['Access-Control-Allow-Origin'] = origin
            elif isinstance(allowed_origins, (list, tuple, set, frozenset)) and origin in allowed_origins:
                context.headers['Access-Control-Allow-Origin'] = origin
            if options.get('methods'):
                context.headers['Access-Control-Allow-Methods'] = ', '.join(options['methods'])
            if options.get('headers'):
                context.headers['Access-Control-Allow-Headers'] = ', '.join(options['headers'])
            if options.get('credentials'):
                context.headers['Access-Control-Allow-Credentials'] = 'true'
            await call_next()
        return cors_middleware

    @staticmethod
    def compression() -> 'Middleware':
        """Compression middleware (simulated: only sets headers/metadata)."""
        async def compression_middleware(context: 'Context', call_next: Callable):
            await call_next()
            request_headers = getattr(context.request, 'headers', {}) if context.request else {}
            accept_encoding = request_headers.get('Accept-Encoding', '')
            # Advertise an encoding the client actually accepts; gzip is
            # preferred when both are offered.
            if 'gzip' in accept_encoding:
                chosen = 'gzip'
            elif 'br' in accept_encoding:
                chosen = 'br'
            else:
                return
            context.headers['Content-Encoding'] = chosen
            context.metadata['compressed'] = True
        return compression_middleware

    @staticmethod
    def validation(validator: Callable) -> 'Middleware':
        """Validation middleware.

        Args:
            validator: Called with the context; falsy means invalid.
        Raises (from the middleware):
            Exception: 'Validation failed' when the validator rejects.
        """
        async def validation_middleware(context: 'Context', call_next: Callable):
            if not validator(context):
                raise Exception('Validation failed')
            await call_next()
        return validation_middleware

    @staticmethod
    def cache(cache_key_func: Callable, ttl: int = 60000) -> 'Middleware':
        """Cache middleware.

        Args:
            cache_key_func: Called with the context, returns the cache key
            ttl: Entry lifetime in milliseconds
        """
        cache = {}

        async def cache_middleware(context: 'Context', call_next: Callable):
            key = cache_key_func(context)
            now = CommonMiddlewares._epoch_ms()
            cached = cache.get(key)
            if cached is not None and cached['expiry'] > now:
                # Fresh hit: answer from cache and skip downstream work.
                context.response = cached['data']
                context.metadata['cached'] = True
                return
            await call_next()
            if context.response:
                cache[key] = {
                    'data': context.response,
                    'expiry': now + ttl
                }
        return cache_middleware
# 5. Middleware Composer
class MiddlewareComposer:
    """Fluent wrapper that owns a MiddlewarePipeline."""

    def __init__(self):
        self.pipeline = MiddlewarePipeline()

    def use(self, middleware: Middleware) -> 'MiddlewareComposer':
        """Register one middleware and return self for chaining."""
        self.pipeline.use(middleware)
        return self

    def use_all(self, middlewares: List[Middleware]) -> 'MiddlewareComposer':
        """Register several middlewares, in the order given."""
        register = self.pipeline.use
        for item in middlewares:
            register(item)
        return self

    async def execute(self, context: Context):
        """Run the owned pipeline against ``context``."""
        await self.pipeline.execute(context)

    @staticmethod
    def from_list(middlewares: List[Middleware]) -> 'MiddlewareComposer':
        """Build a composer pre-loaded with ``middlewares``."""
        return MiddlewareComposer().use_all(middlewares)
# 6. Mock Request/Response
class MockRequest:
    """Minimal stand-in for an HTTP request used by the examples."""

    def __init__(self, url: str, method: str = 'GET', headers: Dict = None, body: Any = None, ip: str = '127.0.0.1'):
        self.url, self.method = url, method
        # A missing or empty headers argument becomes a fresh dict.
        self.headers = headers if headers else {}
        self.body, self.ip = body, ip
class MockResponse:
    """Minimal stand-in for an HTTP response with chainable setters."""

    def __init__(self):
        self.status, self.headers, self.body = 200, {}, None

    def set_status(self, code: int) -> 'MockResponse':
        """Set the status code; returns self for chaining."""
        self.status = code
        return self

    def set_header(self, key: str, value: str) -> 'MockResponse':
        """Set one header; returns self for chaining."""
        self.headers.update({key: value})
        return self

    def send(self, data: Any) -> 'MockResponse':
        """Store ``data`` as the body; returns self for chaining."""
        self.body = data
        return self
# 7. Application Builder
class ApplicationBuilder:
    """Tiny application facade: middlewares in, response out."""

    def __init__(self):
        self.composer = MiddlewareComposer()

    def use(self, middleware: Middleware) -> 'ApplicationBuilder':
        """Register a middleware and return self for chaining."""
        self.composer.use(middleware)
        return self

    async def handle(self, request: MockRequest) -> MockResponse:
        """Run ``request`` through the chain and return the final response.

        The context starts with a fresh MockResponse; a middleware may
        replace ``context.response``, in which case that object is returned.
        """
        ctx = Context()
        ctx.request, ctx.response = request, MockResponse()
        await self.composer.execute(ctx)
        return ctx.response
# Usage Examples
async def demonstrate_middleware():
    """Run the five middleware demos in order, printing their output."""
    print("=== Web Python Middleware Examples ===\n")

    # 1. Basic middleware
    print("--- 1. Basic Middleware ---")

    async def middleware1(context, next):
        print("Middleware 1: Before")
        await next()
        print("Middleware 1: After")

    async def middleware2(context, next):
        print("Middleware 2: Before")
        await next()
        print("Middleware 2: After")

    pipeline = MiddlewarePipeline().use(middleware1).use(middleware2)
    context1 = Context()
    context1.request = MockRequest('/test')
    await pipeline.execute(context1)

    # 2. Common middlewares
    print("\n--- 2. Common Middlewares ---")
    extra_headers = {
        'X-Powered-By': 'Python-Middleware',
        'X-Response-Time': '0ms'
    }
    app = (
        ApplicationBuilder()
        .use(CommonMiddlewares.logging())
        .use(CommonMiddlewares.timing())
        .use(CommonMiddlewares.headers(extra_headers))
    )
    request = MockRequest('/api/users', 'GET', {'Accept': 'application/json'})
    response = await app.handle(request)
    print(f"Response status: {response.status}")

    # 3. Error handling
    print("\n--- 3. Error Handling ---")

    def error_handler(error):
        print(f"Error caught: {str(error)}")

    async def error_middleware(context, next):
        if context.request and context.request.url == '/error':
            raise Exception('Intentional error')
        await next()

    error_app = ApplicationBuilder()
    error_app.use(CommonMiddlewares.error_handler(error_handler)).use(error_middleware)
    await error_app.handle(MockRequest('/error'))

    # 4. Authentication
    print("\n--- 4. Authentication ---")

    def auth_check(context):
        if not context.request:
            return '' == 'Bearer valid-token'
        return context.request.headers.get('Authorization', '') == 'Bearer valid-token'

    async def auth_handler(context, next):
        print("Authenticated successfully")
        context.response = MockResponse().set_status(200).send({'message': 'Success'})
        await next()

    auth_app = ApplicationBuilder()
    auth_app.use(CommonMiddlewares.authentication(auth_check)).use(auth_handler)
    auth_request = MockRequest('/protected', 'GET', {'Authorization': 'Bearer valid-token'})
    auth_response = await auth_app.handle(auth_request)
    print(f"Auth response status: {auth_response.status}")

    # 5. Rate limiting
    print("\n--- 5. Rate Limiting ---")

    async def response_handler(context, next):
        context.response = MockResponse().set_status(200).send({'message': 'OK'})
        await next()

    rate_limit_app = ApplicationBuilder()
    rate_limit_app.use(CommonMiddlewares.rate_limit(max_requests=3, window_ms=10000))
    rate_limit_app.use(response_handler)

    # Send one request more than the limit allows.
    rl_request = MockRequest('/api/test')
    for i in range(4):
        try:
            rl_response = await rate_limit_app.handle(rl_request)
        except Exception:
            print(f"Request {i + 1}: Rate limited")
        else:
            print(f"Request {i + 1}: {rl_response.status}")

    print("\n=== All Middleware Examples Completed ===")
# Export functions
# Public names of the middleware example, keyed by their own ``__name__``.
export = {
    exported.__name__: exported
    for exported in (
        MiddlewarePipeline,
        CommonMiddlewares,
        MiddlewareComposer,
        ApplicationBuilder,
        MockRequest,
        MockResponse,
        Context,
        demonstrate_middleware,
    )
}
💻 Статические Файлы python
🟡 intermediate
⭐⭐⭐
Обслуживание статических файлов с кэшированием, сжатием и определением MIME-типа
⏱️ 25 min
🏷️ python, web, web features
Prerequisites:
Intermediate Python, Fetch API, Pyodide
# Web Python Static File Serving Examples
# Serving static files with various strategies
import js
from typing import Dict, Any, Optional, List, Set
import hashlib
# 1. MIME Type Detector
class MIMETypeDetector:
    """Guess MIME types from a file name or from leading magic bytes."""

    MIME_TYPES = {
        '.html': 'text/html',
        '.htm': 'text/html',
        '.css': 'text/css',
        '.js': 'text/javascript',
        '.json': 'application/json',
        '.xml': 'application/xml',
        '.txt': 'text/plain',
        '.pdf': 'application/pdf',
        '.zip': 'application/zip',
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.svg': 'image/svg+xml',
        '.ico': 'image/x-icon',
        '.woff': 'font/woff',
        '.woff2': 'font/woff2',
        '.ttf': 'font/ttf',
        '.eot': 'application/vnd.ms-fontobject',
        '.mp3': 'audio/mpeg',
        '.mp4': 'video/mp4',
        '.webm': 'video/webm',
        '.ogg': 'video/ogg'
    }

    # Returned whenever no more specific type can be determined.
    _DEFAULT_MIME = 'application/octet-stream'

    # (leading bytes, MIME type) pairs checked in order.
    _SIGNATURES = (
        (b'\x89PNG', 'image/png'),       # 89 50 4E 47
        (b'\xff\xd8\xff', 'image/jpeg'),  # FF D8 FF
        (b'GIF8', 'image/gif'),          # 47 49 46 38
    )

    @classmethod
    def _lookup_extension(cls, name: str) -> Optional[str]:
        """Return the MIME type for the text after the last dot, or None."""
        dot = name.rfind('.')
        if dot == -1:
            return None
        return cls.MIME_TYPES.get(name[dot:].lower())

    @classmethod
    def get_by_extension(cls, filename: str) -> str:
        """
        Get MIME type by extension
        Args:
            filename: File name, path or URL
        Returns:
            MIME type string ('application/octet-stream' when unknown)
        """
        mime = cls._lookup_extension(filename)
        if mime is None:
            # Retry without a URL query string / fragment so that
            # "app.js?v=3" is still recognised as JavaScript.
            stripped = filename.split('?', 1)[0].split('#', 1)[0]
            if stripped != filename:
                mime = cls._lookup_extension(stripped)
        return mime if mime is not None else cls._DEFAULT_MIME

    @classmethod
    async def get_by_signature(cls, file: Any) -> str:
        """
        Get MIME type by file signature (magic bytes)
        Args:
            file: File blob, or a bytes-like object
        Returns:
            MIME type string ('application/octet-stream' when the header
            is unreadable or matches no known signature)
        """
        try:
            header = bytes(await cls._read_file_header(file, 8))
        except Exception:
            # Best effort: an unreadable header means "unknown type".
            return cls._DEFAULT_MIME
        # Prefix comparison is safe for headers shorter than a signature.
        for magic, mime in cls._SIGNATURES:
            if header.startswith(magic):
                return mime
        return cls._DEFAULT_MIME

    @staticmethod
    async def _read_file_header(file: Any, bytes_count: int = 8) -> List[int]:
        """Return up to ``bytes_count`` leading byte values of ``file``."""
        if isinstance(file, (bytes, bytearray, memoryview)):
            return list(bytes(file[:bytes_count]))
        # Simplified - in real Pyodide you'd read from Blob
        return [0] * bytes_count
# 2. Static File Server
class StaticFileServer:
    """Fetch files below a base URL and cache the results in memory."""

    def __init__(self, base_url: str = '/static'):
        self.base_url = base_url
        # file_path -> {'data', 'mime', 'timestamp'}
        self.cache = {}
        self.cache_timeout = 60000  # 1 minute

    def _build_url(self, file_path: str) -> str:
        """Join base_url and file_path with exactly one slash between them."""
        if not self.base_url:
            return file_path
        return f"{self.base_url.rstrip('/')}/{file_path.lstrip('/')}"

    async def serve(self, file_path: str) -> Dict[str, Any]:
        """
        Serve file
        Args:
            file_path: Path to file, relative to base_url
        Returns:
            dict with data, mime, and status. Non-OK responses yield
            data None with the HTTP status; any exception yields status 500.
        """
        # Check cache
        cached = self.cache.get(file_path)
        if cached is not None and js.Date.now() - cached['timestamp'] < self.cache_timeout:
            return {
                'data': cached['data'],
                'mime': cached['mime'],
                'status': 200
            }
        # Fetch file
        url = self._build_url(file_path)
        try:
            response = await js.fetch(url)
            if not response.ok:
                return {
                    'data': None,
                    'mime': 'text/plain',
                    'status': response.status
                }
            mime = response.headers.get('Content-Type') or MIMETypeDetector.get_by_extension(file_path)
            # Ignore parameters such as "; charset=utf-8" when picking
            # how to decode the body.
            content_type = mime.split(';')[0]
            if 'application/json' in content_type:
                data = await response.json()
            elif 'text/' in content_type:
                data = await response.text()
            else:
                data = await response.blob()
            # Cache response
            self.cache[file_path] = {
                'data': data,
                'mime': mime,
                'timestamp': js.Date.now()
            }
            return {'data': data, 'mime': mime, 'status': 200}
        except Exception:
            # Network or decoding failure is reported as a 500 result
            # rather than raised to the caller.
            return {
                'data': None,
                'mime': 'text/plain',
                'status': 500
            }

    async def serve_with_fallback(self, file_path: str, fallback_path: str) -> Dict[str, Any]:
        """Serve file, falling back to ``fallback_path`` only on a 404."""
        result = await self.serve(file_path)
        if result['status'] == 404:
            return await self.serve(fallback_path)
        return result

    def clear_cache(self):
        """Clear cache"""
        self.cache.clear()

    def clear_cache_entry(self, file_path: str):
        """Clear specific cache entry (no-op when absent)"""
        self.cache.pop(file_path, None)

    def get_cache_size(self) -> int:
        """Get number of cached entries"""
        return len(self.cache)
# 3. File Cache Manager
class FileCacheManager:
    """In-memory store of file contents with ETag / Last-Modified metadata."""

    def __init__(self):
        # file_path -> {'content', 'mime_type', 'etag', 'last_modified'}
        self.cache = {}

    def set(self, file_path: str, content: Any, mime_type: str, etag: Optional[str] = None, last_modified: Optional[str] = None):
        """Add to cache.

        Args:
            file_path: Cache key
            content: File content
            mime_type: MIME type to store alongside the content
            etag: ETag to store; generated from the content when omitted
            last_modified: Timestamp string; when omitted the current time
                is taken from the browser's ``Date`` object
        """
        self.cache[file_path] = {
            'content': content,
            'mime_type': mime_type,
            'etag': etag or self._generate_etag(content),
            'last_modified': last_modified or js.Date.new().toUTCString()
        }

    def get(self, file_path: str) -> Optional[Dict[str, Any]]:
        """Get from cache (None when absent)"""
        return self.cache.get(file_path)

    def has(self, file_path: str) -> bool:
        """Check if file is cached"""
        return file_path in self.cache

    def remove(self, file_path: str):
        """Remove from cache (no-op when absent)"""
        self.cache.pop(file_path, None)

    def clear(self):
        """Clear all cache"""
        self.cache.clear()

    def get_stats(self) -> Dict[str, Any]:
        """Get cache stats ('size' and 'entries' are both the entry count)"""
        entry_count = len(self.cache)
        return {
            'size': entry_count,
            'entries': entry_count,
            'keys': list(self.cache.keys())
        }

    def _generate_etag(self, content: Any) -> str:
        """Generate a quoted MD5 ETag for ``content``."""
        if isinstance(content, (bytes, bytearray, memoryview)):
            # Hash the bytes themselves, not the text of their repr().
            raw = bytes(content)
        else:
            raw = str(content).encode()
        # MD5 serves only as a content fingerprint here, not for security.
        return f'"{hashlib.md5(raw).hexdigest()}"'
# 4. Asset Preloader
class AssetPreloader:
    """Fetch assets ahead of use and remember which succeeded or failed."""

    def __init__(self):
        self.loaded_assets: Set[str] = set()
        self.failed_assets: Set[str] = set()

    async def preload(self, url: str) -> Dict[str, Any]:
        """
        Preload single asset
        Args:
            url: Asset URL
        Returns:
            {'success': True}, or {'success': False, 'error': exception}.
            URLs already recorded as loaded or failed are answered from
            that record without fetching again.
        """
        if url in self.loaded_assets:
            return {'success': True}
        if url in self.failed_assets:
            return {'success': False, 'error': Exception('Previously failed to load')}
        try:
            response = await js.fetch(url)
            if not response.ok:
                raise Exception(f'HTTP {response.status}')
        except Exception as error:
            self.failed_assets.add(url)
            return {'success': False, 'error': error}
        self.loaded_assets.add(url)
        return {'success': True}

    async def preload_multiple(self, urls: List[str]) -> Dict[str, Any]:
        """
        Preload multiple assets, one after another
        Args:
            urls: List of URLs
        Returns:
            dict with 'successful' (URLs) and 'failed' ({'url', 'error'})
        """
        outcome = {'successful': [], 'failed': []}
        for url in urls:
            result = await self.preload(url)
            if not result['success']:
                outcome['failed'].append({'url': url, 'error': result.get('error')})
                continue
            outcome['successful'].append(url)
        return outcome

    def get_loaded_assets(self) -> List[str]:
        """Get loaded assets"""
        return [*self.loaded_assets]

    def get_failed_assets(self) -> List[str]:
        """Get failed assets"""
        return [*self.failed_assets]

    def clear(self):
        """Clear tracking"""
        for tracked in (self.loaded_assets, self.failed_assets):
            tracked.clear()
# 5. CDN Manager
class CDNManager:
    """Build asset URLs against a list of CDN base URLs with manual failover."""

    def __init__(self, cdn_urls: List[str]):
        self.cdn_urls = cdn_urls
        # Index into cdn_urls of the CDN currently in use.
        self.current_cdn_index = 0

    def get_cdn_url(self, asset_path: str) -> str:
        """Get URL for asset on the currently selected CDN.

        Raises:
            IndexError: if no CDN URLs are configured.
        """
        cdn_url = self.cdn_urls[self.current_cdn_index]
        return f"{cdn_url}{asset_path}"

    def rotate_cdn(self):
        """Select the next CDN, wrapping around at the end of the list."""
        if not self.cdn_urls:
            # Nothing to rotate through; avoids a modulo-by-zero error.
            return
        self.current_cdn_index = (self.current_cdn_index + 1) % len(self.cdn_urls)

    def get_all_cdn_urls(self, asset_path: str) -> List[str]:
        """Get URLs for asset on every configured CDN, in list order"""
        return [f"{cdn}{asset_path}" for cdn in self.cdn_urls]

    async def try_all_cdns(self, asset_path: str) -> Dict[str, Any]:
        """Try each CDN until one succeeds.

        Returns:
            {'success': True, 'cdn_url', 'data'} for the first OK response,
            otherwise {'success': False}.
        """
        for url in self.get_all_cdn_urls(asset_path):
            try:
                response = await js.fetch(url)
                if response.ok:
                    data = await response.blob()
                    return {'success': True, 'cdn_url': url, 'data': data}
            except Exception as error:
                # A failing CDN is logged and the next one is tried.
                print(f"CDN failed: {url}, {error}")
        return {'success': False}
# 6. Asset Bundle Manager
class AssetBundleManager:
    """Named groups of asset URLs that can be preloaded together."""

    def __init__(self):
        # bundle name -> list of asset URLs
        self.bundles = {}

    def register_bundle(self, name: str, assets: List[str]):
        """Register bundle (replaces any bundle already using ``name``)"""
        self.bundles.update({name: assets})

    def get_bundle(self, name: str) -> Optional[List[str]]:
        """Get bundle assets, or None for an unknown name"""
        return self.bundles.get(name)

    async def preload_bundle(self, name: str, preloader: AssetPreloader) -> Dict[str, Any]:
        """Preload every asset of a bundle through ``preloader``.

        An unknown or empty bundle gives empty result lists without
        touching the preloader.
        """
        assets = self.get_bundle(name)
        if assets:
            return await preloader.preload_multiple(assets)
        return {'successful': [], 'failed': []}
# Usage Examples
async def demonstrate_static_files():
    """Run the six static-file demos in order, printing their output."""
    print("=== Web Python Static File Serving Examples ===\n")

    # 1. MIME type detection
    print("--- 1. MIME Type Detection ---")
    detect = MIMETypeDetector.get_by_extension
    print(f"HTML: {detect('index.html')}")
    print(f"JavaScript: {detect('app.js')}")
    print(f"PNG: {detect('image.png')}")

    # 2. Static file server (URLs are only printed, nothing is fetched)
    print("\n--- 2. Static File Server ---")
    server = StaticFileServer('/assets')
    print(f"Would serve: {server.base_url}/styles/main.css")
    print(f"Would serve: {server.base_url}/js/app.js")

    # 3. File cache manager
    print("\n--- 3. File Cache Manager ---")
    cache_manager = FileCacheManager()
    cache_manager.set('/index.html', '<html>...</html>', 'text/html', '"abc123"')
    cache_manager.set('/app.js', 'console.log("Hello");', 'text/javascript')
    print(f"Cache stats: {cache_manager.get_stats()}")
    cached = cache_manager.get('/index.html')
    print(f"Cached file: {cached['mime_type'] if cached else 'N/A'}")

    # 4. Asset preloader
    print("\n--- 4. Asset Preloader ---")
    preloader = AssetPreloader()
    # Note: These would be real URLs in production
    image_urls = [
        '/assets/image1.png',
        '/assets/image2.png',
        '/assets/image3.png'
    ]
    preload_results = await preloader.preload_multiple(image_urls)
    print(f"Preloaded: {preload_results['successful']}")
    print(f"Failed: {preload_results['failed']}")

    # 5. CDN manager
    print("\n--- 5. CDN Manager ---")
    cdn_hosts = [
        'https://cdn1.example.com',
        'https://cdn2.example.com',
        'https://cdn3.example.com'
    ]
    cdn_manager = CDNManager(cdn_hosts)
    print(f"CDN URL: {cdn_manager.get_cdn_url('/assets/app.js')}")
    cdn_manager.rotate_cdn()
    print(f"Rotated CDN URL: {cdn_manager.get_cdn_url('/assets/app.js')}")

    # 6. Asset bundle manager
    print("\n--- 6. Asset Bundle Manager ---")
    bundle_manager = AssetBundleManager()
    vendor_assets = [
        '/assets/js/vendor/react.js',
        '/assets/js/vendor/lodash.js'
    ]
    app_assets = [
        '/assets/js/app.js',
        '/assets/js/utils.js'
    ]
    bundle_manager.register_bundle('vendor', vendor_assets)
    bundle_manager.register_bundle('app', app_assets)
    bundle = bundle_manager.get_bundle('vendor')
    print(f"Vendor bundle: {bundle}")

    print("\n=== All Static File Serving Examples Completed ===")
# Export functions
# Public names of the static-file example, keyed by their own ``__name__``.
export = {
    exported.__name__: exported
    for exported in (
        MIMETypeDetector,
        StaticFileServer,
        FileCacheManager,
        AssetPreloader,
        CDNManager,
        AssetBundleManager,
        demonstrate_static_files,
    )
}