Web Python Web端功能示例

Web Python Web端功能示例,包括路由处理、中间件和静态文件服务

💻 路由处理 python

🟡 intermediate ⭐⭐⭐

解析URL路由、处理哈希路由和管理浏览器历史

⏱️ 25 min 🏷️ python, web, web features
Prerequisites: Intermediate Python, History API, URL API, Pyodide
# Web Python Routing Examples
# Client-side routing with URL parsing, hash routing, and history management

import js
import re
from typing import Dict, Callable, Any, Optional, Tuple
from urllib.parse import parse_qs, urlparse

# 1. Route Handler
class Route:
    """Pair a route pattern with the callable that serves it."""

    def __init__(self, path: str, handler: Callable):
        # Both values are stored verbatim; pattern interpretation is left
        # to whichever router owns this route.
        self.path, self.handler = path, handler

# 2. Router
class Router:
    """Client-side router backed by the browser History API.

    Routes are tried in registration order.  Inside a pattern, a segment that
    starts with ``:`` captures exactly one path segment as a named parameter,
    ``*`` matches any run of characters, and all other text is compared
    literally.
    """

    def __init__(self):
        self.routes = []
        self.not_found_handler = None
        # Params/query of the most recently matched route (see _handle_route).
        self.current_params = {}
        self.current_query = {}

    def add_route(self, path: str, handler: Callable):
        """
        Add route

        Args:
            path: Route path (can include :params)
            handler: Route handler, called as ``handler(params, query)``
        """
        self.routes.append(Route(path, handler))

    def set_not_found(self, handler: Callable):
        """
        Set 404 handler

        Args:
            handler: Zero-argument callable invoked when no route matches
        """
        self.not_found_handler = handler

    def navigate(self, path: str, params: Optional[Dict[str, str]] = None):
        """
        Navigate to path (pushes a new history entry, then dispatches)

        Args:
            path: Path to navigate to
            params: Values substituted for ``:name`` placeholders in ``path``
        """
        url = self._replace_params(path, params) if params else path

        # Update browser URL.  The state argument is an empty JavaScript
        # object created through the ``Object`` constructor.
        js.window.history.pushState(js.Object.new(), '', url)

        self._handle_route(url)

    def replace(self, path: str, params: Optional[Dict[str, str]] = None):
        """
        Replace current route (no new history entry, then dispatches)

        Args:
            path: Path to replace with
            params: Values substituted for ``:name`` placeholders in ``path``
        """
        url = self._replace_params(path, params) if params else path

        js.window.history.replaceState(js.Object.new(), '', url)
        self._handle_route(url)

    def back(self):
        """Go back in history"""
        js.window.history.back()

    def forward(self):
        """Go forward in history"""
        js.window.history.forward()

    def go(self, delta: int):
        """
        Go to specific history entry

        Args:
            delta: Number of steps to go
        """
        js.window.history.go(delta)

    def _handle_route(self, url: str):
        """Dispatch ``url`` to the first matching route, else the 404 handler."""
        path, query = self._parse_url(url)
        route = self._find_route(path)

        if route is None:
            if self.not_found_handler:
                self.not_found_handler()
            return

        params = self._extract_params(path, route.path)
        self.current_params = params
        self.current_query = query
        route.handler(params, query)

    def _find_route(self, path: str) -> Optional['Route']:
        """Return the first registered route whose pattern matches ``path``."""
        for route in self.routes:
            if self._match_route(path, route.path):
                return route
        return None

    @staticmethod
    def _pattern_to_regex(pattern: str) -> str:
        """Translate a route pattern into a regular-expression source string."""
        pieces = []
        for segment in pattern.split('/'):
            if segment.startswith(':'):
                # The whole segment is a placeholder: one non-empty path segment.
                pieces.append('[^/]+')
            else:
                # Escape literal text so '.', '+', ... are not regex operators;
                # only '*' keeps its wildcard meaning.
                pieces.append('.*'.join(re.escape(text) for text in segment.split('*')))
        return '/'.join(pieces)

    def _match_route(self, path: str, pattern: str) -> bool:
        """Check if path matches route pattern"""
        return re.fullmatch(self._pattern_to_regex(pattern), path) is not None

    def _extract_params(self, path: str, pattern: str) -> Dict[str, str]:
        """Extract ``:name`` values by comparing pattern and path segment-wise.

        A placeholder with no corresponding path segment maps to ``''``.
        """
        path_parts = path.split('/')
        params = {}

        for index, part in enumerate(pattern.split('/')):
            if part.startswith(':'):
                params[part[1:]] = path_parts[index] if index < len(path_parts) else ''

        return params

    def _replace_params(self, path: str, params: Dict[str, str]) -> str:
        """Substitute ``:name`` placeholders in ``path`` with values from ``params``.

        Placeholders are matched as whole names, so a value for ``id`` does not
        touch ``:idx``.  Values are converted with ``str()``.
        """
        if not params:
            return path
        # Longest names first so the alternation prefers ':idx' over ':id'.
        names = sorted(params, key=len, reverse=True)
        token = re.compile(':(' + '|'.join(re.escape(name) for name in names) + r')(?!\w)')
        # A function replacement keeps backslashes in values literal.
        return token.sub(lambda found: str(params[found.group(1)]), path)

    def _parse_url(self, url: str) -> Tuple[str, Dict[str, str]]:
        """Split ``url`` into ``(path, query)``.

        A ``#fragment`` is discarded, and only the first value of a repeated
        query key is kept.
        """
        without_fragment = url.split('#', 1)[0]
        path, _, query_string = without_fragment.partition('?')
        query = {key: values[0] for key, values in parse_qs(query_string).items()}
        return path, query

    def initialize(self):
        """Initialize router (set up event listeners)"""
        # Note: In Pyodide, you'd need to set up JavaScript event handlers
        pass


