Exemples de Fonctionnalités Web Web Python

Exemples de fonctionnalités web Web Python incluant le routage, le middleware et le service de fichiers statiques

Key Facts

Category
Python
Items
3
Format Families
sample

Sample Overview

Exemples de fonctionnalités web Web Python incluant le routage, le middleware et le service de fichiers statiques This sample set belongs to Python and can be used to test related workflows inside Elysia Tools.

💻 Routage python

🟡 intermediate ⭐⭐⭐

Analyser les routes URL, gérer le routage de hachage et gérer l'historique du navigateur

⏱️ 25 min 🏷️ python, web, web features
Prerequisites: Intermediate Python, History API, URL API, Pyodide
# Web Python Routing Examples
# Client-side routing with URL parsing, hash routing, and history management

import js
import re
from typing import Dict, Callable, Any, Optional, Tuple
from urllib.parse import parse_qs, urlparse

# 1. Route Handler
class Route:
    def __init__(self, path: str, handler: Callable):
        self.path = path
        self.handler = handler

# 2. Router
class Router:
    def __init__(self):
        self.routes = []
        self.not_found_handler = None
        self.current_params = {}
        self.current_query = {}

    def add_route(self, path: str, handler: Callable):
        """
        Add route

        Args:
            path: Route path (can include :params)
            handler: Route handler function
        """
        self.routes.append(Route(path, handler))

    def set_not_found(self, handler: Callable):
        """
        Set 404 handler

        Args:
            handler: Handler for not found routes
        """
        self.not_found_handler = handler

    def navigate(self, path: str, params: Dict[str, str] = None):
        """
        Navigate to path

        Args:
            path: Path to navigate to
            params: Route parameters to replace
        """
        url = path
        if params:
            url = self._replace_params(path, params)

        # Update browser URL
        js.window.history.pushState(js.Object.new(), '', url)

        # Handle route
        self._handle_route(url)

    def replace(self, path: str, params: Dict[str, str] = None):
        """
        Replace current route

        Args:
            path: Path to replace with
            params: Route parameters to replace
        """
        url = path
        if params:
            url = self._replace_params(path, params)

        js.window.history.replaceState(js.Object.new(), '', url)
        self._handle_route(url)

    def back(self):
        """Go back in history"""
        js.window.history.back()

    def forward(self):
        """Go forward in history"""
        js.window.history.forward()

    def go(self, delta: int):
        """
        Go to specific history entry

        Args:
            delta: Number of steps to go
        """
        js.window.history.go(delta)

    def _handle_route(self, url: str):
        """Handle route change"""
        path, query = self._parse_url(url)

        # Find matching route
        route = self._find_route(path)

        if route:
            params = self._extract_params(path, route.path)
            self.current_params = params
            self.current_query = query
            route.handler(params, query)
        elif self.not_found_handler:
            self.not_found_handler()

    def _find_route(self, path: str) -> Optional[Route]:
        """Find matching route"""
        for route in self.routes:
            if self._match_route(path, route.path):
                return route
        return None

    def _match_route(self, path: str, pattern: str) -> bool:
        """Check if path matches route pattern"""
        # Convert pattern to regex
        regex_pattern = re.sub(r':[A-Za-z0-9_]+', r'[^/]+', pattern).replace('*', '.*')
        regex = re.compile(f'^{regex_pattern}$')
        return bool(regex.match(path))

    def _extract_params(self, path: str, pattern: str) -> Dict[str, str]:
        """Extract params from path"""
        params = {}

        pattern_parts = pattern.split('/')
        path_parts = path.split('/')

        for i, part in enumerate(pattern_parts):
            if part.startswith(':'):
                param_name = part[1:]
                params[param_name] = path_parts[i] if i < len(path_parts) else ''

        return params

    def _replace_params(self, path: str, params: Dict[str, str]) -> str:
        """Replace params in path"""
        result = path
        for key, value in params.items():
            result = result.replace(f':{key}', value)
        return result

    def _parse_url(self, url: str) -> Tuple[str, Dict[str, str]]:
        """Parse URL into path and query"""
        if '?' in url:
            path, query_string = url.split('?', 1)
            query = dict(parse_qs(query_string))
            # Flatten values (take first)
            query = {k: v[0] if isinstance(v, list) else v for k, v in query.items()}
        else:
            path = url
            query = {}

        return path, query

    def initialize(self):
        """Initialize router (set up event listeners)"""
        # Note: In Pyodide, you'd need to set up JavaScript event handlers
        pass


