Flask Samples

Flask web framework examples including routes, templates, Flask-SQLAlchemy, Flask-RESTful, and modern Flask patterns

💻 Flask Hello World python

🟢 simple

Basic Flask application setup and Hello World with routes, templates, and basic functionality

⏱️ 20 min 🏷️ flask, python, web framework, routing
Prerequisites: Python basics, HTML/CSS basics, Web development concepts
# Flask Hello World Examples

# 1. Project Structure
"""
flask_hello_world/
├── app.py
├── requirements.txt
├── config.py
├── templates/
│   ├── base.html
│   ├── index.html
│   ├── hello.html
│   └── forms/
│       └── greeting_form.html
└── static/
    ├── css/
    │   └── style.css
    └── js/
        └── script.js
"""

# 2. requirements.txt
"""
Flask==2.3.3
Flask-SQLAlchemy==3.0.5
Flask-WTF==1.1.1
WTForms==3.0.1
Flask-Migrate==4.0.5
python-dotenv==1.0.0
"""

# 3. config.py
import os
from datetime import timedelta

class Config:
    """Base settings shared by every environment."""

    # Secret key and database URL are read from the environment when set
    # (and non-empty); otherwise the development fallbacks are used.
    SECRET_KEY = os.getenv('SECRET_KEY') or 'dev-secret-key-change-in-production'
    SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL') or 'sqlite:///app.db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False

    # Pagination
    POSTS_PER_PAGE = 10

    # Session configuration
    PERMANENT_SESSION_LIFETIME = timedelta(days=1)

class DevelopmentConfig(Config):
    """Development settings: the base configuration with debug mode on."""

    DEBUG = True

class ProductionConfig(Config):
    """Production settings: the base configuration with debug mode off."""

    DEBUG = False

# Lookup table from environment name to its settings class; 'default'
# resolves to the development settings.
config = dict(
    development=DevelopmentConfig,
    production=ProductionConfig,
    default=DevelopmentConfig,
)

# 4. Basic Flask Application (app.py)
from flask import Flask, render_template, request, redirect, url_for, flash, session, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import FlaskForm
from wtforms import StringField, TextAreaField, SubmitField, IntegerField
from wtforms.validators import DataRequired, Length, NumberRange
from datetime import datetime
import os

app = Flask(__name__)

# Configuration: secret key from the environment (with a development
# fallback) and a local SQLite database.
app.config.update(
    SECRET_KEY=os.environ.get('SECRET_KEY') or 'dev-secret-key',
    SQLALCHEMY_DATABASE_URI='sqlite:///app.db',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)

# Initialize extensions
db = SQLAlchemy(app)

# 5. Database Models
class Greeting(db.Model):
    """A stored greeting: author name, message text and bookkeeping columns."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    message = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Cleared by the delete view to soft-delete the row.
    is_active = db.Column(db.Boolean, default=True)
    # Number of times the detail view / detail API served this greeting.
    visits = db.Column(db.Integer, default=0)

    def __repr__(self):
        preview = self.message[:20]
        return '<Greeting {}: {}>'.format(self.name, preview)

class Visitor(db.Model):
    """One row per client IP address seen by the home page."""

    id = db.Column(db.Integer, primary_key=True)
    ip_address = db.Column(db.String(45), nullable=False)
    user_agent = db.Column(db.Text)
    visit_count = db.Column(db.Integer, default=1)
    first_visit = db.Column(db.DateTime, default=datetime.utcnow)
    last_visit = db.Column(db.DateTime, default=datetime.utcnow)

    def __repr__(self):
        return '<Visitor {}>'.format(self.ip_address)

# 6. Forms
class GreetingForm(FlaskForm):
    """Form used to create or edit a greeting.

    ``name`` must be 2-100 characters and ``message`` 5-500 characters;
    both are required.
    """

    name = StringField(
        'Name',
        validators=[DataRequired(), Length(min=2, max=100)],
    )
    message = TextAreaField(
        'Message',
        validators=[DataRequired(), Length(min=5, max=500)],
    )
    submit = SubmitField('Create Greeting')

class SearchForm(FlaskForm):
    """Single-field search form; the query text is required."""

    query = StringField(
        'Search',
        validators=[DataRequired()],
    )
    submit = SubmitField('Search')

# 7. Basic Routes
@app.route('/')
def index():
    """Render the home page and record the visit.

    Visitors are keyed by client IP address: an existing row gets its visit
    counter and last-visit timestamp updated, otherwise a new row is created.

    Returns:
        The rendered ``index.html`` with the active-greeting count, the
        visitor count and the current visitor's record.
    """
    # X-Forwarded-For may carry a comma-separated chain ("client, proxy1, ...").
    # Only the first entry identifies the client; using the raw header would
    # create a separate Visitor row per proxy chain and can exceed the
    # 45-character ip_address column.
    # NOTE(review): this header is client-controlled; rely on it only behind a
    # trusted reverse proxy.
    forwarded_for = request.environ.get('HTTP_X_FORWARDED_FOR', '')
    visitor_ip = forwarded_for.split(',')[0].strip() or request.remote_addr

    visitor = Visitor.query.filter_by(ip_address=visitor_ip).first()

    if visitor:
        visitor.visit_count += 1
        visitor.last_visit = datetime.utcnow()
    else:
        visitor = Visitor(
            ip_address=visitor_ip,
            user_agent=request.headers.get('User-Agent', '')
        )
        db.session.add(visitor)

    db.session.commit()

    # Get statistics
    total_greetings = Greeting.query.filter_by(is_active=True).count()
    total_visitors = Visitor.query.count()

    return render_template('index.html',
                         total_greetings=total_greetings,
                         total_visitors=total_visitors,
                         current_visitor=visitor)

@app.route('/hello')
def hello_world():
    """Return a fixed plain-text greeting."""
    greeting_text = "Hello, World! Welcome to Flask!"
    return greeting_text

@app.route('/hello/<name>')
def hello_name(name):
    """Render ``hello.html`` for the name captured from the URL."""
    template_context = {'name': name}
    return render_template('hello.html', **template_context)

@app.route('/greet', methods=['GET', 'POST'])
def greet():
    """Show the greeting form and store a valid submission.

    On a valid POST the greeting is saved, a success message is flashed and
    the client is redirected to the greeting list; otherwise the form is
    rendered (with validation errors, if any).
    """
    form = GreetingForm()

    # GET request or failed validation: (re)display the form.
    if not form.validate_on_submit():
        return render_template('forms/greeting_form.html', form=form, title='Create Greeting')

    new_greeting = Greeting(name=form.name.data, message=form.message.data)
    db.session.add(new_greeting)
    db.session.commit()

    flash(f'Greeting from {form.name.data} created successfully!', 'success')
    return redirect(url_for('greeting_list'))

@app.route('/greetings')
def greeting_list():
    """List active greetings, newest first, five per page.

    The page number comes from the ``page`` query parameter (default 1).
    """
    page = request.args.get('page', 1, type=int)

    newest_first = (
        Greeting.query
        .filter_by(is_active=True)
        .order_by(Greeting.created_at.desc())
    )
    greetings = newest_first.paginate(page=page, per_page=5, error_out=False)

    return render_template('greeting_list.html', greetings=greetings)

@app.route('/greeting/<int:id>')
def greeting_detail(id):
    """Display a single active greeting and count the visit.

    Args:
        id: Primary key of the greeting, taken from the URL.

    Returns:
        The rendered ``greeting_detail.html``. Responds with 404 when no
        active greeting has this id.
    """
    # Soft-deleted greetings (is_active=False) are treated as missing, the
    # same way the list, search and API listing views already filter them.
    greeting = Greeting.query.filter_by(id=id, is_active=True).first_or_404()

    # Increment visit count
    greeting.visits += 1
    db.session.commit()

    return render_template('greeting_detail.html', greeting=greeting)

@app.route('/greeting/<int:id>/edit', methods=['GET', 'POST'])
def edit_greeting(id):
    """Edit an existing, active greeting.

    Args:
        id: Primary key of the greeting, taken from the URL.

    Returns:
        A redirect to the detail page after a valid POST, otherwise the
        rendered form pre-filled from the greeting. Responds with 404 when no
        active greeting has this id.
    """
    # A soft-deleted greeting must not be editable; treat it as missing.
    greeting = Greeting.query.filter_by(id=id, is_active=True).first_or_404()
    form = GreetingForm(obj=greeting)

    if form.validate_on_submit():
        greeting.name = form.name.data
        greeting.message = form.message.data
        db.session.commit()

        flash('Greeting updated successfully!', 'success')
        return redirect(url_for('greeting_detail', id=greeting.id))

    return render_template('forms/greeting_form.html',
                         form=form,
                         title='Edit Greeting',
                         greeting=greeting)

@app.route('/greeting/<int:id>/delete', methods=['POST'])
def delete_greeting(id):
    """Soft-delete a greeting by clearing its ``is_active`` flag.

    Responds with 404 when the id does not exist; otherwise flashes a
    confirmation and redirects to the greeting list.
    """
    target = Greeting.query.get_or_404(id)
    target.is_active = False
    db.session.commit()

    flash('Greeting deleted successfully!', 'info')
    list_url = url_for('greeting_list')
    return redirect(list_url)

# 8. Search Functionality
@app.route('/search', methods=['GET', 'POST'])
def search():
    """Search active greetings by name or message text.

    Returns:
        The rendered ``search.html`` with the form and the matching
        greetings, newest first (an empty list until a valid query is
        submitted).
    """
    form = SearchForm()
    results = []

    # DataRequired on the field already guarantees a non-empty query here.
    if form.validate_on_submit():
        term = form.query.data
        results = (
            Greeting.query
            .filter(
                Greeting.is_active.is_(True),
                # autoescape=True makes '%' and '_' in the user's text match
                # literally instead of acting as LIKE wildcards.
                db.or_(
                    Greeting.name.contains(term, autoescape=True),
                    Greeting.message.contains(term, autoescape=True),
                ),
            )
            .order_by(Greeting.created_at.desc())
            .all()
        )

    return render_template('search.html', form=form, results=results)

# 9. API Routes
@app.route('/api/hello')
def api_hello():
    """Return a JSON hello message with framework details and a UTC timestamp."""
    payload = {
        'message': 'Hello, World!',
        'framework': 'Flask',
        'version': '2.3.3',
        'timestamp': datetime.utcnow().isoformat(),
    }
    return jsonify(payload)

@app.route('/api/greetings')
def api_greetings():
    """Return every active greeting as JSON, together with the total count."""
    active = Greeting.query.filter_by(is_active=True).all()

    serialized = []
    for row in active:
        serialized.append({
            'id': row.id,
            'name': row.name,
            'message': row.message,
            'created_at': row.created_at.isoformat(),
            'visits': row.visits,
        })

    return jsonify({'greetings': serialized, 'total': len(active)})

@app.route('/api/greeting/<int:id>')
def api_greeting_detail(id):
    """Return a single active greeting as JSON and count the visit.

    Args:
        id: Primary key of the greeting, taken from the URL.

    Returns:
        A JSON object with the greeting's id, name, message, creation time
        and visit count. Responds with 404 when no active greeting has this id.
    """
    # Soft-deleted greetings are hidden from /api/greetings, so they must not
    # be retrievable by id either.
    greeting = Greeting.query.filter_by(id=id, is_active=True).first_or_404()

    # Increment visit count
    greeting.visits += 1
    db.session.commit()

    return jsonify({
        'id': greeting.id,
        'name': greeting.name,
        'message': greeting.message,
        'created_at': greeting.created_at.isoformat(),
        'visits': greeting.visits
    })

@app.route('/api/greeting', methods=['POST'])
def api_create_greeting():
    """Create a greeting from a JSON request body.

    The body must be a JSON object with string ``name`` (2-100 characters)
    and ``message`` (5-500 characters) — the same limits as ``GreetingForm``.

    Returns:
        ``201`` with the new greeting's id on success, or ``400`` with an
        ``error`` message when the body is missing, malformed or invalid.
    """
    # silent=True yields None for a missing/invalid JSON body so that every
    # bad request gets the same JSON 400 response below.
    data = request.get_json(silent=True)

    # A JSON array or scalar would pass a bare truthiness check and then fail
    # on data['name'] with a 500, so require an object explicitly.
    if not isinstance(data, dict) or 'name' not in data or 'message' not in data:
        return jsonify({'error': 'Name and message are required'}), 400

    name = data['name']
    message = data['message']

    if not isinstance(name, str) or not isinstance(message, str):
        return jsonify({'error': 'Name and message must be strings'}), 400
    if not 2 <= len(name) <= 100:
        return jsonify({'error': 'Name must be between 2 and 100 characters'}), 400
    if not 5 <= len(message) <= 500:
        return jsonify({'error': 'Message must be between 5 and 500 characters'}), 400

    greeting = Greeting(name=name, message=message)
    db.session.add(greeting)
    db.session.commit()

    return jsonify({
        'message': 'Greeting created successfully',
        'id': greeting.id
    }), 201

# 10. Session-based Routes
@app.route('/session/set')
def set_session():
    """Store a demo user name in the session and bump its visit counter."""
    previous_visits = session.get('visits', 0)
    session['user_name'] = 'Flask User'
    session['visits'] = previous_visits + 1

    flash('Session data set!', 'info')
    info_url = url_for('session_info')
    return redirect(info_url)

@app.route('/session/info')
def session_info():
    """Render the session values, falling back to 'Guest' and 0 when unset."""
    template_context = {
        'user_name': session.get('user_name', 'Guest'),
        'visits': session.get('visits', 0),
    }
    return render_template('session_info.html', **template_context)

@app.route('/session/clear')
def clear_session():
    """Empty the session, flash a notice and redirect to the home page."""
    session.clear()
    # Flash after clearing so the notice itself is kept in the fresh session.
    flash('Session cleared!', 'info')
    home_url = url_for('index')
    return redirect(home_url)

# 11. Error Handlers
@app.errorhandler(404)
def not_found_error(error):
    """Render the custom 404 page with a 404 status code."""
    page = render_template('errors/404.html')
    return page, 404

@app.errorhandler(500)
def internal_error(error):
    """Roll back the database session and render the custom 500 page."""
    # Discard any pending changes before rendering the error page.
    db.session.rollback()
    page = render_template('errors/500.html')
    return page, 500

# 12. Template Filters
@app.template_filter('format_date')
def format_date(date):
    """Template filter: format a datetime like 'January 05, 2024 at 03:30 PM'.

    Returns an empty string for a missing (falsy) value.
    """
    return date.strftime('%B %d, %Y at %I:%M %p') if date else ''

@app.template_filter('truncate_words')
def truncate_words(text, num_words=10):
    """Template filter: shorten text to at most ``num_words`` words.

    Args:
        text: The text to shorten; ``None`` or an empty string yields ''.
        num_words: Maximum number of whitespace-separated words to keep.

    Returns:
        ``text`` unchanged when it is short enough, otherwise the first
        ``num_words`` words joined by single spaces and followed by '...'.
    """
    # Templates may pass a missing value; mirror format_date and return an
    # empty string instead of failing on None.split().
    if not text:
        return ''

    words = text.split()
    if len(words) <= num_words:
        return text
    return ' '.join(words[:num_words]) + '...'

# 13. Context Processors
@app.context_processor
def inject_stats():
    """Make the active-greeting and visitor totals available to every template."""
    active_greetings = Greeting.query.filter_by(is_active=True).count()
    visitor_total = Visitor.query.count()
    return dict(total_greetings=active_greetings, total_visitors=visitor_total)

# 14. CLI Commands
@app.cli.command()
def init_db():
    """Initialize the database"""
    # Create the tables for every model registered on ``db``.
    db.create_all()
    done_message = 'Database initialized!'
    print(done_message)

@app.cli.command()
def seed_db():
    """Seed the database with sample data"""
    # Clear existing data
    for model in (Greeting, Visitor):
        model.query.delete()

    # Add sample greetings
    samples = (
        ('Alice', 'Hello from Wonderland!'),
        ('Bob', 'Greetings from the tech world!'),
        ('Charlie', 'Happy coding everyone!'),
        ('Diana', 'Flask is awesome!'),
        ('Eve', 'Python power!'),
    )
    db.session.add_all([
        Greeting(name=author, message=text) for author, text in samples
    ])

    db.session.commit()
    print('Database seeded with sample data!')

@app.cli.command()
def reset_db():
    """Reset the database"""
    # Drop every table, then recreate the empty schema.
    for rebuild_step in (db.drop_all, db.create_all):
        rebuild_step()
    print('Database reset!')

# 15. Template Files

# templates/base.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{% block title %}Flask Hello World{% endblock %}</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
    <link href="{{ url_for('static', filename='css/style.css') }}" rel="stylesheet">
</head>
<body>
    <nav class="navbar navbar-expand-lg navbar-dark bg-dark">
        <div class="container">
            <a class="navbar-brand" href="{{ url_for('index') }}">Flask Demo</a>

            <div class="navbar-nav ms-auto">
                <a class="nav-link" href="{{ url_for('index') }}">Home</a>
                <a class="nav-link" href="{{ url_for('greet') }}">Create Greeting</a>
                <a class="nav-link" href="{{ url_for('greeting_list') }}">Greetings</a>
                <a class="nav-link" href="{{ url_for('search') }}">Search</a>
                <a class="nav-link" href="{{ url_for('api_hello') }}">API</a>
            </div>
        </div>
    </nav>

    <main class="container mt-4">
        {% with messages = get_flashed_messages(with_categories=true) %}
            {% if messages %}
                {% for category, message in messages %}
                    <div class="alert alert-{{ category }} alert-dismissible fade show" role="alert">
                        {{ message }}
                        <button type="button" class="btn-close" data-bs-dismiss="alert"></button>
                    </div>
                {% endfor %}
            {% endif %}
        {% endwith %}

        {% block content %}{% endblock %}
    </main>

    <footer class="bg-light text-center py-3 mt-5">
        <p>&copy; 2024 Flask Hello World Demo |
           Total Greetings: {{ total_greetings }} |
           Total Visitors: {{ total_visitors }}
        </p>
    </footer>

    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
</body>
</html>

# templates/index.html
{% extends 'base.html' %}

{% block title %}Flask Hello World{% endblock %}

{% block content %}
<div class="row">
    <div class="col-md-8 mx-auto">
        <div class="card text-center mb-4">
            <div class="card-header bg-primary text-white">
                <h1>🌍 Hello, World!</h1>
            </div>
            <div class="card-body">
                <p class="lead">Welcome to your Flask application!</p>

                {% if current_visitor %}
                    <p class="text-muted">
                        Welcome back! You've visited {{ current_visitor.visit_count }} times.
                    </p>
                {% endif %}

                <div class="row mt-4">
                    <div class="col-md-4">
                        <div class="card h-100">
                            <div class="card-body">
                                <h5 class="card-title">⚡ Lightweight</h5>
                                <p class="card-text">Flask is a lightweight WSGI web application framework.</p>
                            </div>
                        </div>
                    </div>
                    <div class="col-md-4">
                        <div class="card h-100">
                            <div class="card-body">
                                <h5 class="card-title">🔧 Flexible</h5>
                                <p class="card-text">Flask provides flexibility and extensibility for your projects.</p>
                            </div>
                        </div>
                    </div>
                    <div class="col-md-4">
                        <div class="card h-100">
                            <div class="card-body">
                                <h5 class="card-title">🐍 Pythonic</h5>
                                <p class="card-text">Built with Python and follows Python best practices.</p>
                            </div>
                        </div>
                    </div>
                </div>

                <div class="mt-4">
                    <a href="{{ url_for('greet') }}" class="btn btn-primary btn-lg me-2">Create Greeting</a>
                    <a href="{{ url_for('greeting_list') }}" class="btn btn-success btn-lg me-2">View Greetings</a>
                    <a href="{{ url_for('session_info') }}" class="btn btn-info btn-lg">Session Demo</a>
                </div>
            </div>
        </div>

        <div class="card">
            <div class="card-header">
                <h3>📊 Application Statistics</h3>
            </div>
            <div class="card-body">
                <div class="row text-center">
                    <div class="col-md-4">
                        <h3 class="text-primary">{{ total_greetings }}</h3>
                        <p class="text-muted">Total Greetings</p>
                    </div>
                    <div class="col-md-4">
                        <h3 class="text-success">{{ total_visitors }}</h3>
                        <p class="text-muted">Total Visitors</p>
                    </div>
                    <div class="col-md-4">
                        <h3 class="text-info">{{ current_visitor.visit_count if current_visitor else 0 }}</h3>
                        <p class="text-muted">Your Visits</p>
                    </div>
                </div>
            </div>
        </div>
    </div>
</div>
{% endblock %}

# templates/hello.html
{% extends 'base.html' %}

{% block title %}Hello, {{ name }}!{% endblock %}

{% block content %}
<div class="row">
    <div class="col-md-6 mx-auto">
        <div class="card">
            <div class="card-header bg-success text-white">
                <h2>👋 Hello, {{ name }}!</h2>
            </div>
            <div class="card-body text-center">
                <p class="lead">Welcome to the Flask application!</p>
                <p class="text-muted">
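                    {# NOTE: moment() is provided by the Flask-Moment extension, which is not listed in requirements.txt and is not initialised in app.py; install it and call Moment(app) (and include moment.js in base.html) or this line fails at render time. #}
                    Current time: {{ moment().format('MMMM Do YYYY, h:mm:ss a') }}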
                </p>

                <div class="mt-4">
                    <a href="{{ url_for('index') }}" class="btn btn-secondary">Back to Home</a>
                    <a href="{{ url_for('greet') }}" class="btn btn-primary">Create a Greeting</a>
                    <a href="{{ url_for('greeting_list') }}" class="btn btn-success">View All Greetings</a>
                </div>
            </div>
        </div>
    </div>
</div>
{% endblock %}

# templates/forms/greeting_form.html
{% extends 'base.html' %}

{% block title %}{{ title }}{% endblock %}

{% block content %}
<div class="row">
    <div class="col-md-6 mx-auto">
        <div class="card">
            <div class="card-header">
                <h3>{{ title }}</h3>
            </div>
            <div class="card-body">
                <form method="POST" novalidate>
                    {{ form.hidden_tag() }}

                    <div class="mb-3">
                        {{ form.name.label(class="form-label") }}
                        {{ form.name(class="form-control" + (" is-invalid" if form.name.errors else "")) }}
                        {% if form.name.errors %}
                            <div class="invalid-feedback">
                                {% for error in form.name.errors %}
                                    <span>{{ error }}</span>
                                {% endfor %}
                            </div>
                        {% endif %}
                    </div>

                    <div class="mb-3">
                        {{ form.message.label(class="form-label") }}
                        {{ form.message(class="form-control" + (" is-invalid" if form.message.errors else ""), rows="4") }}
                        {% if form.message.errors %}
                            <div class="invalid-feedback">
                                {% for error in form.message.errors %}
                                    <span>{{ error }}</span>
                                {% endfor %}
                            </div>
                        {% endif %}
                    </div>

                    <div class="d-grid">
                        {{ form.submit(class="btn btn-primary") }}
                    </div>
                </form>
            </div>
        </div>

        {% if greeting %}
        <div class="card mt-3">
            <div class="card-header">
                <h5>Current Greeting</h5>
            </div>
            <div class="card-body">
                <p><strong>Name:</strong> {{ greeting.name }}</p>
                <p><strong>Message:</strong> {{ greeting.message }}</p>
                <p><small class="text-muted">Created: {{ greeting.created_at|format_date }}</small></p>
            </div>
        </div>
        {% endif %}
    </div>
</div>
{% endblock %}

# static/css/style.css
body {
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    background-color: #f8f9fa;
}

.card {
    box-shadow: 0 0.125rem 0.25rem rgba(0, 0, 0, 0.075);
    border: 1px solid rgba(0, 0, 0, 0.125);
    margin-bottom: 1rem;
}

.card-header {
    background-color: #007bff;
    border-bottom: 1px solid rgba(0, 0, 0, 0.125);
}

.navbar-brand {
    font-weight: bold;
    font-size: 1.5rem;
}

footer {
    background-color: #343a40;
    color: white;
    margin-top: 2rem;
}

.btn {
    border-radius: 0.375rem;
}

.alert {
    border: none;
    border-radius: 0.5rem;
}

/* Custom animations */
.card:hover {
    transform: translateY(-2px);
    transition: transform 0.2s ease-in-out;
}

/* Loading spinner */
.loading {
    display: inline-block;
    width: 20px;
    height: 20px;
    border: 3px solid rgba(0,0,0,.1);
    border-radius: 50%;
    border-top-color: #007bff;
    animation: spin 1s ease-in-out infinite;
}

@keyframes spin {
    to { transform: rotate(360deg); }
}

# 16. Run the Application
"""
# Install dependencies:
pip install -r requirements.txt

# Initialize database:
flask init-db
flask seed-db

# Run development server:
flask run

# Visit these URLs:
# http://127.0.0.1:5000/ - Main page
# http://127.0.0.1:5000/hello - Simple hello
# http://127.0.0.1:5000/hello/John - Personalized hello
# http://127.0.0.1:5000/greet - Create greeting form
# http://127.0.0.1:5000/greetings - List all greetings
# http://127.0.0.1:5000/api/hello - API endpoint
# http://127.0.0.1:5000/api/greetings - API greetings list
"""

💻 Flask RESTful API python

🟡 intermediate ⭐⭐⭐⭐

Advanced Flask REST API with SQLAlchemy, authentication, pagination, and API best practices

⏱️ 55 min 🏷️ flask, rest api, sqlalchemy, jwt, authentication
Prerequisites: Flask basics, REST API concepts, Database design, Authentication
# Flask RESTful API Examples

# 1. requirements.txt
"""
Flask==2.3.3
Flask-SQLAlchemy==3.0.5
Flask-RESTful==0.3.10
Flask-Migrate==4.0.5
Flask-JWT-Extended==4.5.2
Flask-CORS==4.0.0
Flask-Marshmallow==0.15.0
marshmallow-sqlalchemy==0.29.0
python-dotenv==1.0.0
Werkzeug==2.3.7
psycopg2-binary==2.9.7
gunicorn==21.2.0
"""

# 2. Project Structure
"""
flask_api/
├── app/
│   ├── __init__.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── post.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── auth.py
│   │   ├── users.py
│   │   └── posts.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── post.py
│   ├── utils/
│   │   ├── __init__.py
│   │   ├── decorators.py
│   │   └── helpers.py
│   └── config.py
├── migrations/
├── tests/
├── run.py
├── requirements.txt
└── .env
"""

# 3. Configuration (app/config.py)
import os
from datetime import timedelta

class Config:
    """Base API settings shared by every environment."""

    # Secrets and the database URL come from the environment when set (and
    # non-empty); otherwise development fallbacks are used.
    SECRET_KEY = os.getenv('SECRET_KEY') or 'dev-secret-key'
    SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL') or 'sqlite:///api.db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False

    # JWT signing key and token lifetimes
    JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY') or 'jwt-secret-string'
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(days=1)
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)

class DevelopmentConfig(Config):
    """Development settings: debug mode on and SQL statement echo enabled."""

    SQLALCHEMY_ECHO = True
    DEBUG = True

class TestingConfig(Config):
    """Test settings: in-memory SQLite and five-minute access tokens."""

    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(seconds=300)

class ProductionConfig(Config):
    """Production settings: the base configuration with debug mode off."""

    DEBUG = False

# Lookup table from environment name to its settings class; 'default'
# resolves to the development settings.
config = dict(
    development=DevelopmentConfig,
    testing=TestingConfig,
    production=ProductionConfig,
    default=DevelopmentConfig,
)

# 4. Application Factory (app/__init__.py)
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_jwt_extended import JWTManager
from flask_cors import CORS
from flask_marshmallow import Marshmallow
from app.config import config

# Initialize extensions
# Extension objects are created unbound at import time; create_app() below
# attaches each of them to the application via init_app().
db = SQLAlchemy()
migrate = Migrate()
jwt = JWTManager()
ma = Marshmallow()

def create_app(config_name='default'):
    """Application factory: build and configure a Flask app.

    Args:
        config_name: Key into the ``config`` mapping selecting the settings
            class to load.

    Returns:
        The configured Flask application with extensions initialised,
        API blueprints registered and JSON error/JWT handlers installed.
    """
    app = Flask(__name__)
    settings_class = config[config_name]
    app.config.from_object(settings_class)

    # Initialize extensions
    db.init_app(app)
    migrate.init_app(app, db)
    jwt.init_app(app)
    ma.init_app(app)
    CORS(app)

    # Register blueprints (imported here, inside the factory, as in the
    # original layout).
    from app.api.auth import auth_bp
    from app.api.users import users_bp
    from app.api.posts import posts_bp

    blueprint_prefixes = (
        (auth_bp, '/api/auth'),
        (users_bp, '/api/users'),
        (posts_bp, '/api/posts'),
    )
    for blueprint, prefix in blueprint_prefixes:
        app.register_blueprint(blueprint, url_prefix=prefix)

    # Error handlers
    @app.errorhandler(404)
    def not_found(error):
        body = {'error': 'Not found'}
        return body, 404

    @app.errorhandler(500)
    def internal_error(error):
        # Discard any pending changes before answering.
        db.session.rollback()
        body = {'error': 'Internal server error'}
        return body, 500

    # JWT failure responses
    @jwt.expired_token_loader
    def expired_token_callback(jwt_header, jwt_payload):
        body = {'message': 'The token has expired'}
        return body, 401

    @jwt.invalid_token_loader
    def invalid_token_callback(error):
        body = {'message': 'Invalid token'}
        return body, 401

    @jwt.unauthorized_loader
    def missing_token_callback(error):
        body = {'message': 'Authorization token is required'}
        return body, 401

    return app

# 5. Models (app/models/user.py)
from app import db
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime

class User(db.Model):
    """Account record with a hashed password and profile fields."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(255), nullable=False)
    first_name = db.Column(db.String(50))
    last_name = db.Column(db.String(50))
    bio = db.Column(db.Text)
    avatar_url = db.Column(db.String(255))
    is_active = db.Column(db.Boolean, default=True)
    is_admin = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    last_login = db.Column(db.DateTime)

    # Relationships
    posts = db.relationship('Post', backref='author', lazy='dynamic', cascade='all, delete-orphan')

    def __init__(self, username, email, **kwargs):
        """Create a user.

        Args:
            username: Unique login name.
            email: Unique e-mail address.
            **kwargs: Further attributes to set. A ``password`` keyword is
                hashed via ``set_password`` rather than stored as given.
        """
        self.username = username
        self.email = email

        # A plaintext 'password' must never be set as a plain attribute: it
        # would leave password_hash empty and keep the secret on the object.
        password = kwargs.pop('password', None)

        for key, value in kwargs.items():
            setattr(self, key, value)

        if password is not None:
            self.set_password(password)

    def set_password(self, password):
        """Store the hash of ``password`` in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash.

        A user without a stored hash never matches.
        """
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)

    def update_last_login(self):
        """Set ``last_login`` to the current UTC time and commit."""
        self.last_login = datetime.utcnow()
        db.session.commit()

    def to_dict(self, include_posts=False):
        """Serialise the user to a plain dict (the password hash is omitted).

        Args:
            include_posts: When true, add a ``posts`` list with each post's
                ``to_dict()`` output.

        Returns:
            A dict of the user's public fields; timestamps are ISO-8601
            strings or None when unset.
        """
        data = {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'first_name': self.first_name,
            'last_name': self.last_name,
            'bio': self.bio,
            'avatar_url': self.avatar_url,
            'is_active': self.is_active,
            # Column defaults are only applied on insert, so created_at is
            # still None on an object that has not been flushed yet.
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'last_login': self.last_login.isoformat() if self.last_login else None
        }

        if include_posts:
            data['posts'] = [post.to_dict() for post in self.posts]

        return data

    def __repr__(self):
        return f'<User {self.username}>'

# 6. Models (app/models/post.py)
from app import db
from datetime import datetime

class Post(db.Model):
    """Blog post written by a user, with status, counters, tags and comments."""

    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    summary = db.Column(db.String(500))
    status = db.Column(db.String(20), default='draft')  # draft, published, archived
    featured_image = db.Column(db.String(255))
    view_count = db.Column(db.Integer, default=0)
    like_count = db.Column(db.Integer, default=0)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    published_at = db.Column(db.DateTime)

    # Foreign key
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

    # Relationships
    tags = db.relationship('Tag', secondary='post_tags', backref='posts', lazy='dynamic')
    comments = db.relationship('Comment', backref='post', lazy='dynamic', cascade='all, delete-orphan')

    def __init__(self, title, content, author_id, **kwargs):
        """Create a post.

        Args:
            title: Post title.
            content: Post body.
            author_id: Primary key of the authoring user.
            **kwargs: Further attributes to set on the instance.
        """
        self.title = title
        self.content = content
        self.author_id = author_id
        for key, value in kwargs.items():
            setattr(self, key, value)

    def publish(self):
        """Mark the post as published, stamp ``published_at`` and commit."""
        self.status = 'published'
        self.published_at = datetime.utcnow()
        db.session.commit()

    def increment_view_count(self):
        """Add one to ``view_count`` and commit."""
        # The column default is applied on insert only, so the counter can
        # still be None on an object that has not been flushed yet.
        self.view_count = (self.view_count or 0) + 1
        db.session.commit()

    def increment_like_count(self):
        """Add one to ``like_count`` and commit."""
        self.like_count = (self.like_count or 0) + 1
        db.session.commit()

    def to_dict(self, include_author=True, include_comments=False, include_tags=False):
        """Serialise the post to a plain dict.

        Args:
            include_author: Add the author's ``to_dict()`` under ``author``.
            include_comments: Add a ``comments`` list.
            include_tags: Add a ``tags`` list.

        Returns:
            A dict of the post's fields; timestamps are ISO-8601 strings or
            None when unset.
        """
        data = {
            'id': self.id,
            'title': self.title,
            'content': self.content,
            'summary': self.summary,
            'status': self.status,
            'featured_image': self.featured_image,
            'view_count': self.view_count,
            'like_count': self.like_count,
            # created_at/updated_at are filled by column defaults at insert
            # time, so guard them the same way as published_at.
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None,
            'published_at': self.published_at.isoformat() if self.published_at else None
        }

        if include_author:
            data['author'] = self.author.to_dict()

        if include_comments:
            data['comments'] = [comment.to_dict() for comment in self.comments]

        if include_tags:
            data['tags'] = [tag.to_dict() for tag in self.tags]

        return data

    def __repr__(self):
        return f'<Post {self.title}>'

class Tag(db.Model):
    """Label that can be attached to posts; both name and slug are unique."""

    __tablename__ = 'tags'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Serialise the tag; ``created_at`` becomes an ISO-8601 string."""
        created = self.created_at.isoformat()
        return dict(
            id=self.id,
            name=self.name,
            slug=self.slug,
            created_at=created,
        )

    def __repr__(self):
        return '<Tag {}>'.format(self.name)

# Association table for many-to-many relationship
# Each row links one post to one tag; the pair forms the composite primary key.
post_tags = db.Table(
    'post_tags',
    db.Column('post_id', db.Integer, db.ForeignKey('posts.id'), primary_key=True),
    db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True),
)

class Comment(db.Model):
    """A comment attached to a post; new comments are unapproved by default."""
    __tablename__ = 'comments'

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    author_name = db.Column(db.String(100))
    author_email = db.Column(db.String(120))
    is_approved = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Owning post (required).
    post_id = db.Column(db.Integer, db.ForeignKey('posts.id'), nullable=False)

    def to_dict(self):
        """Return the comment's fields as a dict, with ``created_at`` in ISO format."""
        created = self.created_at.isoformat()
        return dict(
            id=self.id,
            content=self.content,
            author_name=self.author_name,
            author_email=self.author_email,
            is_approved=self.is_approved,
            created_at=created,
        )

    def __repr__(self):
        """Return a debug representation that shows the comment id."""
        return '<Comment {}>'.format(self.id)

# 7. Schemas (app/schemas/user.py)
from flask_marshmallow import Marshmallow
from marshmallow import validate, fields, post_load
from app.models.user import User

# Marshmallow extension object used as the base for the user schemas below.
# NOTE(review): the post schemas import ``ma`` from ``app`` instead — confirm
# that a single shared Marshmallow instance is intended.
ma = Marshmallow()

class UserSchema(ma.SQLAlchemyAutoSchema):
    """Schema for ``User``: auto-generated model fields plus explicit validation.

    ``password_hash`` is excluded. ``password`` is accepted on load only and
    must be at least 8 characters; ``username`` must be 3-80 characters.
    """
    class Meta:
        model = User
        load_instance = True
        exclude = ('password_hash',)

    # Input-only field: it is never included in dumped output.
    password = fields.Str(load_only=True, required=True,
                         validate=validate.Length(min=8, error='Password must be at least 8 characters long'))
    # NOTE(review): fields.Email already validates the format; the extra
    # validate.Email presumably only customises the error message — confirm.
    email = fields.Email(required=True, validate=validate.Email(error='Invalid email format'))
    username = fields.Str(required=True, validate=validate.Length(min=3, max=80))

    @post_load
    def make_user(self, data, **kwargs):
        """Build a ``User`` from the loaded data."""
        # NOTE(review): Meta.load_instance is True, so this hook may receive an
        # already-built model instance rather than a dict, and ``data`` may
        # carry ``password``, which is not obviously a ``User`` constructor
        # argument — confirm this hook is still wanted.
        return User(**data)

class UserUpdateSchema(ma.SQLAlchemyAutoSchema):
    """Schema used when updating an existing ``User``."""
    class Meta:
        model = User
        load_instance = True
        # NOTE(review): 'email' is excluded here yet declared as a field below,
        # and 'password' is listed although this schema declares no such field
        # — confirm which fields are meant to be updatable and that the schema
        # can be instantiated with this exclude list.
        exclude = ('password_hash', 'password', 'email', 'id', 'created_at')

    email = fields.Email(validate=validate.Email(error='Invalid email format'))

class UserLoginSchema(ma.Schema):
    """Credentials payload: both fields are required; ``password`` is load-only."""
    username = fields.Str(required=True)
    password = fields.Str(required=True, load_only=True)

# 8. Schemas (app/schemas/post.py)
from app import ma
from app.models.post import Post, Tag, Comment

class TagSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated schema for ``Tag``; loading produces model instances."""
    class Meta:
        model = Tag
        load_instance = True

class CommentSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated schema for ``Comment``; loading produces model instances."""
    class Meta:
        model = Comment
        load_instance = True

class PostSchema(ma.SQLAlchemyAutoSchema):
    """Read-oriented schema for ``Post`` with nested tags, author and comments.

    The three nested fields are dump-only; the author is limited to ``id``,
    ``username`` and ``email``.
    """
    # NOTE(review): ``fields`` is used below but the imports shown for this
    # module only bring in ``ma`` and the models — confirm ``fields`` is
    # imported.
    class Meta:
        model = Post
        load_instance = True
        include_fk = True

    tags = fields.List(fields.Nested(TagSchema), dump_only=True)
    author = fields.Nested('UserSchema', only=('id', 'username', 'email'), dump_only=True)
    comments = fields.List(fields.Nested(CommentSchema), dump_only=True)

class PostCreateSchema(ma.SQLAlchemyAutoSchema):
    """Input schema for creating or updating a ``Post``.

    Server-managed columns (id, timestamps, counters) are excluded. ``title``
    must be 5-200 characters, ``content`` at least 10 characters, and
    ``status`` one of ``draft``, ``published`` or ``archived``.
    """
    class Meta:
        model = Post
        load_instance = True
        # Foreign-key columns are exposed as schema fields.
        include_fk = True
        exclude = ('id', 'created_at', 'updated_at', 'view_count', 'like_count')

    title = fields.Str(required=True, validate=validate.Length(min=5, max=200))
    content = fields.Str(required=True, validate=validate.Length(min=10))
    status = fields.Str(validate=validate.OneOf(['draft', 'published', 'archived']))

# 9. Authentication Blueprint (app/api/auth.py)
from flask import Blueprint, request, jsonify
from flask_jwt_extended import (
    create_access_token, create_refresh_token, jwt_required,
    get_jwt_identity, get_jwt
)
from app import db, jwt_redis_blocklist
from app.models.user import User
from app.schemas.user import UserSchema, UserLoginSchema
from app.utils.decorators import admin_required

# Blueprint that groups the authentication endpoints.
auth_bp = Blueprint('auth', __name__)

# Schema instances shared by the auth views.
user_schema = UserSchema()
user_login_schema = UserLoginSchema()

@auth_bp.route('/register', methods=['POST'])
def register():
    """Create a user account from a JSON body and issue JWT tokens.

    Expects ``username``, ``email`` and ``password`` in the request JSON.

    Returns:
        201 with the new user plus an access/refresh token pair;
        400 when no JSON body is supplied, 422 when validation fails,
        409 when the username or e-mail is already taken.
    """
    json_data = request.get_json()
    if not json_data:
        return {'error': 'No input data provided'}, 400

    # Validate against the registration schema. The login schema has no
    # ``email`` field, so it cannot be used here. ``validate`` only reports
    # errors; it does not build a model instance, so the values are read from
    # the validated payload below.
    errors = user_schema.validate(json_data)
    if errors:
        return {'errors': str(errors)}, 422

    username = json_data['username']
    email = json_data['email']
    password = json_data['password']

    # Check if user already exists
    if User.query.filter_by(username=username).first():
        return {'error': 'Username already exists'}, 409

    if User.query.filter_by(email=email).first():
        return {'error': 'Email already exists'}, 409

    # Create new user; the password goes through ``set_password`` rather than
    # being assigned directly.
    user = User(username=username, email=email)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()

    # Create tokens
    access_token = create_access_token(identity=user.id)
    refresh_token = create_refresh_token(identity=user.id)

    return {
        'message': 'User created successfully',
        'user': user.to_dict(),
        'access_token': access_token,
        'refresh_token': refresh_token
    }, 201

@auth_bp.route('/login', methods=['POST'])
def login():
    """Authenticate with username (or e-mail) and password and issue JWT tokens.

    Returns:
        200 with the user plus an access/refresh token pair;
        400 when no JSON body is supplied, 422 when validation fails,
        401 for bad credentials or a deactivated account.
    """
    # Imported locally: this module's top-level imports do not include it.
    from marshmallow import ValidationError

    json_data = request.get_json()
    if not json_data:
        return {'error': 'No input data provided'}, 400

    try:
        data = user_login_schema.load(json_data)
    except ValidationError as err:
        return {'errors': str(err)}, 422

    # The login schema is a plain schema, so ``load`` returns a dict and the
    # values must be read by key, not by attribute.
    identifier = data['username']
    password = data['password']

    # Find user: the ``username`` value may be a username or an e-mail address.
    user = User.query.filter(
        (User.username == identifier) | (User.email == identifier)
    ).first()

    if not user or not user.check_password(password):
        return {'error': 'Invalid username or password'}, 401

    if not user.is_active:
        return {'error': 'Account is deactivated'}, 401

    # Update last login
    user.update_last_login()

    # Create tokens
    access_token = create_access_token(identity=user.id)
    refresh_token = create_refresh_token(identity=user.id)

    return {
        'message': 'Login successful',
        'user': user.to_dict(),
        'access_token': access_token,
        'refresh_token': refresh_token
    }

@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Exchange a refresh token for a new access token with the same identity."""
    identity = get_jwt_identity()
    return {'access_token': create_access_token(identity=identity)}

@auth_bp.route('/logout', methods=['DELETE'])
@jwt_required()
def logout():
    """Revoke the current token by storing its ``jti`` in the Redis blocklist."""
    claims = get_jwt()
    # The blocklist entry is stored with a 3600-second (one hour) expiry.
    jwt_redis_blocklist.set(claims['jti'], "true", ex=3600)
    return {'message': 'Successfully logged out'}

@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_current_user():
    """Return the authenticated user, including posts, or 404 if the user is gone."""
    user = User.query.get(get_jwt_identity())
    if user:
        return {'user': user.to_dict(include_posts=True)}
    return {'error': 'User not found'}, 404

# 10. Users Blueprint (app/api/users.py)
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
from app import db
from app.models.user import User
from app.schemas.user import UserSchema, UserUpdateSchema
from app.utils.decorators import admin_required

# Blueprint that groups the user-management endpoints.
users_bp = Blueprint('users', __name__)

# Schema instances shared by the views: single user, partial update, user list.
user_schema = UserSchema()
user_update_schema = UserUpdateSchema()
users_schema = UserSchema(many=True)

@users_bp.route('/', methods=['GET'])
@jwt_required()
def get_users():
    """List users with pagination and an optional case-insensitive search.

    Query parameters: ``page`` (default 1), ``per_page`` (default 10) and
    ``search``, which is matched as a substring of username, e-mail, first
    name or last name.
    """
    params = request.args
    page = params.get('page', 1, type=int)
    per_page = params.get('per_page', 10, type=int)
    search = params.get('search', '')

    query = User.query

    if search:
        pattern = f'%{search}%'
        searchable = (User.username, User.email, User.first_name, User.last_name)
        # OR the ILIKE conditions together, left to right.
        criterion = searchable[0].ilike(pattern)
        for column in searchable[1:]:
            criterion = criterion | column.ilike(pattern)
        query = query.filter(criterion)

    users = query.paginate(page=page, per_page=per_page, error_out=False)

    serialized = users_schema.dump(users.items)
    pagination = {
        'page': page,
        'per_page': per_page,
        'total': users.total,
        'pages': users.pages,
        'has_next': users.has_next,
        'has_prev': users.has_prev
    }
    return {'users': serialized, 'pagination': pagination}

@users_bp.route('/<int:user_id>', methods=['GET'])
@jwt_required()
def get_user(user_id):
    """Return one user, including posts; responds with 404 for an unknown id."""
    found = User.query.get_or_404(user_id)
    payload = found.to_dict(include_posts=True)
    return {'user': payload}

@users_bp.route('/<int:user_id>', methods=['PUT'])
@jwt_required()
def update_user(user_id):
    """Update a user's fields from a JSON body.

    A user may update their own record; admins may update anyone. Only admins
    may change ``is_admin`` or ``is_active``.

    Returns:
        200 with the updated user; 400 when no JSON body is supplied,
        403 when the caller lacks permission, 404 when the target user does
        not exist, 422 when validation fails.
    """
    # Imported locally: this module's top-level imports do not include it.
    from marshmallow import ValidationError

    current_user_id = get_jwt_identity()
    current_user = User.query.get(current_user_id)

    # The token may reference an account that no longer exists.
    if current_user is None:
        return {'error': 'Permission denied'}, 403

    # Check permissions
    if current_user_id != user_id and not current_user.is_admin:
        return {'error': 'Permission denied'}, 403

    user = User.query.get_or_404(user_id)
    json_data = request.get_json()

    if not json_data:
        return {'error': 'No input data provided'}, 400

    # Privilege and activation flags must not be self-assignable.
    if not current_user.is_admin and isinstance(json_data, dict):
        if {'is_admin', 'is_active'} & set(json_data):
            return {'error': 'Permission denied'}, 403

    try:
        # The update schema has load_instance=True, so ``load`` yields a model
        # object rather than a dict. Passing ``instance`` applies the validated
        # values to the existing row; ``partial`` allows a subset of fields.
        user = user_update_schema.load(json_data, instance=user, partial=True)
    except ValidationError as err:
        return {'errors': str(err)}, 422

    db.session.commit()

    return {
        'message': 'User updated successfully',
        'user': user.to_dict()
    }

@users_bp.route('/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
    """Delete a user (admin only); responds with 404 for an unknown id."""
    target = User.query.get_or_404(user_id)
    session = db.session
    session.delete(target)
    session.commit()

    return {'message': 'User deleted successfully'}

@users_bp.route('/<int:user_id>/deactivate', methods=['POST'])
@admin_required
def deactivate_user(user_id):
    """Set ``is_active`` to False for a user (admin only) and commit."""
    target = User.query.get_or_404(user_id)
    setattr(target, 'is_active', False)
    db.session.commit()

    return {'message': 'User deactivated successfully'}

@users_bp.route('/<int:user_id>/activate', methods=['POST'])
@admin_required
def activate_user(user_id):
    """Set ``is_active`` to True for a user (admin only) and commit."""
    target = User.query.get_or_404(user_id)
    setattr(target, 'is_active', True)
    db.session.commit()

    return {'message': 'User activated successfully'}

# 11. Posts Blueprint (app/api/posts.py)
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
from app import db
from app.models.post import Post, Tag
from app.schemas.post import PostSchema, PostCreateSchema, TagSchema
from app.utils.decorators import admin_required

# Blueprint that groups the post and tag endpoints.
posts_bp = Blueprint('posts', __name__)

# Schema instances shared by the views (single item and ``many=True`` variants).
post_schema = PostSchema()
post_create_schema = PostCreateSchema()
posts_schema = PostSchema(many=True)
tag_schema = TagSchema()
tags_schema = TagSchema(many=True)

@posts_bp.route('/', methods=['GET'])
def get_posts():
    """List posts, newest first, with optional filters and pagination.

    Query parameters: ``page`` (default 1), ``per_page`` (default 10),
    ``status`` (default ``published``; ``all`` disables the status filter),
    ``search`` (substring of title or content) and ``author_id``.
    """
    params = request.args
    page = params.get('page', 1, type=int)
    per_page = params.get('per_page', 10, type=int)
    status = params.get('status', 'published')
    search = params.get('search', '')
    author_id = params.get('author_id', type=int)

    query = Post.query

    if status != 'all':
        query = query.filter_by(status=status)

    if search:
        pattern = f'%{search}%'
        query = query.filter(Post.title.ilike(pattern) | Post.content.ilike(pattern))

    if author_id:
        query = query.filter_by(author_id=author_id)

    newest_first = query.order_by(Post.created_at.desc())
    posts = newest_first.paginate(page=page, per_page=per_page, error_out=False)

    serialized = posts_schema.dump(posts.items)
    pagination = {
        'page': page,
        'per_page': per_page,
        'total': posts.total,
        'pages': posts.pages,
        'has_next': posts.has_next,
        'has_prev': posts.has_prev
    }
    return {'posts': serialized, 'pagination': pagination}

@posts_bp.route('/<int:post_id>', methods=['GET'])
def get_post(post_id):
    """Return one post with author, comments and tags; each call counts as a view."""
    post = Post.query.get_or_404(post_id)

    # The view counter is bumped before the post is serialized.
    post.increment_view_count()

    serialized = post.to_dict(
        include_author=True,
        include_comments=True,
        include_tags=True,
    )
    return {'post': serialized}

@posts_bp.route('/', methods=['POST'])
@jwt_required()
def create_post():
    """Create a post owned by the authenticated user.

    The JSON body must contain ``title`` and ``content`` and may contain
    ``summary``, ``status`` (default ``draft``), ``featured_image`` and
    ``tags`` (a list of tag names; unknown tags are created).

    Returns:
        201 with the new post; 400 when no JSON object is supplied,
        422 when validation fails.
    """
    current_user_id = get_jwt_identity()
    json_data = request.get_json()

    if not json_data or not isinstance(json_data, dict):
        return {'error': 'No input data provided'}, 400

    # ``tags`` is handled by hand below and is not a field of the create
    # schema, so it is kept out of validation.
    payload = {key: value for key, value in json_data.items() if key != 'tags'}

    # ``validate`` only reports errors. The author always comes from the
    # token, so ``author_id`` must not be required from the client.
    errors = post_create_schema.validate(payload, partial=('author_id',))
    if errors:
        return {'errors': str(errors)}, 422

    tag_names = json_data.get('tags') or []
    if not isinstance(tag_names, list) or not all(isinstance(name, str) for name in tag_names):
        return {'errors': 'tags must be a list of strings'}, 422

    post = Post(
        title=json_data['title'],
        content=json_data['content'],
        author_id=current_user_id,
        summary=json_data.get('summary'),
        status=json_data.get('status', 'draft'),
        featured_image=json_data.get('featured_image')
    )
    db.session.add(post)

    # Attach tags, creating missing ones. ``dict.fromkeys`` drops duplicate
    # names while keeping their order.
    for tag_name in dict.fromkeys(tag_names):
        tag = Tag.query.filter_by(name=tag_name).first()
        if not tag:
            tag = Tag(name=tag_name, slug=tag_name.lower().replace(' ', '-'))
            db.session.add(tag)
        post.tags.append(tag)

    # Single commit so the post and its tags are stored together.
    db.session.commit()

    return {
        'message': 'Post created successfully',
        'post': post.to_dict(include_author=True)
    }, 201

@posts_bp.route('/<int:post_id>', methods=['PUT'])
@jwt_required()
def update_post(post_id):
    """Update a post; only its author may do so.

    The JSON body must contain ``title`` and ``content``. ``summary``,
    ``status`` and ``featured_image`` keep their current value when omitted.

    Returns:
        200 with the updated post; 400 when no JSON object is supplied,
        403 when the caller is not the author, 404 for an unknown id,
        422 when validation fails.
    """
    current_user_id = get_jwt_identity()
    post = Post.query.get_or_404(post_id)

    # Check permissions
    if post.author_id != current_user_id:
        return {'error': 'Permission denied'}, 403

    json_data = request.get_json()

    if not json_data or not isinstance(json_data, dict):
        return {'error': 'No input data provided'}, 400

    # ``validate`` only reports errors; the values are then read from the
    # payload by key. The author is never taken from the body, so
    # ``author_id`` is not required.
    errors = post_create_schema.validate(json_data, partial=('author_id',))
    if errors:
        return {'errors': str(errors)}, 422

    # Update post fields
    post.title = json_data['title']
    post.content = json_data['content']
    post.summary = json_data.get('summary', post.summary)
    post.status = json_data.get('status', post.status)
    post.featured_image = json_data.get('featured_image', post.featured_image)

    db.session.commit()

    return {
        'message': 'Post updated successfully',
        'post': post.to_dict(include_author=True)
    }

@posts_bp.route('/<int:post_id>', methods=['DELETE'])
@jwt_required()
def delete_post(post_id):
    """Delete a post; only its author may do so (403 otherwise, 404 if unknown)."""
    requester_id = get_jwt_identity()
    post = Post.query.get_or_404(post_id)

    is_author = post.author_id == requester_id
    if not is_author:
        return {'error': 'Permission denied'}, 403

    session = db.session
    session.delete(post)
    session.commit()

    return {'message': 'Post deleted successfully'}

@posts_bp.route('/<int:post_id>/like', methods=['POST'])
def like_post(post_id):
    """Increment a post's like counter and return the new count."""
    post = Post.query.get_or_404(post_id)
    post.increment_like_count()
    response = {'message': 'Post liked successfully'}
    response['like_count'] = post.like_count
    return response

@posts_bp.route('/tags', methods=['GET'])
def get_tags():
    """Return every tag, serialized with the tag list schema."""
    serialized = tags_schema.dump(Tag.query.all())
    return {'tags': serialized}

# 12. Utility Decorators (app/utils/decorators.py)
from functools import wraps
from flask import jsonify
from flask_jwt_extended import get_jwt_identity
from app.models.user import User

def admin_required(f):
    """Decorator that restricts a view to authenticated admin users.

    The request must carry a valid JWT whose identity resolves to a ``User``
    with ``is_admin`` set; otherwise a 403 JSON response is returned.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Imported locally: this module only imports get_jwt_identity at the top.
        from flask_jwt_extended import verify_jwt_in_request

        # get_jwt_identity() needs a verified token in the request context.
        # The views using this decorator do not apply @jwt_required(), so the
        # token is verified here.
        verify_jwt_in_request()

        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)

        if not user or not user.is_admin:
            return jsonify({'error': 'Admin access required'}), 403

        return f(*args, **kwargs)
    return decorated_function

# 13. Application Runner (run.py)
from app import create_app
import os

# Build the application from the configuration named by FLASK_CONFIG
# (falling back to 'default').
app = create_app(os.getenv('FLASK_CONFIG') or 'default')

if __name__ == '__main__':
    # Follow the selected configuration instead of forcing debug mode on, so
    # running with the production config does not enable the debugger.
    app.run(debug=app.config.get('DEBUG', False))

# 14. Environment Variables (.env)
"""
FLASK_APP=run.py
FLASK_ENV=development
SECRET_KEY=your-secret-key-here
JWT_SECRET_KEY=your-jwt-secret-key-here
DATABASE_URL=sqlite:///api.db
"""

# 15. Testing the API
"""
# Install dependencies:
pip install -r requirements.txt

# Set up database:
flask db init
flask db migrate -m "Initial migration"
flask db upgrade

# Run the application:
python run.py

# Test API endpoints:

# Register user:
curl -X POST http://localhost:5000/api/auth/register \
  -H "Content-Type: application/json" \
  -d '{"username": "testuser", "email": "test@example.com", "password": "password123"}'

# Login:
curl -X POST http://localhost:5000/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "testuser", "password": "password123"}'

# Get posts (public):
curl http://localhost:5000/api/posts

# Create post (requires auth):
curl -X POST http://localhost:5000/api/posts \
  -H "Authorization: Bearer YOUR_TOKEN_HERE" \
  -H "Content-Type: application/json" \
  -d '{"title": "My First Post", "content": "This is the content of my post.", "status": "published"}'

# Get current user info:
curl -X GET http://localhost:5000/api/auth/me \
  -H "Authorization: Bearer YOUR_TOKEN_HERE"
"""

💻 Flask Advanced Patterns python

🔴 complex ⭐⭐⭐⭐⭐

Advanced Flask patterns including background tasks, caching, websockets, testing, and deployment strategies

⏱️ 70 min 🏷️ flask, advanced, celery, websockets, testing, deployment
Prerequisites: Advanced Flask, Asynchronous programming, Testing, DevOps
# Flask Advanced Patterns

# 1. requirements.txt
"""
Flask==2.3.3
Flask-SQLAlchemy==3.0.5
Flask-Migrate==4.0.5
Flask-Caching==2.1.0
Flask-Celery-Helper==2.0.0
celery==5.3.1
redis==5.0.0
flower==2.0.1
Flask-SocketIO==5.3.5
python-socketio==5.8.0
Flask-Mail==0.9.1
Flask-Admin==1.6.1
APScheduler==3.10.4
gunicorn==21.2.0
pytest==7.4.2
pytest-flask==1.2.0
pytest-cov==4.1.0
factory-boy==3.3.0
"""

# 2. Project Structure with Blueprints
"""
advanced_flask/
├── app/
│   ├── __init__.py
│   ├── core/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── views.py
│   │   └── forms.py
│   ├── auth/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── views.py
│   │   └── forms.py
│   ├── api/
│   │   ├── __init__.py
│   │   └── resources.py
│   ├── tasks/
│   │   ├── __init__.py
│   │   ├── celery_app.py
│   │   └── tasks.py
│   ├── static/
│   │   ├── css/
│   │   ├── js/
│   │   └── img/
│   └── templates/
│       ├── base.html
│       └── errors/
├── migrations/
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_auth.py
│   └── test_api.py
├── config.py
├── run.py
└── requirements.txt
"""

# 3. Advanced Configuration (config.py)
import os
from datetime import timedelta
from dotenv import load_dotenv

load_dotenv()

class Config:
    """Base settings shared by every environment.

    Most values are read from environment variables and fall back to the
    literal defaults below when the variable is unset or empty.
    """
    SECRET_KEY = os.getenv('SECRET_KEY') or 'super-secret-key'
    SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL') or 'sqlite:///advanced_flask.db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ENGINE_OPTIONS = dict(pool_pre_ping=True, pool_recycle=300)

    # Cache
    CACHE_TYPE = 'redis'
    CACHE_REDIS_URL = os.getenv('REDIS_URL') or 'redis://localhost:6379/0'
    CACHE_DEFAULT_TIMEOUT = 300

    # Celery
    CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL') or 'redis://localhost:6379/0'
    CELERY_RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND') or 'redis://localhost:6379/0'

    # Mail
    MAIL_SERVER = os.getenv('MAIL_SERVER') or 'smtp.gmail.com'
    MAIL_PORT = int(os.getenv('MAIL_PORT') or 587)
    # Enabled when the variable is 'true', 'on' or '1' (case-insensitive).
    MAIL_USE_TLS = os.getenv('MAIL_USE_TLS', 'true').lower() in ('true', 'on', '1')
    MAIL_USERNAME = os.getenv('MAIL_USERNAME')
    MAIL_PASSWORD = os.getenv('MAIL_PASSWORD')

    # WebSocket
    SOCKETIO_ASYNC_MODE = 'threading'
    SOCKETIO_CORS_ALLOWED_ORIGINS = "*"

    # Rate limiting
    RATELIMIT_STORAGE_URL = os.getenv('REDIS_URL') or 'redis://localhost:6379/1'

    # Background tasks
    SCHEDULER_API_ENABLED = True

class DevelopmentConfig(Config):
    """Development settings: debug mode on and SQL statement echoing enabled."""
    DEBUG = True
    SQLALCHEMY_ECHO = True

class TestingConfig(Config):
    """Test settings: in-memory SQLite database and CSRF protection disabled."""
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    WTF_CSRF_ENABLED = False

class ProductionConfig(Config):
    """Production settings: debug off and explicit connection-pool sizing."""
    DEBUG = False
    # Replaces the base engine options: same pre-ping and recycle values, plus
    # a pool of 20 connections with no overflow.
    SQLALCHEMY_ENGINE_OPTIONS = dict(
        pool_pre_ping=True,
        pool_recycle=300,
        pool_size=20,
        max_overflow=0,
    )

# Maps a configuration name to its settings class; 'default' is an alias for
# the development configuration.
config = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'production': ProductionConfig,
    'default': DevelopmentConfig
}

# 4. Advanced Application Factory with Extensions
# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_caching import Cache
from flask_mail import Mail
from flask_socketio import SocketIO
from flask_cors import CORS
from apscheduler.schedulers.background import BackgroundScheduler
import logging
from logging.handlers import RotatingFileHandler
import os

# Initialize extensions
# Extension objects are created unbound at import time and attached to an
# application inside create_app().
db = SQLAlchemy()
migrate = Migrate()
cache = Cache()
mail = Mail()
socketio = SocketIO()
# Process-wide scheduler instance shared by every app the factory creates.
scheduler = BackgroundScheduler()

def create_app(config_name='default'):
    """Application factory: build and configure a Flask app.

    Args:
        config_name: key into the ``config`` mapping that selects the
            settings class.

    Returns:
        Flask: the app with extensions initialised and blueprints registered.
        Outside debug/testing a rotating file log under ``logs/`` is attached.
        The shared background scheduler is started if it is not running yet.
    """
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    # Initialize extensions
    db.init_app(app)
    migrate.init_app(app, db)
    cache.init_app(app)
    mail.init_app(app)
    # Honour the configured CORS origins; falls back to the previous "*".
    socketio.init_app(
        app,
        cors_allowed_origins=app.config.get('SOCKETIO_CORS_ALLOWED_ORIGINS', '*'),
    )
    CORS(app)

    # Register blueprints (imported here to avoid import cycles with ``app``).
    from app.core.views import core_bp
    from app.auth.views import auth_bp
    from app.api.resources import api_bp

    app.register_blueprint(core_bp)
    app.register_blueprint(auth_bp, url_prefix='/auth')
    app.register_blueprint(api_bp, url_prefix='/api')

    # Setup logging
    if not app.debug and not app.testing:
        # exist_ok avoids a crash when another process creates the directory
        # between an existence check and the mkdir call.
        os.makedirs('logs', exist_ok=True)
        file_handler = RotatingFileHandler('logs/advanced_flask.log', maxBytes=10240, backupCount=10)
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'))
        file_handler.setLevel(logging.INFO)
        app.logger.addHandler(file_handler)
        app.logger.setLevel(logging.INFO)
        app.logger.info('Advanced Flask startup')

    # The scheduler is a module-level singleton and the factory can be called
    # more than once in one process (tests, the Celery module), so it is only
    # started when it is not already running.
    if not scheduler.running:
        scheduler.start()

    return app

# 5. Custom Model Mixins
# app/core/models.py
from app import db
from datetime import datetime
from sqlalchemy_utils import URLType
import uuid

class TimestampMixin:
    """Mixin for timestamp fields"""
    # Both columns default to the current UTC time; ``updated_at`` also has
    # ``onupdate`` set to the same callable.
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow,
                           onupdate=datetime.utcnow, nullable=False)

class UUIDMixin:
    """Mixin for UUID primary key"""
    # A random UUID (uuid4) is generated as the default value.
    id = db.Column(db.UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

class SoftDeleteMixin:
    """Mixin that marks rows as deleted instead of removing them."""
    is_deleted = db.Column(db.Boolean, default=False)
    deleted_at = db.Column(db.DateTime)

    def soft_delete(self):
        """Flag the row as deleted and stamp the current UTC time (no commit)."""
        self.deleted_at = datetime.utcnow()
        self.is_deleted = True

    def restore(self):
        """Clear the deleted flag and the deletion timestamp (no commit)."""
        self.deleted_at = None
        self.is_deleted = False

class AuditMixin:
    """Mixin for audit trail: records which user created and last updated a row."""
    # ``users.id`` is a UUID column (User gets its key from UUIDMixin, and
    # UserProfile.user_id references it as a UUID), so these foreign keys must
    # use the same type rather than Integer.
    created_by = db.Column(db.UUID(as_uuid=True), db.ForeignKey('users.id'))
    updated_by = db.Column(db.UUID(as_uuid=True), db.ForeignKey('users.id'))

    def set_created_by(self, user_id):
        """Record ``user_id`` as the creator of this row (no commit)."""
        self.created_by = user_id

    def set_updated_by(self, user_id):
        """Record ``user_id`` as the last user to update this row (no commit)."""
        self.updated_by = user_id

class User(UUIDMixin, TimestampMixin, SoftDeleteMixin, db.Model):
    """User account with a UUID key, timestamps and soft-delete columns."""
    __tablename__ = 'users'

    # Username and e-mail are unique, required and indexed.
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(255))
    first_name = db.Column(db.String(50))
    last_name = db.Column(db.String(50))
    avatar_url = db.Column(URLType)
    is_active = db.Column(db.Boolean, default=True)
    email_verified = db.Column(db.Boolean, default=False)
    last_login = db.Column(db.DateTime)

    # Relationships
    # One-to-one profile (uselist=False), loaded with a join; ``UserProfile``
    # gets a ``user`` back-reference.
    profile = db.relationship('UserProfile', backref='user', uselist=False, lazy='joined')

    def __repr__(self):
        return f'<User {self.username}>'

class UserProfile(UUIDMixin, TimestampMixin, db.Model):
    """Optional profile details that belong to one ``User``."""
    __tablename__ = 'user_profiles'

    # Required UUID foreign key to the owning user.
    user_id = db.Column(db.UUID(as_uuid=True), db.ForeignKey('users.id'), nullable=False)
    bio = db.Column(db.Text)
    phone = db.Column(db.String(20))
    address = db.Column(db.Text)
    website = db.Column(URLType)
    date_of_birth = db.Column(db.Date)
    timezone = db.Column(db.String(50), default='UTC')
    language = db.Column(db.String(10), default='en')

# 6. Custom Query Classes
from sqlalchemy import orm

class SoftDeleteQuery(orm.Query):
    """Custom query class for soft delete functionality"""

    def __new__(cls, *args, **kwargs):
        # Allocates the query object and, unless ``include_deleted`` is passed
        # as a truthy keyword, returns it narrowed to rows whose ``is_deleted``
        # flag is False.
        # NOTE(review): filter_by() is called on an object that has only been
        # allocated (``__init__`` has not run yet), and ``include_deleted`` is
        # not removed from kwargs before initialisation — verify this works
        # with the SQLAlchemy version in use.
        query = super().__new__(cls)
        if kwargs.get('include_deleted', False):
            return query
        return query.filter_by(is_deleted=False)

    def with_deleted(self):
        """Include deleted records in query"""
        # NOTE(review): this disables assertions and undefers all columns; it
        # is not evident that it removes the ``is_deleted`` filter added in
        # __new__ — confirm.
        return self.enable_assertions(False).options(orm.undefer('*'))

    def deleted(self):
        """Query only deleted records"""
        return self.filter_by(is_deleted=True)

# 7. Custom Model Base Class
class BaseModel(db.Model):
    """Abstract base model with save/delete helpers, lookups and serialization."""
    __abstract__ = True

    query_class = SoftDeleteQuery

    def save(self, commit=True):
        """Add the instance to the session and optionally commit.

        Returns:
            The instance itself, so calls can be chained.
        """
        db.session.add(self)
        if commit:
            db.session.commit()
        return self

    def delete(self, soft_delete=True, commit=True):
        """Delete the instance (soft delete by default).

        Args:
            soft_delete: when True and the model provides ``soft_delete()``,
                mark the row as deleted instead of removing it.
            commit: commit the session after the change (default True).
        """
        if soft_delete and hasattr(self, 'soft_delete'):
            self.soft_delete()
        else:
            db.session.delete(self)
        if commit:
            db.session.commit()

    @classmethod
    def get_by_id(cls, id):
        """Return the instance with the given primary key, or None."""
        return cls.query.get(id)

    @classmethod
    def get_or_404(cls, id):
        """Return the instance with the given primary key or abort with 404."""
        # Imported locally: ``abort`` is not among this module's imports.
        from flask import abort

        instance = cls.get_by_id(id)
        if instance is None:
            abort(404)
        return instance

    def to_dict(self, include_relationships=False):
        """Convert the instance to a dictionary keyed by column name.

        ``datetime`` values are rendered with ``isoformat()``. With
        ``include_relationships`` set, non-empty relationships (except those
        loaded as 'dynamic' or 'subquery') are added under their key:
        collections become lists, single objects a single value. Related
        objects are serialized with their own ``to_dict()`` when available
        and with ``str()`` otherwise.
        """
        def serialize(related_obj):
            if hasattr(related_obj, 'to_dict'):
                return related_obj.to_dict()
            return str(related_obj)

        result = {}
        for column in self.__table__.columns:
            value = getattr(self, column.name)
            if isinstance(value, datetime):
                value = value.isoformat()
            result[column.name] = value

        if include_relationships:
            for relationship in self.__mapper__.relationships:
                if relationship.lazy in ['dynamic', 'subquery']:
                    continue
                related = getattr(self, relationship.key)
                if not related:
                    continue
                if relationship.uselist:
                    # Collections are serialized item by item rather than
                    # being stringified as a whole.
                    result[relationship.key] = [serialize(item) for item in related]
                else:
                    result[relationship.key] = serialize(related)

        return result

# 8. Celery Configuration and Tasks
# app/tasks/celery_app.py
from celery import Celery
from app import create_app
import os

def make_celery(app_name=__name__):
    """Build a Celery instance whose tasks run inside a Flask app context.

    A Flask app is created from the FLASK_CONFIG environment variable
    ('default' when unset or empty). Its config supplies the broker and
    result backend and is copied into the Celery configuration.
    """
    config_name = os.getenv('FLASK_CONFIG') or 'default'
    flask_app = create_app(config_name)
    settings = flask_app.config

    celery = Celery(
        app_name,
        broker=settings['CELERY_BROKER_URL'],
        backend=settings['CELERY_RESULT_BACKEND'],
    )
    celery.conf.update(settings)

    class ContextTask(celery.Task):
        """Task base class that pushes the Flask app context around ``run``."""
        def __call__(self, *args, **kwargs):
            with flask_app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

# Module-level Celery instance; importing this module creates a Flask app via
# make_celery().
celery = make_celery()

# app/tasks/tasks.py
from .celery_app import celery
from app import mail, db
from flask_mail import Message
from datetime import datetime, timedelta
from app.core.models import User
import logging

# Logger named after this module, used by the tasks below.
logger = logging.getLogger(__name__)

@celery.task(bind=True)
def send_async_email(self, subject, recipients, body, html=None):
    """Build a mail message and send it through the mail extension.

    Returns:
        dict: ``status`` is 'success' or 'error'. Any exception is logged and
        reported in the dict rather than re-raised.
    """
    try:
        mail.send(Message(subject=subject, recipients=recipients, body=body, html=html))
    except Exception as exc:
        logger.error(f"Failed to send email: {str(exc)}")
        return {'status': 'error', 'message': str(exc)}
    return {'status': 'success', 'message': 'Email sent successfully'}

@celery.task
def send_welcome_email(user_id):
    """Queue a welcome e-mail for a new user.

    Args:
        user_id: primary key of the user to greet.

    Returns:
        dict: ``{'status': 'queued', 'task_id': ...}`` once the mail task has
        been queued, or an error dict when the user does not exist.
    """
    user = User.query.get(user_id)
    if user is None:
        return {'status': 'error', 'message': 'User not found'}

    subject = 'Welcome to our platform!'
    body = f'Hello {user.username}, welcome to our amazing platform!'
    html = f'<h1>Welcome {user.username}!</h1><p>Thank you for joining us.</p>'

    queued = send_async_email.delay(
        subject=subject,
        recipients=[user.email],
        body=body,
        html=html
    )
    # Return plain data: the handle returned by ``delay`` is not a
    # serializable task result, so only its id is reported.
    return {'status': 'queued', 'task_id': queued.id}

@celery.task
def cleanup_old_records():
    """Permanently delete users that were soft-deleted more than 30 days ago.

    Returns:
        dict: a 'success' status and the number of rows deleted.
    """
    from app.core.models import User

    threshold = datetime.utcnow() - timedelta(days=30)

    # ``== True`` is deliberate: it builds a SQL expression, not a Python test.
    stale_users = User.query.filter(
        User.is_deleted == True,
        User.deleted_at < threshold,
    ).all()

    for stale in stale_users:
        db.session.delete(stale)
    db.session.commit()

    return {'status': 'success', 'deleted_count': len(stale_users)}

@celery.task
def generate_analytics_report():
    """Generate the daily user analytics report and queue it as an e-mail.

    Returns:
        dict: ``{'status': 'queued', 'task_id': ..., 'report': ...}`` with the
        id of the queued mail task and the report that was sent.
    """
    from app.core.models import User

    # Compare against midnight (UTC) as a datetime so the DateTime column is
    # not compared with a bare date.
    now = datetime.utcnow()
    today = now.date()
    start_of_day = datetime.combine(today, datetime.min.time())

    # Generate basic analytics
    total_users = User.query.filter_by(is_deleted=False).count()
    new_users_today = User.query.filter(
        User.is_deleted == False,
        User.created_at >= start_of_day
    ).count()

    report = {
        'date': today.isoformat(),
        'total_users': total_users,
        'new_users_today': new_users_today,
        'generated_at': now.isoformat()
    }

    # Send report to admin.
    # NOTE(review): the original recipient literal was garbled and not a valid
    # address; this placeholder must be replaced with the real admin address.
    queued = send_async_email.delay(
        subject='Daily Analytics Report',
        recipients=['admin@example.com'],
        body=str(report)
    )
    # Return plain data rather than the handle returned by ``delay``, which is
    # not a serializable task result.
    return {'status': 'queued', 'task_id': queued.id, 'report': report}

# 9. WebSocket Events
# app/core/websocket_events.py
from flask_socketio import emit, join_room, leave_room
from app import socketio
from flask_jwt_extended import jwt_required, get_jwt_identity
import logging

# Logger named after this module, used by the socket event handlers below.
logger = logging.getLogger(__name__)

@socketio.on('connect')
def handle_connect():
    """Log a new client connection and emit a ``status`` acknowledgement."""
    logger.info('Client connected')
    greeting = {'message': 'Connected to server'}
    emit('status', greeting)

@socketio.on('disconnect')
def handle_disconnect():
    """Log that a client has disconnected; nothing is emitted."""
    logger.info('Client disconnected')
    return None

@socketio.on('join_room')
def handle_join_room(data):
    """Join the room named in ``data['room']`` and announce it to that room.

    Does nothing when no room name is given.
    """
    room = data.get('room')
    if not room:
        return
    join_room(room)
    emit('status', {'message': f'Joined room {room}'}, room=room)

@socketio.on('leave_room')
def handle_leave_room(data):
    """Leave the room named in ``data['room']`` and announce it to that room.

    Does nothing when no room name is given.
    """
    room = data.get('room')
    if not room:
        return
    leave_room(room)
    emit('status', {'message': f'Left room {room}'}, room=room)

@socketio.on('send_message')
@jwt_required()
def handle_message(data):
    """Broadcast a chat message from the authenticated user to a room.

    ``data['room']`` defaults to 'default'; nothing is emitted when
    ``data['message']`` is empty or missing.
    """
    sender_id = get_jwt_identity()
    room = data.get('room', 'default')
    text = data.get('message')

    if not text:
        return

    outgoing = dict(
        user_id=sender_id,
        message=text,
        timestamp=datetime.utcnow().isoformat(),
    )
    emit('new_message', outgoing, room=room)

# 10. Custom Decorators
# app/utils/decorators.py
from functools import wraps
from flask import jsonify, request, g
from flask_jwt_extended import jwt_required, get_jwt_identity
from app.core.models import User
from app import cache
import time

def rate_limit(limit=100, per=60, scope_func=lambda: request.remote_addr):
    """Rate limiting decorator using a fixed window stored in the cache.

    Args:
        limit: maximum number of calls allowed per window.
        per: window length in seconds.
        scope_func: callable returning the scope key (default: client address).

    Returns:
        A decorator. The wrapped view returns a 429 JSON response once the
        limit is reached within the current window.

    The check-then-set on the cache is not atomic, so concurrent requests can
    slightly exceed ``limit``.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            key = f"rate_limit_{scope_func()}_{f.__name__}"
            now = time.time()

            # The entry stores the hit count and the window's end time, so the
            # window is not extended by every request.
            entry = cache.get(key)
            if isinstance(entry, (tuple, list)) and len(entry) == 2 and entry[1] > now:
                count, reset_at = entry
            else:
                # Missing, expired or old-format entry: start a new window.
                count, reset_at = 0, now + per

            if count >= limit:
                return jsonify({'error': 'Rate limit exceeded'}), 429

            # Keep the entry only for the remainder of the window.
            remaining = max(1, int(reset_at - now) + 1)
            cache.set(key, (count + 1, reset_at), timeout=remaining)

            return f(*args, **kwargs)
        return decorated_function
    return decorator

def cache_result(timeout=300, key_prefix=None):
    """Cache the return value of the decorated function.

    The cache key is built from ``key_prefix`` (or the function name when no
    prefix is given) plus the string forms of the positional and keyword
    arguments, so different arguments are cached separately.

    Args:
        timeout: Timeout in seconds passed to the cache when storing a result.
        key_prefix: Optional replacement for the function name in the key.

    Returns:
        A decorator that wraps a function with the cache lookup.

    Note:
        A stored value of ``None`` is treated as a miss, so functions that
        return ``None`` are executed on every call.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            cache_key = f"{key_prefix or f.__name__}_{str(args)}_{str(kwargs)}"

            # Compare against None rather than truthiness: cached falsy
            # results (0, '', [], {}) are valid hits and must not be
            # recomputed.
            cached_result = cache.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Miss: run the function and remember its result.
            result = f(*args, **kwargs)
            cache.set(cache_key, result, timeout=timeout)

            return result
        return decorated_function
    return decorator

def timing_middleware(f):
    """Log how long each call of the decorated function takes.

    The elapsed time is logged at INFO level after the wrapped function
    returns; a call that raises is not logged. The wrapped function's return
    value is passed through unchanged.

    Args:
        f: The function to time.

    Returns:
        The wrapping function, carrying ``f``'s metadata.
    """
    # Imported here so the decorator does not depend on a module-level
    # ``logger`` being defined wherever it is used.
    import logging

    log = logging.getLogger(f.__module__)

    @wraps(f)
    def decorated_function(*args, **kwargs):
        # perf_counter is monotonic, so the measurement is not distorted by
        # wall-clock adjustments.
        started = time.perf_counter()
        result = f(*args, **kwargs)
        elapsed = time.perf_counter() - started

        log.info("%s executed in %.4f seconds", f.__name__, elapsed)

        return result
    return decorated_function

def load_user(f):
    """Require a JWT and expose the matching user as ``g.user``.

    The wrapped view runs only after the JWT check; before it is called,
    the user looked up by the token identity is stored on ``flask.g``.
    """
    @jwt_required()
    def _authenticated(*args, **kwargs):
        # Resolve the identity from the token and attach the user to g.
        g.user = User.query.get(get_jwt_identity())
        return f(*args, **kwargs)

    # Copy f's metadata onto the JWT-protected wrapper.
    return wraps(f)(_authenticated)

# 11. Advanced Views with Caching
# app/core/views.py
from flask import Blueprint, render_template, request, jsonify, current_app
from app.core.models import User, UserProfile
from app.utils.decorators import cache_result, rate_limit, timing_middleware
from app.tasks.tasks import send_welcome_email
from app import db, cache
import logging

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Blueprint named 'core'; the routes below are registered on it.
core_bp = Blueprint('core', __name__)

@core_bp.route('/')
@cache_result(timeout=300)
def index():
    """Render the home page with user counts; the result is cached for 300 s."""
    total = User.query.count()
    active = User.query.filter_by(is_active=True).count()
    return render_template(
        'index.html',
        stats={'total_users': total, 'active_users': active},
    )

@core_bp.route('/users')
@rate_limit(limit=30, per=60)  # 30 requests per minute
@timing_middleware
def list_users():
    """List users, one page at a time, with rate limiting and timing.

    Query parameters:
        page: Page number, defaults to 1.
        per_page: Page size, defaults to 20 and is capped at 100.

    Returns:
        The rendered 'users.html' template with the pagination object.
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)

    # per_page comes straight from the client; cap it so a single request
    # cannot ask for the whole table.
    users = User.query.paginate(
        page=page, per_page=per_page, max_per_page=100, error_out=False
    )

    return render_template('users.html', users=users)

@core_bp.route('/register', methods=['POST'])
def register():
    """Register a user and queue the welcome e-mail.

    Expects a JSON object with non-empty 'username', 'email' and 'password'.

    Returns:
        201 with the new user's id on success, or 400 with an error message
        when the body is not a JSON object or a required field is missing.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so malformed requests get a 400 rather than an unhandled exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    required = ('username', 'email', 'password')
    missing = [field for field in required if not data.get(field)]
    if missing:
        return jsonify(
            {'error': f"Missing required fields: {', '.join(missing)}"}
        ), 400

    # Create user logic here...
    user = User(username=data['username'], email=data['email'])
    user.set_password(data['password'])
    db.session.add(user)
    db.session.commit()

    # Send welcome email asynchronously
    send_welcome_email.delay(user.id)

    return jsonify({'message': 'User created successfully', 'user_id': str(user.id)}), 201

# 12. Testing Setup
# tests/conftest.py
import pytest
from app import create_app, db
from app.core.models import User
from app import cache

@pytest.fixture
def app():
    """Yield an application built from the 'testing' config.

    Tables are created before the test and dropped afterwards, all inside
    one application context.
    """
    flask_app = create_app('testing')
    context = flask_app.app_context()

    with context:
        db.create_all()
        yield flask_app
        db.drop_all()

@pytest.fixture
def client(app):
    """Provide a test client bound to the ``app`` fixture."""
    test_client = app.test_client()
    return test_client

@pytest.fixture
def runner(app):
    """Provide a CLI runner bound to the ``app`` fixture."""
    cli_runner = app.test_cli_runner()
    return cli_runner

@pytest.fixture
def auth_headers(client):
    """Register and log in a user, returning bearer-token headers.

    Fails with a clear assertion message if registration or login does not
    succeed, instead of a KeyError on the missing token.
    """
    # Register user (distinct, well-formed address so it cannot collide with
    # users created by other fixtures).
    registration = client.post('/auth/register', json={
        'username': 'testuser',
        'email': 'testuser@example.com',
        'password': 'password123'
    })
    assert registration.status_code == 201, 'fixture user registration failed'

    # Login and get token
    response = client.post('/auth/login', json={
        'username': 'testuser',
        'password': 'password123'
    })
    assert response.status_code == 200, 'fixture user login failed'

    token = response.json['access_token']
    return {'Authorization': f'Bearer {token}'}

@pytest.fixture
def sample_user(app):
    """Create and commit a sample user, returning the model instance.

    Depends on ``app`` so the tables and application context exist.
    """
    # Distinct, well-formed address so this user cannot collide with users
    # created by other fixtures.
    user = User(username='sampleuser', email='sampleuser@example.com')
    user.set_password('password123')
    db.session.add(user)
    db.session.commit()
    return user

# tests/test_auth.py
def test_register_user(client):
    """Registering a new user returns 201 and the new user's id."""
    # Use a well-formed address unique to this test.
    response = client.post('/auth/register', json={
        'username': 'newuser',
        'email': 'newuser@example.com',
        'password': 'password123'
    })

    assert response.status_code == 201
    assert 'user_id' in response.json

def test_login_user(client, sample_user):
    """Logging in as the sample user returns 200 and an access token."""
    credentials = {'username': 'sampleuser', 'password': 'password123'}
    response = client.post('/auth/login', json=credentials)

    assert response.status_code == 200
    assert 'access_token' in response.json

def test_protected_route(client, auth_headers):
    """A request carrying the bearer token can reach /auth/me."""
    me_response = client.get('/auth/me', headers=auth_headers)
    assert me_response.status_code == 200

def test_rate_limiting(client):
    """Requests beyond the /users limit are answered with 429."""
    # 35 calls exceed the limit of 30 per minute.
    responses = [client.get('/users') for _ in range(35)]

    # Only the final response is checked: it must be rate limited.
    assert responses[-1].status_code == 429

# 13. Deployment Configuration
# gunicorn_config.py
import multiprocessing

# Server socket
# Listen on every interface, port 8000.
bind = "0.0.0.0:8000"
backlog = 2048

# Worker processes
# Worker count is derived from the CPU count: (2 x cores) + 1.
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "sync"
# NOTE(review): worker_connections is believed to matter only for async
# worker classes, not "sync" -- confirm against the Gunicorn settings docs.
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
preload_app = True

# Logging
# Relative paths: they resolve against the process working directory.
# NOTE(review): presumably the logs/ directory must already exist and be
# writable by the worker user -- confirm.
accesslog = "logs/access.log"
errorlog = "logs/error.log"
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# Process naming
proc_name = 'advanced_flask'

# Server mechanics
# Stay in the foreground (not daemonized).
daemon = False
pidfile = '/tmp/gunicorn.pid'
# No user/group switch and no custom upload directory are configured.
user = None
group = None
tmp_upload_dir = None

# 14. Docker Configuration
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies.
# curl is needed by the HEALTHCHECK below; without it the check can never
# succeed. gcc is kept as before (presumably for building wheels).
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create non-root user and the logs directory that gunicorn_config.py writes
# to (logs/access.log, logs/error.log relative to /app), owned by that user.
RUN useradd --create-home --shell /bin/bash app \
    && mkdir -p /app/logs \
    && chown -R app:app /app
USER app

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application
CMD ["gunicorn", "--config", "gunicorn_config.py", "run:app"]

# docker-compose.yml
version: '3.8'

services:
  # Flask application, published on host port 8000.
  web:
    build: .
    depends_on:
      - db
      - redis
    environment:
      FLASK_CONFIG: "production"
      DATABASE_URL: "postgresql://user:password@db:5432/advanced_flask"
      REDIS_URL: "redis://redis:6379/0"
    ports:
      - "8000:8000"
    volumes:
      - ./logs:/app/logs

  # PostgreSQL database with a named volume for its data directory.
  db:
    image: postgres:15
    environment:
      POSTGRES_DB: "advanced_flask"
      POSTGRES_USER: "user"
      POSTGRES_PASSWORD: "password"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  # Redis, published on host port 6379.
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  # Celery worker built from the same image as the web service.
  celery:
    build: .
    command: celery -A app.tasks.celery worker --loglevel=info
    depends_on:
      - db
      - redis
    environment:
      FLASK_CONFIG: "production"
      DATABASE_URL: "postgresql://user:password@db:5432/advanced_flask"
      REDIS_URL: "redis://redis:6379/0"

  # Celery beat scheduler.
  celery-beat:
    build: .
    command: celery -A app.tasks.celery beat --loglevel=info
    depends_on:
      - db
      - redis
    environment:
      FLASK_CONFIG: "production"
      DATABASE_URL: "postgresql://user:password@db:5432/advanced_flask"
      REDIS_URL: "redis://redis:6379/0"

  # Flower monitoring UI, published on host port 5555.
  flower:
    build: .
    command: celery -A app.tasks.celery flower
    depends_on:
      - redis
    environment:
      REDIS_URL: "redis://redis:6379/0"
    ports:
      - "5555:5555"

volumes:
  postgres_data:

# 15. Usage Instructions
"""
# Setup environment:
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -r requirements.txt

# Set up environment variables:
cp .env.example .env
# Edit .env with your configuration

# Database migrations:
flask db init
flask db migrate -m "Initial migration"
flask db upgrade

# Run development server:
python run.py

# Run with Celery worker:
celery -A app.tasks.celery worker --loglevel=info

# Run Celery beat scheduler:
celery -A app.tasks.celery beat --loglevel=info

# Run tests:
pytest

# Run tests with coverage:
pytest --cov=app --cov-report=html

# Production deployment:
gunicorn --config gunicorn_config.py run:app

# Docker deployment:
docker-compose up -d

# Monitor Celery tasks (Flower runs as a Celery sub-command):
celery -A app.tasks.celery flower
"""