# 3. Hash Router
class HashRouter:
    """Router driven by ``window.location.hash``.

    Route patterns are stored without a leading slash and incoming hash paths
    are normalised the same way, so ``#profile/1`` and ``#/profile/1`` hit the
    same route and an empty hash hits the route registered as ``'/'``.

    Nothing in this class subscribes to the browser's ``hashchange`` event;
    ``_handle_hash_change`` has to be wired up by the caller.
    """

    def __init__(self):
        self.routes = []
        self.not_found_handler = None

    @staticmethod
    def _normalize(path: str) -> str:
        """Drop a single leading slash so patterns and paths compare equal."""
        return path[1:] if path.startswith('/') else path

    def add_route(self, path: str, handler: Callable):
        """Add route; ``handler`` is called as ``handler(params, query)``."""
        self.routes.append(Route(self._normalize(path), handler))

    def set_not_found(self, handler: Callable):
        """Set 404 handler (called with no arguments)."""
        self.not_found_handler = handler

    def navigate(self, hash_path: str, params: Optional[Dict[str, str]] = None):
        """Navigate to hash, substituting ``:name`` placeholders from ``params``."""
        url = self._replace_params(hash_path, params) if params else hash_path
        js.window.location.hash = url

    def get_hash(self) -> str:
        """Get current hash without the leading ``#`` (``'/'`` when empty)."""
        hash_value = js.window.location.hash
        return hash_value[1:] if hash_value else '/'

    def _replace_params(self, path: str, params: Dict[str, str]) -> str:
        """Substitute ``:name`` placeholders in ``path`` with values from ``params``.

        Placeholders are matched as whole names, so a value for ``id`` does not
        touch ``:idx``.  Values are converted with ``str()``.
        """
        if not params:
            return path
        # Longest names first so the alternation prefers ':idx' over ':id'.
        names = sorted(params, key=len, reverse=True)
        token = re.compile(':(' + '|'.join(re.escape(name) for name in names) + r')(?!\w)')
        return token.sub(lambda found: str(params[found.group(1)]), path)

    def _handle_hash_change(self):
        """Dispatch the current hash to the first matching route or the 404 handler."""
        hash_value = self.get_hash()

        raw_path, _, query_string = hash_value.partition('?')
        # Keep only the first value of a repeated query key.
        query = {key: values[0] for key, values in parse_qs(query_string).items()}
        path = self._normalize(raw_path)

        route = self._find_route(path)

        if route:
            params = self._extract_params(path, route.path)
            route.handler(params, query)
        elif self.not_found_handler:
            self.not_found_handler()

    def _find_route(self, path: str) -> Optional['Route']:
        """Return the first registered route whose pattern matches ``path``."""
        for route in self.routes:
            if self._match_route(path, route.path):
                return route
        return None

    def _match_route(self, path: str, pattern: str) -> bool:
        """Check if path matches route pattern.

        ``:name`` segments match one non-empty path segment, ``*`` matches any
        run of characters, and everything else is matched literally.
        """
        pieces = []
        for segment in pattern.split('/'):
            if segment.startswith(':'):
                pieces.append('[^/]+')
            else:
                # Escape literal text; only '*' keeps its wildcard meaning.
                pieces.append('.*'.join(re.escape(text) for text in segment.split('*')))
        return re.fullmatch('/'.join(pieces), path) is not None

    def _extract_params(self, path: str, pattern: str) -> Dict[str, str]:
        """Extract ``:name`` values by comparing pattern and path segment-wise."""
        path_parts = path.split('/')
        params = {}

        for index, part in enumerate(pattern.split('/')):
            if part.startswith(':'):
                params[part[1:]] = path_parts[index] if index < len(path_parts) else ''

        return params


# 4. Query Params Manager
class QueryParamsManager:
    """Read and rewrite the query string of the current page URL.

    Every mutation goes through ``history.replaceState``, so the address bar
    changes without adding a history entry.
    """

    @staticmethod
    def _current():
        """Return a ``URLSearchParams`` built from ``window.location.search``."""
        return js.URLSearchParams.new(js.window.location.search)

    def get(self, param: str) -> Optional[str]:
        """
        Get query param

        Args:
            param: Parameter name

        Returns:
            Whatever ``URLSearchParams.get`` returns for ``param``
        """
        return self._current().get(param)

    def get_all(self) -> Dict[str, str]:
        """
        Get all params

        Returns:
            Dictionary of all parameters; for a repeated key the value is
            whatever ``URLSearchParams.get`` returns for it
        """
        params = self._current()
        return {key: params.get(key) for key in list(params.keys())}

    def set(self, param: str, value: str):
        """
        Set query param

        Args:
            param: Parameter name
            value: Parameter value
        """
        params = self._current()
        params.set(param, value)
        self._update(str(params))

    def delete(self, param: str):
        """
        Delete query param

        Args:
            param: Parameter name
        """
        params = self._current()
        params.delete(param)
        self._update(str(params))

    def clear(self):
        """Clear all params"""
        self._update('')

    def _update(self, query_string: str):
        """Replace the query part of the current URL with ``query_string``."""
        url = js.URL.new(js.window.location.href)
        url.search = query_string
        # The state argument is an empty JavaScript object created through
        # the ``Object`` constructor.
        js.window.history.replaceState(js.Object.new(), '', str(url))


# 5. Route Guard
class RouteGuard:
    """Registry of per-route predicates consulted before navigating.

    At most one guard is kept per route; registering another guard for the
    same route replaces the earlier one.
    """

    def __init__(self):
        self.guards = {}

    def add_guard(self, route: str, guard: Callable[[], bool]):
        """
        Add guard for route

        Args:
            route: Route path
            guard: Zero-argument callable; may be a plain function or an
                ``async`` function
        """
        self.guards[route] = guard

    async def can_navigate(self, route: str) -> bool:
        """
        Check if route is allowed

        Args:
            route: Route path

        Returns:
            The guard's verdict, or True when the route has no guard
        """
        guard = self.guards.get(route)
        if not guard:
            return True

        verdict = guard()
        # An async guard hands back an awaitable; returning it un-awaited
        # would always look truthy to the caller.
        import inspect
        if inspect.isawaitable(verdict):
            verdict = await verdict
        return verdict

    def add_auth_guard(self, route: str, is_authenticated: Callable[[], bool]):
        """Add authentication guard"""
        self.add_guard(route, is_authenticated)

    def add_permission_guard(self, route: str, has_permission: Callable[[], bool]):
        """Add permission guard"""
        self.add_guard(route, has_permission)