# 3. Hash Router
class HashRouter:
    def __init__(self):
        self.routes = []
        self.not_found_handler = None

    def add_route(self, path: str, handler: Callable):
        """Add route"""
        # Remove leading slash for hash matching
        hash_path = path[1:] if path.startswith('/') else path
        self.routes.append(Route(hash_path, handler))

    def set_not_found(self, handler: Callable):
        """Set 404 handler"""
        self.not_found_handler = handler

    def navigate(self, hash_path: str, params: Dict[str, str] = None):
        """Navigate to hash"""
        url = hash_path
        if params:
            url = self._replace_params(hash_path, params)

        js.window.location.hash = url

    def get_hash(self) -> str:
        """Get current hash"""
        hash_value = js.window.location.hash
        return hash_value[1:] if hash_value else '/'

    def _replace_params(self, path: str, params: Dict[str, str]) -> str:
        """Replace params in path"""
        result = path
        for key, value in params.items():
            result = result.replace(f':{key}', value)
        return result

    def _handle_hash_change(self):
        """Handle hash change"""
        hash_value = self.get_hash()

        if '?' in hash_value:
            path, query_string = hash_value.split('?', 1)
            query = dict(parse_qs(query_string))
            query = {k: v[0] if isinstance(v, list) else v for k, v in query.items()}
        else:
            path = hash_value
            query = {}

        # Find matching route
        route = self._find_route(path)

        if route:
            params = self._extract_params(path, route.path)
            route.handler(params, query)
        elif self.not_found_handler:
            self.not_found_handler()

    def _find_route(self, path: str) -> Optional[Route]:
        """Find matching route"""
        for route in self.routes:
            if self._match_route(path, route.path):
                return route
        return None

    def _match_route(self, path: str, pattern: str) -> bool:
        """Check if path matches route pattern"""
        regex_pattern = re.sub(r':[A-Za-z0-9_]+', r'[^/]+', pattern).replace('*', '.*')
        regex = re.compile(f'^{regex_pattern}$')
        return bool(regex.match(path))

    def _extract_params(self, path: str, pattern: str) -> Dict[str, str]:
        """Extract params from path"""
        params = {}
        pattern_parts = pattern.split('/')
        path_parts = path.split('/')

        for i, part in enumerate(pattern_parts):
            if part.startswith(':'):
                param_name = part[1:]
                params[param_name] = path_parts[i] if i < len(path_parts) else ''

        return params


# 4. Query Params Manager
class QueryParamsManager:
    def get(self, param: str) -> Optional[str]:
        """
        Get query param

        Args:
            param: Parameter name

        Returns:
            Parameter value or None
        """
        params = js.URLSearchParams.new(js.window.location.search)
        return params.get(param)

    def get_all(self) -> Dict[str, str]:
        """
        Get all params

        Returns:
            Dictionary of all parameters
        """
        params = js.URLSearchParams.new(js.window.location.search)
        result = {}

        for key in list(params.keys()):
            result[key] = params.get(key)

        return result

    def set(self, param: str, value: str):
        """
        Set query param

        Args:
            param: Parameter name
            value: Parameter value
        """
        params = js.URLSearchParams.new(js.window.location.search)
        params.set(param, value)
        self._update(str(params))

    def delete(self, param: str):
        """
        Delete query param

        Args:
            param: Parameter name
        """
        params = js.URLSearchParams.new(js.window.location.search)
        params.delete(param)
        self._update(str(params))

    def clear(self):
        """Clear all params"""
        self._update('')

    def _update(self, query_string: str):
        """Update URL"""
        url = js.URL.new(js.window.location.href)
        url.search = query_string
        js.window.history.replaceState(js.Object.new(), '', str(url))


# 5. Route Guard
class RouteGuard:
    def __init__(self):
        self.guards = {}

    def add_guard(self, route: str, guard: Callable[[], bool]):
        """
        Add guard for route

        Args:
            route: Route path
            guard: Guard function
        """
        self.guards[route] = guard

    async def can_navigate(self, route: str) -> bool:
        """
        Check if route is allowed

        Args:
            route: Route path

        Returns:
            True if navigation is allowed
        """
        guard = self.guards.get(route)
        if guard:
            return guard()
        return True

    def add_auth_guard(self, route: str, is_authenticated: Callable[[], bool]):
        """Add authentication guard"""
        self.add_guard(route, is_authenticated)

    def add_permission_guard(self, route: str, has_permission: Callable[[], bool]):
        """Add permission guard"""
        self.add_guard(route, has_permission)


# 6. Navigation Manager
class NavigationManager:
    def __init__(self, router: Router):
        self.router = router
        self.history = []
        self.current_index = -1

    def navigate(self, path: str, params: Dict[str, str] = None):
        """Navigate with history tracking"""
        # Remove forward history
        if self.current_index < len(self.history) - 1:
            self.history = self.history[:self.current_index + 1]

        # Add to history
        self.history.append(path)
        self.current_index += 1

        # Navigate
        self.router.navigate(path, params)

    def back(self):
        """Go back in custom history"""
        if self.current_index > 0:
            self.current_index -= 1
            js.window.history.back()

    def forward(self):
        """Go forward in custom history"""
        if self.current_index < len(self.history) - 1:
            self.current_index += 1
            js.window.history.forward()

    def can_go_back(self) -> bool:
        """Check if can go back"""
        return self.current_index > 0

    def can_go_forward(self) -> bool:
        """Check if can go forward"""
        return self.current_index < len(self.history) - 1

    def get_current_position(self) -> int:
        """Get current position in history"""
        return self.current_index

    def get_history_length(self) -> int:
        """Get history length"""
        return len(self.history)


# Usage Examples
async def demonstrate_routing():
    print("=== Web Python Routing Examples ===\n")

    # 1. Basic router
    print("--- 1. Basic Router ---")
    router = Router()

    def home_handler(params, query):
        print("Home page")

    def about_handler(params, query):
        print("About page")

    def user_handler(params, query):
        print(f"User page: {params.get('id', 'N/A')}")

    def comment_handler(params, query):
        print(f"Comment: {params.get('commentId', 'N/A')} on post {params.get('postId', 'N/A')}")

    router.add_route('/', home_handler)
    router.add_route('/about', about_handler)
    router.add_route('/users/:id', user_handler)
    router.add_route('/posts/:postId/comments/:commentId', comment_handler)

    # 2. Navigation
    print("\n--- 2. Navigation ---")
    router.navigate('/')
    router.navigate('/about')
    router.navigate('/users/123')

    # 3. Query params
    print("\n--- 3. Query Params ---")
    router.navigate('/search?query=python&page=1')

    # 4. Hash router
    print("\n--- 4. Hash Router ---")
    hash_router = HashRouter()

    def hash_home_handler(params, query):
        print("Hash home")

    def hash_profile_handler(params, query):
        print(f"Hash profile: {params.get('userId', 'N/A')}")

    hash_router.add_route('/', hash_home_handler)
    hash_router.add_route('profile/:userId', hash_profile_handler)
    hash_router.navigate('profile/456')

    # 5. Query params manager
    print("\n--- 5. Query Params Manager ---")
    query_manager = QueryParamsManager()
    query_manager.set('tab', 'profile')
    query_manager.set('section', 'details')
    all_params = query_manager.get_all()
    print(f"All params: {all_params}")

    # 6. Route guard
    print("\n--- 6. Route Guard ---")
    guard = RouteGuard()

    def is_authenticated():
        authenticated = False  # Simulate
        print(f"Auth check: {authenticated}")
        return authenticated

    guard.add_auth_guard('/admin', is_authenticated)
    can_access = guard.can_navigate('/admin')
    print(f"Can access /admin: {can_access}")

    # 7. Navigation manager
    print("\n--- 7. Navigation Manager ---")
    nav_manager = NavigationManager(router)
    nav_manager.navigate('/page1')
    nav_manager.navigate('/page2')
    print(f"Can go back: {nav_manager.can_go_back()}")
    print(f"History length: {nav_manager.get_history_length()}")

    print("\n=== All Routing Examples Completed ===")

# Export functions
export = {
    'Router': Router,
    'HashRouter': HashRouter,
    'QueryParamsManager': QueryParamsManager,
    'RouteGuard': RouteGuard,
    'NavigationManager': NavigationManager,
    'demonstrate_routing': demonstrate_routing
}

💻 Middleware python

🟡 intermediate ⭐⭐⭐⭐

Implémenter une chaîne de middleware requête/réponse pour traiter et modifier des données

⏱️ 30 min 🏷️ python, web, web features
Prerequisites: Intermediate Python, Async/await, Pyodide
# Web Python Middleware Examples
# Request/response processing middleware chain

import asyncio
from typing import Dict, Any, Callable, Optional, List

# 1. Context
class Context:
    def __init__(self):
        self.request = None
        self.response = None
        self.state = {}
        self.headers = {}
        self.metadata = {}

# 2. Middleware Function Type
Middleware = Callable[[Context, Callable], None]

# 3. Middleware Pipeline
class MiddlewarePipeline:
    def __init__(self):
        self.middlewares = []

    def use(self, middleware: Middleware) -> 'MiddlewarePipeline':
        """
        Add middleware

        Args:
            middleware: Middleware function

        Returns:
            Self for chaining
        """
        self.middlewares.append(middleware)
        return self

    async def execute(self, context: Context):
        """
        Execute pipeline

        Args:
            context: Request context
        """
        index = [0]  # Use list to allow modification in nested function

        async def next():
            if index[0] < len(self.middlewares):
                middleware = self.middlewares[index[0]]
                index[0] += 1
                await middleware(context, next)

        await next()

    def clear(self):
        """Clear all middlewares"""
        self.middlewares = []

    def count(self) -> int:
        """Get middleware count"""
        return len(self.middlewares)