# 6. Navigation Manager
class NavigationManager:
    """Keep a Python-side list of visited paths alongside the browser history."""

    def __init__(self, router: Router):
        self.router = router
        # Visited paths in order; current_index points at the active one
        # and is -1 until the first navigate() call.
        self.history = []
        self.current_index = -1

    def navigate(self, path: str, params: Dict[str, str] = None):
        """Record ``path`` as the newest entry, then delegate to the router."""
        newest = len(self.history) - 1
        if self.current_index < newest:
            # Entries "ahead" of the cursor become unreachable.
            self.history = self.history[:self.current_index + 1]

        self.history.append(path)
        self.current_index += 1

        self.router.navigate(path, params)

    def back(self):
        """Step the cursor back and ask the browser to go back, if possible."""
        if self.current_index <= 0:
            return
        self.current_index -= 1
        js.window.history.back()

    def forward(self):
        """Step the cursor forward and ask the browser to go forward, if possible."""
        if self.current_index >= len(self.history) - 1:
            return
        self.current_index += 1
        js.window.history.forward()

    def can_go_back(self) -> bool:
        """Report whether an earlier entry exists."""
        return self.current_index > 0

    def can_go_forward(self) -> bool:
        """Report whether a later entry exists."""
        return self.current_index < len(self.history) - 1

    def get_current_position(self) -> int:
        """Return the cursor index (-1 before any navigation)."""
        return self.current_index

    def get_history_length(self) -> int:
        """Return the number of recorded entries."""
        return len(self.history)


# Usage Examples
async def demonstrate_routing():
    """Walk through each routing helper in this file, printing what happens.

    Needs a browser/Pyodide environment: the routers and the query manager
    talk to ``window.history`` and ``window.location`` through ``js``.
    """
    print("=== Web Python Routing Examples ===\n")

    # 1. Basic router
    print("--- 1. Basic Router ---")
    router = Router()

    def home_handler(params, query):
        print("Home page")

    def about_handler(params, query):
        print("About page")

    def user_handler(params, query):
        print(f"User page: {params.get('id', 'N/A')}")

    def comment_handler(params, query):
        print(f"Comment: {params.get('commentId', 'N/A')} on post {params.get('postId', 'N/A')}")

    def search_handler(params, query):
        print(f"Search: {query}")

    router.add_route('/', home_handler)
    router.add_route('/about', about_handler)
    router.add_route('/users/:id', user_handler)
    router.add_route('/posts/:postId/comments/:commentId', comment_handler)
    # Without a '/search' route the query-params step below matches nothing
    # and prints nothing.
    router.add_route('/search', search_handler)

    # 2. Navigation
    print("\n--- 2. Navigation ---")
    router.navigate('/')
    router.navigate('/about')
    router.navigate('/users/123')

    # 3. Query params
    print("\n--- 3. Query Params ---")
    router.navigate('/search?query=python&page=1')

    # 4. Hash router
    print("\n--- 4. Hash Router ---")
    hash_router = HashRouter()

    def hash_home_handler(params, query):
        print("Hash home")

    def hash_profile_handler(params, query):
        print(f"Hash profile: {params.get('userId', 'N/A')}")

    hash_router.add_route('/', hash_home_handler)
    hash_router.add_route('profile/:userId', hash_profile_handler)
    hash_router.navigate('profile/456')

    # 5. Query params manager
    print("\n--- 5. Query Params Manager ---")
    query_manager = QueryParamsManager()
    query_manager.set('tab', 'profile')
    query_manager.set('section', 'details')
    all_params = query_manager.get_all()
    print(f"All params: {all_params}")

    # 6. Route guard
    print("\n--- 6. Route Guard ---")
    guard = RouteGuard()

    def is_authenticated():
        authenticated = False  # Simulate
        print(f"Auth check: {authenticated}")
        return authenticated

    guard.add_auth_guard('/admin', is_authenticated)
    # can_navigate is a coroutine function: without ``await`` the guard never
    # runs and a coroutine object is printed instead of the verdict.
    can_access = await guard.can_navigate('/admin')
    print(f"Can access /admin: {can_access}")

    # 7. Navigation manager
    print("\n--- 7. Navigation Manager ---")
    nav_manager = NavigationManager(router)
    nav_manager.navigate('/page1')
    nav_manager.navigate('/page2')
    print(f"Can go back: {nav_manager.can_go_back()}")
    print(f"History length: {nav_manager.get_history_length()}")

    print("\n=== All Routing Examples Completed ===")

# Export functions
# Public names of the routing example, keyed by their own ``__name__``.
export = {
    exported.__name__: exported
    for exported in (
        Router,
        HashRouter,
        QueryParamsManager,
        RouteGuard,
        NavigationManager,
        demonstrate_routing,
    )
}

💻 中间件 python

🟡 intermediate ⭐⭐⭐⭐

实现请求/响应中间件链以处理和修改数据

⏱️ 30 min 🏷️ python, web, web features
Prerequisites: Intermediate Python, Async/await, Pyodide
# Web Python Middleware Examples
# Request/response processing middleware chain

import asyncio
from typing import Dict, Any, Callable, Optional, List

# 1. Context
class Context:
    """Mutable per-request record handed to every middleware in a chain."""

    def __init__(self):
        # Both start unset; the caller that builds the context fills them in.
        self.request = self.response = None
        # Three independent, initially empty dicts.
        self.state, self.headers, self.metadata = {}, {}, {}

# 2. Middleware Function Type
# A middleware is invoked as ``middleware(context, next)``.
# NOTE(review): the pipeline awaits each middleware call, so middlewares are
# coroutine functions and the declared ``None`` return type is really an
# awaitable — consider ``Callable[[Context, Callable], Awaitable[None]]``
# (needs ``Awaitable`` imported from ``typing``).
Middleware = Callable[[Context, Callable], None]

# 3. Middleware Pipeline
class MiddlewarePipeline:
    """Ordered list of middlewares executed one inside the next."""

    def __init__(self):
        self.middlewares = []

    def use(self, middleware: Middleware) -> 'MiddlewarePipeline':
        """
        Append a middleware to the end of the chain

        Args:
            middleware: Middleware function

        Returns:
            This pipeline, so calls can be chained
        """
        self.middlewares.append(middleware)
        return self

    async def execute(self, context: Context):
        """
        Run the chain against ``context``

        Each middleware receives a continuation; awaiting it runs the next
        middleware that has not started yet.  A single cursor is shared by
        every continuation of one ``execute`` call.

        Args:
            context: Request context
        """
        cursor = 0

        async def advance():
            nonlocal cursor
            if cursor >= len(self.middlewares):
                return
            current = self.middlewares[cursor]
            cursor += 1
            await current(context, advance)

        await advance()

    def clear(self):
        """Drop every registered middleware"""
        self.middlewares = list()

    def count(self) -> int:
        """Return how many middlewares are registered"""
        return len(self.middlewares)