# 4. Common Middlewares
class CommonMiddlewares:
    @staticmethod
    def logging() -> Middleware:
        """Logging middleware"""
        async def logging_middleware(context: Context, next: Callable):
            start = js.performance.now() if hasattr(js, 'performance') else 0

            print('[Request]', {
                'url': getattr(context.request, 'url', 'N/A') if context.request else 'N/A',
                'method': getattr(context.request, 'method', 'N/A') if context.request else 'N/A',
                'headers': context.headers
            })

            await next()

            duration = (js.performance.now() - start) if hasattr(js, 'performance') else 0
            print('[Response]', {
                'status': getattr(context.response, 'status', 'N/A') if context.response else 'N/A',
                'duration': f'{duration:.2f}ms'
            })

        return logging_middleware

    @staticmethod
    def timing() -> Middleware:
        """Timing middleware"""
        async def timing_middleware(context: Context, next: Callable):
            start = js.performance.now() if hasattr(js, 'performance') else 0
            await next()
            duration = (js.performance.now() - start) if hasattr(js, 'performance') else 0
            context.metadata['duration'] = duration

        return timing_middleware

    @staticmethod
    def error_handler(error_handler_func: Callable) -> Middleware:
        """Error handling middleware"""
        async def error_middleware(context: Context, next: Callable):
            try:
                await next()
            except Exception as error:
                error_handler_func(error)

        return error_middleware

    @staticmethod
    def headers(add_headers: Dict[str, str]) -> Middleware:
        """Header manipulation middleware"""
        async def headers_middleware(context: Context, next: Callable):
            for key, value in add_headers.items():
                context.headers[key] = value
            await next()

        return headers_middleware

    @staticmethod
    def authentication(auth_check: Callable) -> Middleware:
        """Authentication middleware"""
        async def auth_middleware(context: Context, next: Callable):
            is_authenticated = auth_check(context)
            if not is_authenticated:
                raise Exception('Unauthorized')
            context.metadata['authenticated'] = True
            await next()

        return auth_middleware

    @staticmethod
    def rate_limit(max_requests: int, window_ms: int) -> Middleware:
        """Rate limiting middleware"""
        requests = {}

        async def rate_limit_middleware(context: Context, next: Callable):
            key = getattr(context.request, 'ip', 'unknown') if context.request else 'unknown'
            now = js.Date.now() if hasattr(js, 'Date') else 0

            if key not in requests:
                requests[key] = []

            user_requests = requests[key]

            # Remove old requests
            window_start = now - window_ms
            valid_requests = [r for r in user_requests if r > window_start]

            # Check limit
            if len(valid_requests) >= max_requests:
                raise Exception('Rate limit exceeded')

            # Add current request
            valid_requests.append(now)
            requests[key] = valid_requests

            context.metadata['rate_limit'] = {
                'remaining': max_requests - len(valid_requests),
                'reset': now + window_ms
            }

            await next()

        return rate_limit_middleware

    @staticmethod
    def cors(options: Dict[str, Any] = None) -> Middleware:
        """CORS middleware"""
        if options is None:
            options = {}

        async def cors_middleware(context: Context, next: Callable):
            origin = getattr(context.request, 'headers', {}).get('Origin', '*') if context.request else '*'

            # Set CORS headers
            allowed_origins = options.get('origin', '*')
            if allowed_origins == '*' or not allowed_origins:
                context.headers['Access-Control-Allow-Origin'] = '*'
            elif isinstance(allowed_origins, list) and origin in allowed_origins:
                context.headers['Access-Control-Allow-Origin'] = origin

            if options.get('methods'):
                context.headers['Access-Control-Allow-Methods'] = ', '.join(options['methods'])
            if options.get('headers'):
                context.headers['Access-Control-Allow-Headers'] = ', '.join(options['headers'])
            if options.get('credentials'):
                context.headers['Access-Control-Allow-Credentials'] = 'true'

            await next()

        return cors_middleware

    @staticmethod
    def compression() -> Middleware:
        """Compression middleware (simulated)"""
        async def compression_middleware(context: Context, next: Callable):
            await next()

            # Check if response should be compressed
            accept_encoding = getattr(context.request, 'headers', {}).get('Accept-Encoding', '') if context.request else ''

            if 'gzip' in accept_encoding or 'br' in accept_encoding:
                context.headers['Content-Encoding'] = 'gzip'
                context.metadata['compressed'] = True

        return compression_middleware

    @staticmethod
    def validation(validator: Callable) -> Middleware:
        """Validation middleware"""
        async def validation_middleware(context: Context, next: Callable):
            is_valid = validator(context)
            if not is_valid:
                raise Exception('Validation failed')
            await next()

        return validation_middleware

    @staticmethod
    def cache(cache_key_func: Callable, ttl: int = 60000) -> Middleware:
        """Cache middleware"""
        cache = {}

        async def cache_middleware(context: Context, next: Callable):
            key = cache_key_func(context)
            now = js.Date.now() if hasattr(js, 'Date') else 0

            # Check cache
            if key in cache:
                cached = cache[key]
                if cached['expiry'] > now:
                    context.response = cached['data']
                    context.metadata['cached'] = True
                    return

            await next()

            # Store in cache
            if context.response:
                cache[key] = {
                    'data': context.response,
                    'expiry': now + ttl
                }

        return cache_middleware