# 4. Common Middlewares
class CommonMiddlewares:
    """Factories for commonly needed middlewares.

    Every public static method returns an ``async`` function taking
    ``(context, next)``; awaiting ``next()`` runs the rest of the chain.
    """

    @staticmethod
    def _perf_ms() -> float:
        """Return a timestamp in milliseconds for measuring durations.

        Uses ``js.performance.now()`` when it is reachable, otherwise
        ``time.perf_counter()``.
        """
        try:
            return js.performance.now()
        except (NameError, AttributeError):
            import time
            return time.perf_counter() * 1000

    @staticmethod
    def _now_ms() -> float:
        """Return the wall-clock time in milliseconds.

        Uses ``js.Date.now()`` when it is reachable, otherwise ``time.time()``.
        A constant fallback would make rate-limit windows and cache TTLs
        never expire.
        """
        try:
            return js.Date.now()
        except (NameError, AttributeError):
            import time
            return time.time() * 1000

    @staticmethod
    def _request_header(context: 'Context', name: str, default: str) -> str:
        """Read one request header, or ``default`` when there is no request/header."""
        request = context.request
        if not request:
            return default
        return getattr(request, 'headers', {}).get(name, default)

    @staticmethod
    def logging() -> 'Middleware':
        """Logging middleware: prints the request, then the response and duration."""
        async def logging_middleware(context: 'Context', next: Callable):
            start = CommonMiddlewares._perf_ms()
            request = context.request

            print('[Request]', {
                'url': getattr(request, 'url', 'N/A') if request else 'N/A',
                'method': getattr(request, 'method', 'N/A') if request else 'N/A',
                'headers': context.headers
            })

            await next()

            duration = CommonMiddlewares._perf_ms() - start
            response = context.response
            print('[Response]', {
                'status': getattr(response, 'status', 'N/A') if response else 'N/A',
                'duration': f'{duration:.2f}ms'
            })

        return logging_middleware

    @staticmethod
    def timing() -> 'Middleware':
        """Timing middleware: stores elapsed milliseconds in ``metadata['duration']``."""
        async def timing_middleware(context: 'Context', next: Callable):
            start = CommonMiddlewares._perf_ms()
            try:
                await next()
            finally:
                # Record the duration even when a later middleware raises.
                context.metadata['duration'] = CommonMiddlewares._perf_ms() - start

        return timing_middleware

    @staticmethod
    def error_handler(error_handler_func: Callable) -> 'Middleware':
        """Error handling middleware: passes downstream exceptions to the callback.

        The exception is not re-raised after the callback returns.
        """
        async def error_middleware(context: 'Context', next: Callable):
            try:
                await next()
            except Exception as error:
                error_handler_func(error)

        return error_middleware

    @staticmethod
    def headers(add_headers: Dict[str, str]) -> 'Middleware':
        """Header manipulation middleware: copies ``add_headers`` into the context."""
        async def headers_middleware(context: 'Context', next: Callable):
            context.headers.update(add_headers)
            await next()

        return headers_middleware

    @staticmethod
    def authentication(auth_check: Callable) -> 'Middleware':
        """Authentication middleware.

        Raises:
            Exception: ``'Unauthorized'`` when ``auth_check(context)`` is falsy
        """
        async def auth_middleware(context: 'Context', next: Callable):
            if not auth_check(context):
                raise Exception('Unauthorized')
            context.metadata['authenticated'] = True
            await next()

        return auth_middleware

    @staticmethod
    def rate_limit(max_requests: int, window_ms: int) -> 'Middleware':
        """Rate limiting middleware keyed by ``request.ip``.

        Allows at most ``max_requests`` calls per key within any window of
        ``window_ms`` milliseconds.

        Raises:
            Exception: ``'Rate limit exceeded'`` when the limit is reached
        """
        # key -> timestamps (ms) of accepted requests still inside the window
        accepted = {}

        async def rate_limit_middleware(context: 'Context', next: Callable):
            key = getattr(context.request, 'ip', 'unknown') if context.request else 'unknown'
            now = CommonMiddlewares._now_ms()

            # Remove old requests
            window_start = now - window_ms
            recent = [stamp for stamp in accepted.get(key, []) if stamp > window_start]
            accepted[key] = recent

            # Check limit
            if len(recent) >= max_requests:
                raise Exception('Rate limit exceeded')

            # Add current request
            recent.append(now)

            context.metadata['rate_limit'] = {
                'remaining': max_requests - len(recent),
                'reset': now + window_ms
            }

            await next()

        return rate_limit_middleware

    @staticmethod
    def cors(options: Dict[str, Any] = None) -> 'Middleware':
        """CORS middleware.

        ``options['origin']`` may be ``'*'`` (or missing/empty), one origin
        string, or a list/tuple/set of origins.  Optional ``methods`` and
        ``headers`` lists and a ``credentials`` flag add the matching headers.
        """
        if options is None:
            options = {}

        async def cors_middleware(context: 'Context', next: Callable):
            origin = CommonMiddlewares._request_header(context, 'Origin', '*')

            # Set CORS headers
            allowed_origins = options.get('origin', '*')
            if allowed_origins == '*' or not allowed_origins:
                context.headers['Access-Control-Allow-Origin'] = '*'
            else:
                if isinstance(allowed_origins, str):
                    # A single configured origin is a one-element allow-list.
                    candidates = (allowed_origins,)
                elif isinstance(allowed_origins, (list, tuple, set, frozenset)):
                    candidates = allowed_origins
                else:
                    candidates = ()
                if origin in candidates:
                    context.headers['Access-Control-Allow-Origin'] = origin

            if options.get('methods'):
                context.headers['Access-Control-Allow-Methods'] = ', '.join(options['methods'])
            if options.get('headers'):
                context.headers['Access-Control-Allow-Headers'] = ', '.join(options['headers'])
            if options.get('credentials'):
                context.headers['Access-Control-Allow-Credentials'] = 'true'

            await next()

        return cors_middleware

    @staticmethod
    def compression() -> 'Middleware':
        """Compression middleware (simulated).

        Only sets ``Content-Encoding`` and ``metadata['compressed']``; the
        response body is not touched.  ``gzip`` is preferred over ``br``.
        """
        async def compression_middleware(context: 'Context', next: Callable):
            await next()

            accept_encoding = CommonMiddlewares._request_header(context, 'Accept-Encoding', '')
            # Compare whole tokens, ignoring ';q=' parameters and case.
            offered = {
                token.split(';', 1)[0].strip().lower()
                for token in accept_encoding.split(',')
            }

            for encoding in ('gzip', 'br'):
                if encoding in offered:
                    context.headers['Content-Encoding'] = encoding
                    context.metadata['compressed'] = True
                    break

        return compression_middleware

    @staticmethod
    def validation(validator: Callable) -> 'Middleware':
        """Validation middleware.

        Raises:
            Exception: ``'Validation failed'`` when ``validator(context)`` is falsy
        """
        async def validation_middleware(context: 'Context', next: Callable):
            if not validator(context):
                raise Exception('Validation failed')
            await next()

        return validation_middleware

    @staticmethod
    def cache(cache_key_func: Callable, ttl: int = 60000) -> 'Middleware':
        """Cache middleware.

        Stores ``context.response`` under ``cache_key_func(context)`` for
        ``ttl`` milliseconds.  On a fresh hit the rest of the chain is
        skipped and ``metadata['cached']`` is set.
        """
        entries = {}

        async def cache_middleware(context: 'Context', next: Callable):
            key = cache_key_func(context)

            # Check cache
            hit = entries.get(key)
            if hit is not None and hit['expiry'] > CommonMiddlewares._now_ms():
                context.response = hit['data']
                context.metadata['cached'] = True
                return

            await next()

            # Store in cache; the TTL counts from when the response is ready.
            if context.response:
                entries[key] = {
                    'data': context.response,
                    'expiry': CommonMiddlewares._now_ms() + ttl
                }

        return cache_middleware


# 5. Middleware Composer
class MiddlewareComposer:
    """Fluent front end over a privately owned MiddlewarePipeline."""

    def __init__(self):
        self.pipeline = MiddlewarePipeline()

    def use(self, middleware: Middleware) -> 'MiddlewareComposer':
        """Register one middleware and return this composer for chaining."""
        self.pipeline.use(middleware)
        return self

    def use_all(self, middlewares: List[Middleware]) -> 'MiddlewareComposer':
        """Register each middleware in order and return this composer."""
        for entry in middlewares:
            self.pipeline.use(entry)
        return self

    async def execute(self, context: Context):
        """Run the underlying pipeline against ``context``."""
        await self.pipeline.execute(context)

    @staticmethod
    def from_list(middlewares: List[Middleware]) -> 'MiddlewareComposer':
        """Build a new composer pre-loaded with ``middlewares``."""
        return MiddlewareComposer().use_all(middlewares)


# 6. Mock Request/Response
class MockRequest:
    """Plain stand-in for an incoming request, used by the examples."""

    def __init__(self, url: str, method: str = 'GET', headers: Dict = None, body: Any = None, ip: str = '127.0.0.1'):
        self.url, self.method = url, method
        # A missing or empty ``headers`` argument becomes a fresh dict.
        self.headers = headers if headers else {}
        self.body, self.ip = body, ip


class MockResponse:
    """Plain stand-in for an outgoing response with chainable setters."""

    def __init__(self):
        # Starts as an empty 200 response.
        self.status, self.headers, self.body = 200, {}, None

    def set_status(self, code: int) -> 'MockResponse':
        """Store the status code and return this response."""
        self.status = code
        return self

    def set_header(self, key: str, value: str) -> 'MockResponse':
        """Store one header and return this response."""
        self.headers.update({key: value})
        return self

    def send(self, data: Any) -> 'MockResponse':
        """Store ``data`` as the body and return this response."""
        self.body = data
        return self


# 7. Application Builder
class ApplicationBuilder:
    """Minimal application: a middleware chain plus a request entry point."""

    def __init__(self):
        self.composer = MiddlewareComposer()

    def use(self, middleware: Middleware) -> 'ApplicationBuilder':
        """Register a middleware and return this builder for chaining."""
        self.composer.use(middleware)
        return self

    async def handle(self, request: MockRequest) -> MockResponse:
        """Run ``request`` through the chain and return the resulting response.

        The context starts with a fresh ``MockResponse``; the returned value
        is whatever ``context.response`` holds after the chain has run.
        """
        ctx = Context()
        ctx.request, ctx.response = request, MockResponse()

        await self.composer.execute(ctx)

        return ctx.response


# Usage Examples
async def demonstrate_middleware():
    """Run each middleware example in turn, printing the results."""
    print("=== Web Python Middleware Examples ===\n")

    # 1. Basic middleware
    print("--- 1. Basic Middleware ---")

    async def first_layer(context, next):
        print("Middleware 1: Before")
        await next()
        print("Middleware 1: After")

    async def second_layer(context, next):
        print("Middleware 2: Before")
        await next()
        print("Middleware 2: After")

    basic_pipeline = MiddlewarePipeline()
    basic_pipeline.use(first_layer).use(second_layer)

    basic_context = Context()
    basic_context.request = MockRequest('/test')
    await basic_pipeline.execute(basic_context)

    # 2. Common middlewares
    print("\n--- 2. Common Middlewares ---")
    common_app = ApplicationBuilder()
    for common_middleware in (
        CommonMiddlewares.logging(),
        CommonMiddlewares.timing(),
        CommonMiddlewares.headers({
            'X-Powered-By': 'Python-Middleware',
            'X-Response-Time': '0ms'
        }),
    ):
        common_app.use(common_middleware)

    users_request = MockRequest('/api/users', 'GET', {'Accept': 'application/json'})
    users_response = await common_app.handle(users_request)
    print(f"Response status: {users_response.status}")

    # 3. Error handling
    print("\n--- 3. Error Handling ---")

    def report_error(error):
        print(f"Error caught: {str(error)}")

    async def failing_middleware(context, next):
        if context.request and context.request.url == '/error':
            raise Exception('Intentional error')
        await next()

    failing_app = ApplicationBuilder()
    failing_app.use(CommonMiddlewares.error_handler(report_error))
    failing_app.use(failing_middleware)
    await failing_app.handle(MockRequest('/error'))

    # 4. Authentication
    print("\n--- 4. Authentication ---")

    def has_valid_token(context):
        if not context.request:
            return '' == 'Bearer valid-token'
        return context.request.headers.get('Authorization', '') == 'Bearer valid-token'

    async def protected_handler(context, next):
        print("Authenticated successfully")
        context.response = MockResponse().set_status(200).send({'message': 'Success'})
        await next()

    secured_app = ApplicationBuilder()
    secured_app.use(CommonMiddlewares.authentication(has_valid_token))
    secured_app.use(protected_handler)

    secured_request = MockRequest('/protected', 'GET', {'Authorization': 'Bearer valid-token'})
    secured_response = await secured_app.handle(secured_request)
    print(f"Auth response status: {secured_response.status}")

    # 5. Rate limiting
    print("\n--- 5. Rate Limiting ---")

    async def ok_handler(context, next):
        context.response = MockResponse().set_status(200).send({'message': 'OK'})
        await next()

    limited_app = ApplicationBuilder()
    limited_app.use(CommonMiddlewares.rate_limit(max_requests=3, window_ms=10000))
    limited_app.use(ok_handler)

    # Send multiple requests; the fourth exceeds the limit of three.
    repeated_request = MockRequest('/api/test')
    for index in range(4):
        try:
            limited_response = await limited_app.handle(repeated_request)
        except Exception:
            print(f"Request {index + 1}: Rate limited")
        else:
            print(f"Request {index + 1}: {limited_response.status}")

    print("\n=== All Middleware Examples Completed ===")