# 5. Middleware Composer
class MiddlewareComposer:
    def __init__(self):
        self.pipeline = MiddlewarePipeline()

    def use(self, middleware: Middleware) -> 'MiddlewareComposer':
        """Add middleware"""
        self.pipeline.use(middleware)
        return self

    def use_all(self, middlewares: List[Middleware]) -> 'MiddlewareComposer':
        """Add multiple middlewares"""
        for mw in middlewares:
            self.pipeline.use(mw)
        return self

    async def execute(self, context: Context):
        """Execute with context"""
        await self.pipeline.execute(context)

    @staticmethod
    def from_list(middlewares: List[Middleware]) -> 'MiddlewareComposer':
        """Create from list"""
        composer = MiddlewareComposer()
        return composer.use_all(middlewares)


# 6. Mock Request/Response
class MockRequest:
    def __init__(self, url: str, method: str = 'GET', headers: Dict = None, body: Any = None, ip: str = '127.0.0.1'):
        self.url = url
        self.method = method
        self.headers = headers or {}
        self.body = body
        self.ip = ip


class MockResponse:
    def __init__(self):
        self.status = 200
        self.headers = {}
        self.body = None

    def set_status(self, code: int) -> 'MockResponse':
        self.status = code
        return self

    def set_header(self, key: str, value: str) -> 'MockResponse':
        self.headers[key] = value
        return self

    def send(self, data: Any) -> 'MockResponse':
        self.body = data
        return self


# 7. Application Builder
class ApplicationBuilder:
    def __init__(self):
        self.composer = MiddlewareComposer()

    def use(self, middleware: Middleware) -> 'ApplicationBuilder':
        """Add middleware"""
        self.composer.use(middleware)
        return self

    async def handle(self, request: MockRequest) -> MockResponse:
        """Handle request"""
        context = Context()
        context.request = request
        context.response = MockResponse()

        await self.composer.execute(context)

        return context.response


# Usage Examples
async def demonstrate_middleware():
    print("=== Web Python Middleware Examples ===\n")

    # 1. Basic middleware
    print("--- 1. Basic Middleware ---")
    pipeline = MiddlewarePipeline()

    async def middleware1(context, next):
        print("Middleware 1: Before")
        await next()
        print("Middleware 1: After")

    async def middleware2(context, next):
        print("Middleware 2: Before")
        await next()
        print("Middleware 2: After")

    pipeline.use(middleware1)
    pipeline.use(middleware2)

    context1 = Context()
    context1.request = MockRequest('/test')
    await pipeline.execute(context1)

    # 2. Common middlewares
    print("\n--- 2. Common Middlewares ---")
    app = ApplicationBuilder()
    app.use(CommonMiddlewares.logging())
    app.use(CommonMiddlewares.timing())
    app.use(CommonMiddlewares.headers({
        'X-Powered-By': 'Python-Middleware',
        'X-Response-Time': '0ms'
    }))

    request = MockRequest('/api/users', 'GET')
    request.headers = {'Accept': 'application/json'}
    response = await app.handle(request)
    print(f"Response status: {response.status}")

    # 3. Error handling
    print("\n--- 3. Error Handling ---")

    def error_handler(error):
        print(f"Error caught: {str(error)}")

    error_app = ApplicationBuilder()
    error_app.use(CommonMiddlewares.error_handler(error_handler))

    async def error_middleware(context, next):
        if context.request and context.request.url == '/error':
            raise Exception('Intentional error')
        await next()

    error_app.use(error_middleware)
    await error_app.handle(MockRequest('/error'))

    # 4. Authentication
    print("\n--- 4. Authentication ---")

    def auth_check(context):
        token = context.request.headers.get('Authorization', '') if context.request else ''
        return token == 'Bearer valid-token'

    auth_app = ApplicationBuilder()
    auth_app.use(CommonMiddlewares.authentication(auth_check))

    async def auth_handler(context, next):
        print("Authenticated successfully")
        context.response = MockResponse().set_status(200).send({'message': 'Success'})
        await next()

    auth_app.use(auth_handler)

    auth_request = MockRequest('/protected', 'GET')
    auth_request.headers = {'Authorization': 'Bearer valid-token'}
    auth_response = await auth_app.handle(auth_request)
    print(f"Auth response status: {auth_response.status}")

    # 5. Rate limiting
    print("\n--- 5. Rate Limiting ---")
    rate_limit_app = ApplicationBuilder()
    rate_limit_app.use(CommonMiddlewares.rate_limit(max_requests=3, window_ms=10000))

    async def response_handler(context, next):
        context.response = MockResponse().set_status(200).send({'message': 'OK'})
        await next()

    rate_limit_app.use(response_handler)

    # Send multiple requests
    rl_request = MockRequest('/api/test')
    for i in range(4):
        try:
            rl_response = await rate_limit_app.handle(rl_request)
            print(f"Request {i + 1}: {rl_response.status}")
        except Exception as e:
            print(f"Request {i + 1}: Rate limited")

    print("\n=== All Middleware Examples Completed ===")