# Export functions
# Public names of the middleware example, keyed by their own ``__name__``.
export = {
    exported.__name__: exported
    for exported in (
        MiddlewarePipeline,
        CommonMiddlewares,
        MiddlewareComposer,
        ApplicationBuilder,
        MockRequest,
        MockResponse,
        Context,
        demonstrate_middleware,
    )
}

💻 静态文件 python

🟡 intermediate ⭐⭐⭐

提供静态文件服务,包括缓存、压缩和MIME类型检测

⏱️ 25 min 🏷️ python, web, web features
Prerequisites: Intermediate Python, Fetch API, Pyodide
# Web Python Static File Serving Examples
# Serving static files with various strategies

import js
from typing import Dict, Any, Optional, List, Set
import hashlib

# 1. MIME Type Detector
class MIMETypeDetector:
    """Guess a MIME type from a file name or from leading magic bytes."""

    # Lower-case extension (with dot) -> MIME type.
    MIME_TYPES = {
        '.html': 'text/html',
        '.htm': 'text/html',
        '.css': 'text/css',
        '.js': 'text/javascript',
        '.json': 'application/json',
        '.xml': 'application/xml',
        '.txt': 'text/plain',
        '.pdf': 'application/pdf',
        '.zip': 'application/zip',
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.svg': 'image/svg+xml',
        '.ico': 'image/x-icon',
        '.woff': 'font/woff',
        '.woff2': 'font/woff2',
        '.ttf': 'font/ttf',
        '.eot': 'application/vnd.ms-fontobject',
        '.mp3': 'audio/mpeg',
        '.mp4': 'video/mp4',
        '.webm': 'video/webm',
        '.ogg': 'video/ogg'
    }

    # Type reported when nothing more specific is known.
    _DEFAULT_TYPE = 'application/octet-stream'

    # (leading bytes, MIME type) pairs checked in order.
    _SIGNATURES = (
        (b'\x89PNG', 'image/png'),    # 89 50 4E 47
        (b'\xff\xd8\xff', 'image/jpeg'),  # FF D8 FF
        (b'GIF8', 'image/gif'),       # 47 49 46 38
    )

    @classmethod
    def get_by_extension(cls, filename: str) -> str:
        """
        Get MIME type by extension

        The extension is taken from the last path component only, after any
        ``?query`` or ``#fragment`` has been removed, and is matched
        case-insensitively.

        Args:
            filename: File name, path or URL

        Returns:
            MIME type string ('application/octet-stream' when unknown)
        """
        path = filename.split('?', 1)[0].split('#', 1)[0]
        basename = path.rsplit('/', 1)[-1]
        dot = basename.rfind('.')
        ext = basename[dot:].lower() if dot != -1 else ''
        return cls.MIME_TYPES.get(ext, cls._DEFAULT_TYPE)

    @classmethod
    async def get_by_signature(cls, file: Any) -> str:
        """
        Get MIME type by file signature (magic bytes)

        Args:
            file: File blob or bytes-like object

        Returns:
            MIME type string; 'application/octet-stream' when the header
            cannot be read or matches no known signature
        """
        try:
            raw_header = await cls._read_file_header(file, 8)
            header = bytes(int(value) for value in raw_header)
        except Exception:
            # Best effort: an unreadable header is treated as unknown content.
            return cls._DEFAULT_TYPE

        for magic, mime in cls._SIGNATURES:
            if header.startswith(magic):
                return mime
        return cls._DEFAULT_TYPE

    @staticmethod
    async def _read_file_header(file: Any, bytes_count: int = 8) -> List[int]:
        """Return up to ``bytes_count`` leading byte values of ``file``.

        Bytes-like objects are read directly.  Any other input yields a
        zero-filled placeholder, which matches no signature.
        """
        if isinstance(file, (bytes, bytearray, memoryview)):
            return list(bytes(file[:bytes_count]))
        # Simplified - in real Pyodide you'd read from Blob
        return [0] * bytes_count


# 2. Static File Server
class StaticFileServer:
    """
    Fetch files below a base URL and keep successful responses in memory.

    Successful responses are cached per requested ``file_path`` and reused
    until they are older than ``cache_timeout``.
    """

    def __init__(self, base_url: str = '/static'):
        # URL prefix placed in front of every requested file path
        self.base_url = base_url
        # file_path -> {'data', 'mime', 'timestamp'} for successful fetches
        self.cache = {}
        # Maximum entry age, compared against js.Date.now() differences
        self.cache_timeout = 60000  # 1 minute

    def _build_url(self, file_path: str) -> str:
        """Join base_url and file_path with exactly one '/' between them."""
        if not self.base_url:
            # No prefix configured: use the path exactly as given.
            return file_path
        # Plain concatenation would yield '/staticfile.css' for a path
        # without a leading slash and '//file.css' for a base of '/'.
        return f"{self.base_url.rstrip('/')}/{file_path.lstrip('/')}"

    async def serve(self, file_path: str) -> Dict[str, Any]:
        """
        Serve file

        Args:
            file_path: Path to file, relative to ``base_url``

        Returns:
            dict with data, mime, and status. ``status`` is 200 on success
            (fresh fetch or cache hit), the HTTP status of a non-ok
            response, or 500 when fetching/decoding raised; ``data`` is
            None in the two failure cases.
        """
        # Check cache
        cached = self.cache.get(file_path)
        if cached is not None and js.Date.now() - cached['timestamp'] < self.cache_timeout:
            return {
                'data': cached['data'],
                'mime': cached['mime'],
                'status': 200
            }

        # Fetch file
        url = self._build_url(file_path)

        try:
            response = await js.fetch(url)

            if not response.ok:
                return {
                    'data': None,
                    'mime': 'text/plain',
                    'status': response.status
                }

            # Prefer the server-reported type, fall back to the extension.
            mime = response.headers.get('Content-Type') or MIMETypeDetector.get_by_extension(file_path)

            # Media type without parameters, normalised so that values such
            # as 'Application/JSON; charset=utf-8' are still recognised.
            content_type = mime.split(';')[0].strip().lower()

            if 'application/json' in content_type:
                data = await response.json()
            elif 'text/' in content_type:
                data = await response.text()
            else:
                data = await response.blob()

            # Cache response
            self.cache[file_path] = {
                'data': data,
                'mime': mime,
                'timestamp': js.Date.now()
            }

            return {'data': data, 'mime': mime, 'status': 200}

        except Exception:
            # Network and decoding failures are reported as a 500 result
            # instead of being raised to the caller.
            return {
                'data': None,
                'mime': 'text/plain',
                'status': 500
            }

    async def serve_with_fallback(self, file_path: str, fallback_path: str) -> Dict[str, Any]:
        """
        Serve file with fallback

        Args:
            file_path: Preferred file path
            fallback_path: Path served instead when the first result is 404

        Returns:
            Result dict of ``serve`` for whichever path was used; any
            status other than 404 is returned unchanged
        """
        result = await self.serve(file_path)

        if result['status'] == 404:
            return await self.serve(fallback_path)

        return result

    def clear_cache(self):
        """Clear cache"""
        self.cache.clear()

    def clear_cache_entry(self, file_path: str):
        """Clear specific cache entry (no-op when the path is not cached)"""
        self.cache.pop(file_path, None)

    def get_cache_size(self) -> int:
        """Get cache size (number of cached paths)"""
        return len(self.cache)