# Export functions
export = {
    'MiddlewarePipeline': MiddlewarePipeline,
    'CommonMiddlewares': CommonMiddlewares,
    'MiddlewareComposer': MiddlewareComposer,
    'ApplicationBuilder': ApplicationBuilder,
    'MockRequest': MockRequest,
    'MockResponse': MockResponse,
    'Context': Context,
    'demonstrate_middleware': demonstrate_middleware
}

💻 Fichiers Statiques python

🟡 intermediate ⭐⭐⭐

Servir des fichiers statiques avec mise en cache, compression et détection de type MIME

⏱️ 25 min 🏷️ python, web, web features
Prerequisites: Intermediate Python, Fetch API, Pyodide
# Web Python Static File Serving Examples
# Serving static files with various strategies

import js
from typing import Dict, Any, Optional, List, Set
import hashlib

# 1. MIME Type Detector
class MIMETypeDetector:
    MIME_TYPES = {
        '.html': 'text/html',
        '.htm': 'text/html',
        '.css': 'text/css',
        '.js': 'text/javascript',
        '.json': 'application/json',
        '.xml': 'application/xml',
        '.txt': 'text/plain',
        '.pdf': 'application/pdf',
        '.zip': 'application/zip',
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.svg': 'image/svg+xml',
        '.ico': 'image/x-icon',
        '.woff': 'font/woff',
        '.woff2': 'font/woff2',
        '.ttf': 'font/ttf',
        '.eot': 'application/vnd.ms-fontobject',
        '.mp3': 'audio/mpeg',
        '.mp4': 'video/mp4',
        '.webm': 'video/webm',
        '.ogg': 'video/ogg'
    }

    @classmethod
    def get_by_extension(cls, filename: str) -> str:
        """
        Get MIME type by extension

        Args:
            filename: File name

        Returns:
            MIME type string
        """
        ext = filename[filename.rfind('.'):].lower() if '.' in filename else ''

        if ext in cls.MIME_TYPES:
            return cls.MIME_TYPES[ext]

        return 'application/octet-stream'

    @classmethod
    async def get_by_signature(cls, file: Any) -> str:
        """
        Get MIME type by file signature (magic bytes)

        Args:
            file: File blob

        Returns:
            MIME type string
        """
        try:
            header = await cls._read_file_header(file, 8)

            # Check common signatures (simplified)
            # PNG: 89 50 4E 47
            if header[0] == 0x89 and header[1] == 0x50 and header[2] == 0x4E and header[3] == 0x47:
                return 'image/png'
            # JPEG: FF D8 FF
            if header[0] == 0xFF and header[1] == 0xD8 and header[2] == 0xFF:
                return 'image/jpeg'
            # GIF: 47 49 46 38
            if header[0] == 0x47 and header[1] == 0x49 and header[2] == 0x46 and header[3] == 0x38:
                return 'image/gif'

            return 'application/octet-stream'
        except:
            return 'application/octet-stream'

    @staticmethod
    async def _read_file_header(file: Any, bytes_count: int = 8) -> List[int]:
        """Read file header"""
        # Simplified - in real Pyodide you'd read from Blob
        return [0] * bytes_count


# 2. Static File Server
class StaticFileServer:
    def __init__(self, base_url: str = '/static'):
        self.base_url = base_url
        self.cache = {}
        self.cache_timeout = 60000  # 1 minute

    async def serve(self, file_path: str) -> Dict[str, Any]:
        """
        Serve file

        Args:
            file_path: Path to file

        Returns:
            dict with data, mime, and status
        """
        # Check cache
        if file_path in self.cache:
            cached = self.cache[file_path]
            if js.Date.now() - cached['timestamp'] < self.cache_timeout:
                return {
                    'data': cached['data'],
                    'mime': cached['mime'],
                    'status': 200
                }

        # Fetch file
        url = f"{self.base_url}{file_path}"

        try:
            response = await js.fetch(url)

            if not response.ok:
                return {
                    'data': None,
                    'mime': 'text/plain',
                    'status': response.status
                }

            mime = response.headers.get('Content-Type') or MIMETypeDetector.get_by_extension(file_path)

            # Get data based on content type
            content_type = mime.split(';')[0] if ';' in mime else mime

            if 'application/json' in content_type:
                data = await response.json()
            elif 'text/' in content_type:
                data = await response.text()
            else:
                data = await response.blob()

            # Cache response
            self.cache[file_path] = {
                'data': data,
                'mime': mime,
                'timestamp': js.Date.now()
            }

            return {'data': data, 'mime': mime, 'status': 200}

        except Exception as error:
            return {
                'data': None,
                'mime': 'text/plain',
                'status': 500
            }

    async def serve_with_fallback(self, file_path: str, fallback_path: str) -> Dict[str, Any]:
        """Serve file with fallback"""
        result = await self.serve(file_path)

        if result['status'] == 404:
            return await self.serve(fallback_path)

        return result

    def clear_cache(self):
        """Clear cache"""
        self.cache.clear()

    def clear_cache_entry(self, file_path: str):
        """Clear specific cache entry"""
        if file_path in self.cache:
            del self.cache[file_path]

    def get_cache_size(self) -> int:
        """Get cache size"""
        return len(self.cache)


# 3. File Cache Manager
class FileCacheManager:
    def __init__(self):
        self.cache = {}

    def set(self, file_path: str, content: Any, mime_type: str, etag: str = None, last_modified: str = None):
        """Add to cache"""
        self.cache[file_path] = {
            'content': content,
            'mime_type': mime_type,
            'etag': etag or self._generate_etag(content),
            'last_modified': last_modified or js.Date.new().toUTCString()
        }

    def get(self, file_path: str) -> Optional[Dict[str, Any]]:
        """Get from cache"""
        return self.cache.get(file_path)

    def has(self, file_path: str) -> bool:
        """Check if file is cached"""
        return file_path in self.cache

    def remove(self, file_path: str):
        """Remove from cache"""
        if file_path in self.cache:
            del self.cache[file_path]

    def clear(self):
        """Clear all cache"""
        self.cache.clear()

    def get_stats(self) -> Dict[str, Any]:
        """Get cache stats"""
        return {
            'size': len(self.cache),
            'entries': len(self.cache),
            'keys': list(self.cache.keys())
        }

    def _generate_etag(self, content: Any) -> str:
        """Generate ETag"""
        content_str = str(content) if not isinstance(content, str) else content
        hash_value = hashlib.md5(content_str.encode()).hexdigest()
        return f'"{hash_value}"'


# 4. Asset Preloader
class AssetPreloader:
    def __init__(self):
        self.loaded_assets: Set[str] = set()
        self.failed_assets: Set[str] = set()

    async def preload(self, url: str) -> Dict[str, Any]:
        """
        Preload single asset

        Args:
            url: Asset URL

        Returns:
            dict with success status
        """
        if url in self.loaded_assets:
            return {'success': True}

        if url in self.failed_assets:
            return {'success': False, 'error': Exception('Previously failed to load')}

        try:
            response = await js.fetch(url)

            if not response.ok:
                raise Exception(f'HTTP {response.status}')

            self.loaded_assets.add(url)
            return {'success': True}

        except Exception as error:
            self.failed_assets.add(url)
            return {'success': False, 'error': error}

    async def preload_multiple(self, urls: List[str]) -> Dict[str, Any]:
        """
        Preload multiple assets

        Args:
            urls: List of URLs

        Returns:
            dict with successful and failed lists
        """
        successful = []
        failed = []

        for url in urls:
            result = await self.preload(url)

            if result['success']:
                successful.append(url)
            else:
                failed.append({'url': url, 'error': result.get('error')})

        return {'successful': successful, 'failed': failed}

    def get_loaded_assets(self) -> List[str]:
        """Get loaded assets"""
        return list(self.loaded_assets)

    def get_failed_assets(self) -> List[str]:
        """Get failed assets"""
        return list(self.failed_assets)

    def clear(self):
        """Clear tracking"""
        self.loaded_assets.clear()
        self.failed_assets.clear()