# 3. File Cache Manager
class FileCacheManager:
    """In-memory store of file content with MIME type, ETag and Last-Modified."""

    def __init__(self):
        # file_path -> {'content', 'mime_type', 'etag', 'last_modified'}
        self.cache = {}

    def set(self, file_path: str, content: Any, mime_type: str, etag: str = None, last_modified: str = None):
        """
        Store (or overwrite) the entry for a file.

        Args:
            file_path: Cache key
            content: File content to keep
            mime_type: MIME type recorded with the content
            etag: ETag to record; generated from the content when falsy
            last_modified: Timestamp string to record; the current
                ``js.Date`` UTC string is used when falsy
        """
        resolved_etag = etag if etag else self._generate_etag(content)
        resolved_stamp = last_modified if last_modified else js.Date.new().toUTCString()

        entry = {
            'content': content,
            'mime_type': mime_type,
            'etag': resolved_etag,
            'last_modified': resolved_stamp
        }
        self.cache[file_path] = entry

    def get(self, file_path: str) -> Optional[Dict[str, Any]]:
        """Return the cached entry for ``file_path``, or None when absent."""
        try:
            return self.cache[file_path]
        except KeyError:
            return None

    def has(self, file_path: str) -> bool:
        """Tell whether ``file_path`` currently has a cache entry."""
        is_cached = file_path in self.cache
        return is_cached

    def remove(self, file_path: str):
        """Drop the entry for ``file_path``; absent paths are ignored."""
        self.cache.pop(file_path, None)

    def clear(self):
        """Drop every cached entry."""
        self.cache.clear()

    def get_stats(self) -> Dict[str, Any]:
        """
        Summarise the cache.

        Returns:
            dict with 'size' and 'entries' (both the number of cached
            paths) and 'keys' (the cached paths as a list)
        """
        entry_count = len(self.cache)
        return {
            'size': entry_count,
            'entries': entry_count,
            'keys': [*self.cache]
        }

    def _generate_etag(self, content: Any) -> str:
        """Build a double-quoted MD5 hex digest of the content's string form."""
        text = content if isinstance(content, str) else str(content)
        digest = hashlib.md5(text.encode()).hexdigest()
        return '"' + digest + '"'


# 4. Asset Preloader
class AssetPreloader:
    """Fetch assets ahead of time and remember which URLs loaded or failed."""

    def __init__(self):
        # URLs whose fetch returned an ok response
        self.loaded_assets: Set[str] = set()
        # URLs whose fetch raised or returned a non-ok response
        self.failed_assets: Set[str] = set()

    async def preload(self, url: str) -> Dict[str, Any]:
        """
        Fetch one asset unless its outcome is already known.

        A URL recorded as loaded or failed is answered from that record
        without a new request.

        Args:
            url: Asset URL

        Returns:
            {'success': True} on success, otherwise
            {'success': False, 'error': <exception>}
        """
        if url in self.loaded_assets:
            return {'success': True}

        if url in self.failed_assets:
            return {'success': False, 'error': Exception('Previously failed to load')}

        try:
            response = await js.fetch(url)

            # A non-ok response is routed through the same failure path
            # as a fetch that raised.
            if not response.ok:
                raise Exception(f'HTTP {response.status}')
        except Exception as error:
            self.failed_assets.add(url)
            return {'success': False, 'error': error}

        self.loaded_assets.add(url)
        return {'success': True}

    async def preload_multiple(self, urls: List[str]) -> Dict[str, Any]:
        """
        Preload several assets one after another, in the given order.

        Args:
            urls: List of URLs

        Returns:
            dict with 'successful' (list of URLs) and 'failed'
            (list of {'url', 'error'} dicts)
        """
        # Awaited sequentially: each preload finishes before the next starts.
        outcomes = [(url, await self.preload(url)) for url in urls]

        successful = [url for url, outcome in outcomes if outcome['success']]
        failed = [
            {'url': url, 'error': outcome.get('error')}
            for url, outcome in outcomes
            if not outcome['success']
        ]

        return {'successful': successful, 'failed': failed}

    def get_loaded_assets(self) -> List[str]:
        """Return the URLs recorded as loaded, as a new list."""
        return [*self.loaded_assets]

    def get_failed_assets(self) -> List[str]:
        """Return the URLs recorded as failed, as a new list."""
        return [*self.failed_assets]

    def clear(self):
        """Forget every recorded success and failure."""
        for tracked in (self.loaded_assets, self.failed_assets):
            tracked.clear()