# 5. CDN Manager
class CDNManager:
    def __init__(self, cdn_urls: List[str]):
        self.cdn_urls = cdn_urls
        self.current_cdn_index = 0

    def get_cdn_url(self, asset_path: str) -> str:
        """Get CDN URL for asset"""
        cdn_url = self.cdn_urls[self.current_cdn_index]
        return f"{cdn_url}{asset_path}"

    def rotate_cdn(self):
        """Rotate CDN"""
        self.current_cdn_index = (self.current_cdn_index + 1) % len(self.cdn_urls)

    def get_all_cdn_urls(self, asset_path: str) -> List[str]:
        """Get all CDN URLs for asset"""
        return [f"{cdn}{asset_path}" for cdn in self.cdn_urls]

    async def try_all_cdns(self, asset_path: str) -> Dict[str, Any]:
        """Try each CDN until one succeeds"""
        urls = self.get_all_cdn_urls(asset_path)

        for url in urls:
            try:
                response = await js.fetch(url)
                if response.ok:
                    data = await response.blob()
                    return {'success': True, 'cdn_url': url, 'data': data}
            except Exception as error:
                print(f"CDN failed: {url}, {error}")

        return {'success': False}


# 6. Asset Bundle Manager
class AssetBundleManager:
    def __init__(self):
        self.bundles = {}

    def register_bundle(self, name: str, assets: List[str]):
        """Register bundle"""
        self.bundles[name] = assets

    def get_bundle(self, name: str) -> Optional[List[str]]:
        """Get bundle assets"""
        return self.bundles.get(name)

    async def preload_bundle(self, name: str, preloader: AssetPreloader) -> Dict[str, Any]:
        """Preload bundle"""
        assets = self.get_bundle(name)

        if not assets:
            return {'successful': [], 'failed': []}

        return await preloader.preload_multiple(assets)


# Usage Examples
async def demonstrate_static_files():
    print("=== Web Python Static File Serving Examples ===\n")

    # 1. MIME type detection
    print("--- 1. MIME Type Detection ---")
    print(f"HTML: {MIMETypeDetector.get_by_extension('index.html')}")
    print(f"JavaScript: {MIMETypeDetector.get_by_extension('app.js')}")
    print(f"PNG: {MIMETypeDetector.get_by_extension('image.png')}")

    # 2. Static file server
    print("\n--- 2. Static File Server ---")
    server = StaticFileServer('/assets')
    print(f"Would serve: {server.base_url}/styles/main.css")
    print(f"Would serve: {server.base_url}/js/app.js")

    # 3. File cache manager
    print("\n--- 3. File Cache Manager ---")
    cache_manager = FileCacheManager()

    cache_manager.set('/index.html', '<html>...</html>', 'text/html', '"abc123"')
    cache_manager.set('/app.js', 'console.log("Hello");', 'text/javascript')

    print(f"Cache stats: {cache_manager.get_stats()}")

    cached = cache_manager.get('/index.html')
    print(f"Cached file: {cached['mime_type'] if cached else 'N/A'}")

    # 4. Asset preloader
    print("\n--- 4. Asset Preloader ---")
    preloader = AssetPreloader()

    # Note: These would be real URLs in production
    preload_results = await preloader.preload_multiple([
        '/assets/image1.png',
        '/assets/image2.png',
        '/assets/image3.png'
    ])

    print(f"Preloaded: {preload_results['successful']}")
    print(f"Failed: {preload_results['failed']}")

    # 5. CDN manager
    print("\n--- 5. CDN Manager ---")
    cdn_manager = CDNManager([
        'https://cdn1.example.com',
        'https://cdn2.example.com',
        'https://cdn3.example.com'
    ])

    print(f"CDN URL: {cdn_manager.get_cdn_url('/assets/app.js')}")

    cdn_manager.rotate_cdn()
    print(f"Rotated CDN URL: {cdn_manager.get_cdn_url('/assets/app.js')}")

    # 6. Asset bundle manager
    print("\n--- 6. Asset Bundle Manager ---")
    bundle_manager = AssetBundleManager()

    bundle_manager.register_bundle('vendor', [
        '/assets/js/vendor/react.js',
        '/assets/js/vendor/lodash.js'
    ])

    bundle_manager.register_bundle('app', [
        '/assets/js/app.js',
        '/assets/js/utils.js'
    ])

    bundle = bundle_manager.get_bundle('vendor')
    print(f"Vendor bundle: {bundle}")

    print("\n=== All Static File Serving Examples Completed ===")

# Export functions
export = {
    'MIMETypeDetector': MIMETypeDetector,
    'StaticFileServer': StaticFileServer,
    'FileCacheManager': FileCacheManager,
    'AssetPreloader': AssetPreloader,
    'CDNManager': CDNManager,
    'AssetBundleManager': AssetBundleManager,
    'demonstrate_static_files': demonstrate_static_files
}