# 5. CDN Manager
class CDNManager:
    """Build asset URLs against a list of CDN base URLs, with rotation and failover."""

    def __init__(self, cdn_urls: List[str]):
        # CDN base URLs, in the order they are tried
        self.cdn_urls = cdn_urls
        # Index into cdn_urls of the CDN used by get_cdn_url
        self.current_cdn_index = 0

    @staticmethod
    def _join(cdn_url: str, asset_path: str) -> str:
        """Join a CDN base and an asset path with exactly one '/' between them."""
        if not cdn_url:
            # Empty base: leave the path exactly as given.
            return asset_path
        # Plain concatenation breaks when the path has no leading slash
        # ('https://cdn.example.comapp.js') or the base has a trailing one.
        return f"{cdn_url.rstrip('/')}/{asset_path.lstrip('/')}"

    def get_cdn_url(self, asset_path: str) -> str:
        """
        Get CDN URL for asset on the currently selected CDN

        Args:
            asset_path: Asset path, with or without a leading '/'

        Returns:
            Full URL of the asset on the current CDN

        Raises:
            IndexError: when no CDN URLs are configured
        """
        return self._join(self.cdn_urls[self.current_cdn_index], asset_path)

    def rotate_cdn(self):
        """Rotate CDN: select the next base URL, wrapping around at the end"""
        if not self.cdn_urls:
            # Nothing to rotate through; avoids a modulo-by-zero error.
            return
        self.current_cdn_index = (self.current_cdn_index + 1) % len(self.cdn_urls)

    def get_all_cdn_urls(self, asset_path: str) -> List[str]:
        """Get all CDN URLs for asset, in configured order"""
        return [self._join(cdn, asset_path) for cdn in self.cdn_urls]

    async def try_all_cdns(self, asset_path: str) -> Dict[str, Any]:
        """
        Try each CDN until one succeeds

        CDNs are tried in configured order. A fetch that raises is printed
        and skipped; a non-ok response is skipped silently.

        Args:
            asset_path: Asset path, with or without a leading '/'

        Returns:
            {'success': True, 'cdn_url': url, 'data': blob} for the first
            ok response, otherwise {'success': False}
        """
        for url in self.get_all_cdn_urls(asset_path):
            try:
                response = await js.fetch(url)
                if response.ok:
                    data = await response.blob()
                    return {'success': True, 'cdn_url': url, 'data': data}
            except Exception as error:
                print(f"CDN failed: {url}, {error}")

        return {'success': False}


# 6. Asset Bundle Manager
class AssetBundleManager:
    """Keep named lists of asset URLs and preload them as a group."""

    def __init__(self):
        # bundle name -> list of asset URLs
        self.bundles = {}

    def register_bundle(self, name: str, assets: List[str]):
        """Store ``assets`` under ``name``, replacing any existing bundle."""
        self.bundles.update({name: assets})

    def get_bundle(self, name: str) -> Optional[List[str]]:
        """Return the asset list registered as ``name``, or None if unknown."""
        try:
            return self.bundles[name]
        except KeyError:
            return None

    async def preload_bundle(self, name: str, preloader: AssetPreloader) -> Dict[str, Any]:
        """
        Preload every asset of a bundle through the given preloader.

        Args:
            name: Bundle name
            preloader: Object whose ``preload_multiple`` does the loading

        Returns:
            The result of ``preloader.preload_multiple``; an unknown or
            empty bundle gives empty 'successful' and 'failed' lists
            without calling the preloader
        """
        bundle_assets = self.get_bundle(name)

        if bundle_assets:
            return await preloader.preload_multiple(bundle_assets)

        return {'successful': [], 'failed': []}


# Usage Examples
async def demonstrate_static_files():
    """
    Walk through each static-file helper and print what it produces.

    Covers MIME detection, the static file server, the file cache manager,
    the asset preloader (which is awaited on three sample URLs), the CDN
    manager and the asset bundle manager.
    """
    print("=== Web Python Static File Serving Examples ===\n")

    # 1. MIME type detection
    print("--- 1. MIME Type Detection ---")
    mime_samples = (
        ('HTML', 'index.html'),
        ('JavaScript', 'app.js'),
        ('PNG', 'image.png'),
    )
    for label, sample_name in mime_samples:
        print(f"{label}: {MIMETypeDetector.get_by_extension(sample_name)}")

    # 2. Static file server
    print("\n--- 2. Static File Server ---")
    server = StaticFileServer('/assets')
    for relative_path in ('/styles/main.css', '/js/app.js'):
        print(f"Would serve: {server.base_url}{relative_path}")

    # 3. File cache manager
    print("\n--- 3. File Cache Manager ---")
    file_cache = FileCacheManager()

    # First entry supplies its own ETag; the second lets one be generated
    file_cache.set('/index.html', '<html>...</html>', 'text/html', '"abc123"')
    file_cache.set('/app.js', 'console.log("Hello");', 'text/javascript')

    print(f"Cache stats: {file_cache.get_stats()}")

    index_entry = file_cache.get('/index.html')
    cached_mime = index_entry['mime_type'] if index_entry else 'N/A'
    print(f"Cached file: {cached_mime}")

    # 4. Asset preloader
    print("\n--- 4. Asset Preloader ---")
    preloader = AssetPreloader()

    # Note: These would be real URLs in production
    sample_assets = [
        '/assets/image1.png',
        '/assets/image2.png',
        '/assets/image3.png'
    ]
    preload_results = await preloader.preload_multiple(sample_assets)

    print(f"Preloaded: {preload_results['successful']}")
    print(f"Failed: {preload_results['failed']}")

    # 5. CDN manager
    print("\n--- 5. CDN Manager ---")
    cdn_hosts = [
        'https://cdn1.example.com',
        'https://cdn2.example.com',
        'https://cdn3.example.com'
    ]
    cdn_manager = CDNManager(cdn_hosts)

    print(f"CDN URL: {cdn_manager.get_cdn_url('/assets/app.js')}")

    cdn_manager.rotate_cdn()
    print(f"Rotated CDN URL: {cdn_manager.get_cdn_url('/assets/app.js')}")

    # 6. Asset bundle manager
    print("\n--- 6. Asset Bundle Manager ---")
    bundle_manager = AssetBundleManager()

    bundle_definitions = (
        ('vendor', ['/assets/js/vendor/react.js', '/assets/js/vendor/lodash.js']),
        ('app', ['/assets/js/app.js', '/assets/js/utils.js']),
    )
    for bundle_name, bundle_assets in bundle_definitions:
        bundle_manager.register_bundle(bundle_name, bundle_assets)

    vendor_bundle = bundle_manager.get_bundle('vendor')
    print(f"Vendor bundle: {vendor_bundle}")

    print("\n=== All Static File Serving Examples Completed ===")

# Export the public classes and the demo function
# Name -> object mapping; every key is the exported object's own __name__.
export = {
    exported.__name__: exported
    for exported in (
        MIMETypeDetector,
        StaticFileServer,
        FileCacheManager,
        AssetPreloader,
        CDNManager,
        AssetBundleManager,
        demonstrate_static_files,
    )
}