Ejemplos SQLite Database

Ejemplos de base de datos SQLite incluyendo operaciones básicas, consultas avanzadas, migraciones y gestión

💻 Operaciones SQLite Básicas python

🟢 simple

Operaciones CRUD fundamentales, creación de tablas y consultas básicas

⏱️ 20 min 🏷️ sqlite, database, crud, python
Prerequisites: Python 3.6+, Basic SQL knowledge
# SQLite Basic Operations
# Python - basic_operations.py

import sqlite3
import os
from datetime import datetime
import json

class SQLiteBasicOperations:
    """Small helper around one ``sqlite3`` connection for a users/posts schema.

    Every public method reports its outcome with ``print`` and signals failure
    through its return value (``False``, ``None`` or ``[]``) instead of
    raising. Call :meth:`connect` before any other method and
    :meth:`disconnect` when finished.
    """

    def __init__(self, db_path='sample.db'):
        """Remember the database path; no connection is opened yet."""
        self.db_path = db_path
        self.connection = None
        self.cursor = None

    def _rollback(self):
        """Abandon any open transaction; does nothing when not connected."""
        if self.connection:
            try:
                self.connection.rollback()
            except sqlite3.Error:
                # Best effort: the caller is already reporting the real error.
                pass

    def connect(self):
        """Open the database file and prepare the shared cursor.

        Returns:
            True on success, False if the connection could not be set up.
        """
        try:
            self.connection = sqlite3.connect(self.db_path)
            # The row factory must be set BEFORE the cursor is created: a
            # cursor takes its row_factory from the connection at creation
            # time, and every dict(row) in this class needs sqlite3.Row.
            self.connection.row_factory = sqlite3.Row
            self.cursor = self.connection.cursor()
            # Foreign keys are enforced per connection; the ON DELETE CASCADE
            # declared on posts.user_id relies on this.
            self.cursor.execute("PRAGMA foreign_keys = ON")
            print(f"✅ Connected to database: {self.db_path}")
            return True
        except Exception as e:
            print(f"❌ Error connecting to database: {e}")
            return False

    def disconnect(self):
        """Close the connection if one is open and drop the stale handles."""
        if self.connection:
            self.connection.close()
            self.connection = None
            self.cursor = None
            print(f"✅ Disconnected from database: {self.db_path}")

    def create_users_table(self):
        """Create the ``users`` table if it does not exist.

        Returns:
            True on success, False on any error.
        """
        try:
            self.cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    first_name VARCHAR(50),
                    last_name VARCHAR(50),
                    date_of_birth DATE,
                    is_active BOOLEAN DEFAULT 1,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            self.connection.commit()
            print("✅ Users table created successfully")
            return True
        except Exception as e:
            print(f"❌ Error creating users table: {e}")
            return False

    def create_posts_table(self):
        """Create the ``posts`` table (foreign key to ``users``) if missing.

        Returns:
            True on success, False on any error.
        """
        try:
            self.cursor.execute('''
                CREATE TABLE IF NOT EXISTS posts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    title VARCHAR(200) NOT NULL,
                    content TEXT,
                    category VARCHAR(50),
                    tags TEXT,
                    view_count INTEGER DEFAULT 0,
                    is_published BOOLEAN DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
                )
            ''')

            self.connection.commit()
            print("✅ Posts table created successfully")
            return True
        except Exception as e:
            print(f"❌ Error creating posts table: {e}")
            return False

    def insert_user(self, username, email, password_hash, first_name=None, last_name=None, date_of_birth=None):
        """Insert one user row.

        Returns:
            The new row id, or None if the insert failed (for example because
            ``username`` or ``email`` is already taken).
        """
        try:
            self.cursor.execute('''
                INSERT INTO users (username, email, password_hash, first_name, last_name, date_of_birth)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (username, email, password_hash, first_name, last_name, date_of_birth))

            user_id = self.cursor.lastrowid
            self.connection.commit()
            print(f"✅ User '{username}' created with ID: {user_id}")
            return user_id
        except sqlite3.IntegrityError as e:
            self._rollback()
            print(f"❌ Error inserting user (duplicate data): {e}")
            return None
        except Exception as e:
            self._rollback()
            print(f"❌ Error inserting user: {e}")
            return None

    def get_user(self, user_id):
        """Fetch one user by primary key.

        Returns:
            The row as a dict, or None when it does not exist or on error.
        """
        try:
            self.cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
            row = self.cursor.fetchone()

            if row is None:
                print(f"❌ User with ID {user_id} not found")
                return None

            user = dict(row)
            print(f"✅ Found user: {user}")
            return user
        except Exception as e:
            print(f"❌ Error getting user: {e}")
            return None

    def update_user(self, user_id, **kwargs):
        """Update the given columns of one user and refresh ``updated_at``.

        Args:
            user_id: Primary key of the row to change.
            **kwargs: Column name -> new value.

        Returns:
            True if a row was changed; False if nothing was passed, a column
            name is not a plain identifier, no row matched, or an error
            occurred.
        """
        if not kwargs:
            print("❌ No fields to update")
            return False

        # Column names cannot be bound as "?" parameters, so they are
        # validated and quoted before being placed into the statement.
        invalid = [key for key in kwargs if not key.isidentifier()]
        if invalid:
            print(f"❌ Invalid column name(s): {', '.join(invalid)}")
            return False

        try:
            set_clause = ", ".join(f'"{key}" = ?' for key in kwargs)
            set_clause += ", updated_at = CURRENT_TIMESTAMP"
            values = list(kwargs.values()) + [user_id]

            query = f"UPDATE users SET {set_clause} WHERE id = ?"
            self.cursor.execute(query, values)

            if self.cursor.rowcount > 0:
                self.connection.commit()
                print(f"✅ User {user_id} updated successfully")
                return True

            # Nothing matched: close the implicit transaction anyway.
            self._rollback()
            print(f"❌ No changes made to user {user_id}")
            return False
        except Exception as e:
            self._rollback()
            print(f"❌ Error updating user: {e}")
            return False

    def delete_user(self, user_id):
        """Delete one user; the schema cascades the delete to their posts.

        Returns:
            True if a row was deleted, False if none matched or on error.
        """
        try:
            self.cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))

            if self.cursor.rowcount > 0:
                self.connection.commit()
                print(f"✅ User {user_id} deleted successfully")
                return True

            self._rollback()
            print(f"❌ User {user_id} not found")
            return False
        except Exception as e:
            self._rollback()
            print(f"❌ Error deleting user: {e}")
            return False

    def list_users(self, active_only=True, limit=None):
        """List users, newest first.

        Args:
            active_only: When true, only rows with ``is_active = 1``.
            limit: Maximum number of rows; a falsy value means no limit.

        Returns:
            A list of row dicts (empty on error).
        """
        try:
            query = "SELECT * FROM users"
            params = []

            if active_only:
                query += " WHERE is_active = 1"

            query += " ORDER BY created_at DESC"

            if limit:
                query += " LIMIT ?"
                params.append(limit)

            self.cursor.execute(query, params)
            users = [dict(row) for row in self.cursor.fetchall()]

            print(f"✅ Found {len(users)} users:")
            for user in users:
                print(f"   ID: {user['id']}, Username: {user['username']}, Email: {user['email']}")

            return users
        except Exception as e:
            print(f"❌ Error listing users: {e}")
            return []

    def insert_post(self, user_id, title, content=None, category=None, tags=None):
        """Insert one post; ``tags`` is stored as a JSON string.

        Returns:
            The new row id, or None on error.
        """
        try:
            # An empty or missing tag list is stored as NULL.
            tags_json = json.dumps(tags) if tags else None

            self.cursor.execute('''
                INSERT INTO posts (user_id, title, content, category, tags)
                VALUES (?, ?, ?, ?, ?)
            ''', (user_id, title, content, category, tags_json))

            post_id = self.cursor.lastrowid
            self.connection.commit()
            print(f"✅ Post '{title}' created with ID: {post_id}")
            return post_id
        except Exception as e:
            self._rollback()
            print(f"❌ Error inserting post: {e}")
            return None

    def get_user_posts(self, user_id, published_only=False):
        """Return a user's posts, newest first, with ``tags`` decoded.

        Args:
            user_id: Author whose posts are wanted.
            published_only: When true, only rows with ``is_published = 1``.

        Returns:
            A list of row dicts, each with an extra ``author_username`` key.
            A stored tag string is decoded from JSON into a list; undecodable
            tags become ``[]`` and NULL tags stay ``None``. Empty on error.
        """
        try:
            query = '''
                SELECT p.*, u.username as author_username
                FROM posts p
                JOIN users u ON p.user_id = u.id
                WHERE p.user_id = ?
            '''
            params = [user_id]

            if published_only:
                query += " AND p.is_published = 1"

            query += " ORDER BY p.created_at DESC"

            self.cursor.execute(query, params)
            rows = self.cursor.fetchall()

            print(f"✅ Found {len(rows)} posts by user {user_id}:")
            posts = []
            for row in rows:
                post = dict(row)
                if post['tags']:
                    try:
                        post['tags'] = json.loads(post['tags'])
                    except (TypeError, ValueError):
                        post['tags'] = []
                print(f"   ID: {post['id']}, Title: {post['title']}")
                posts.append(post)

            # Return the decoded dicts, not fresh copies of the raw rows.
            return posts
        except Exception as e:
            print(f"❌ Error getting user posts: {e}")
            return []

    def search_posts(self, keyword, category=None, limit=None):
        """Find posts whose title or content contains ``keyword``.

        Args:
            keyword: Substring matched with SQL ``LIKE`` on title and content.
            category: Optional exact category filter.
            limit: Maximum number of rows; a falsy value means no limit.

        Returns:
            A list of row dicts with ``author_username`` (empty on error).
        """
        try:
            query = '''
                SELECT p.*, u.username as author_username
                FROM posts p
                JOIN users u ON p.user_id = u.id
                WHERE (p.title LIKE ? OR p.content LIKE ?)
            '''
            pattern = f'%{keyword}%'
            params = [pattern, pattern]

            if category:
                query += " AND p.category = ?"
                params.append(category)

            query += " ORDER BY p.created_at DESC"

            if limit:
                query += " LIMIT ?"
                params.append(limit)

            self.cursor.execute(query, params)
            posts = [dict(row) for row in self.cursor.fetchall()]

            print(f"✅ Found {len(posts)} posts matching '{keyword}':")
            for post in posts:
                print(f"   ID: {post['id']}, Title: {post['title']}, Author: {post['author_username']}")

            return posts
        except Exception as e:
            print(f"❌ Error searching posts: {e}")
            return []

    def get_database_info(self):
        """Collect file size, table names and per-table row counts.

        Returns:
            A dict with ``size_bytes``, ``size_mb``, ``tables`` and
            ``record_counts``, or None on error.
        """
        try:
            info = {}

            size_bytes = os.path.getsize(self.db_path)
            info['size_bytes'] = size_bytes
            info['size_mb'] = round(size_bytes / (1024 * 1024), 2)

            self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in self.cursor.fetchall()]
            info['tables'] = tables

            info['record_counts'] = {}
            for table in tables:
                # Table names cannot be bound as parameters; quote them so
                # unusual names (spaces, keywords, quotes) still work.
                quoted = '"' + table.replace('"', '""') + '"'
                self.cursor.execute(f"SELECT COUNT(*) FROM {quoted}")
                info['record_counts'][table] = self.cursor.fetchone()[0]

            print("📊 Database Information:")
            print(f"   Size: {info['size_mb']} MB")
            print(f"   Tables: {', '.join(info['tables'])}")
            print(f"   Record counts: {info['record_counts']}")

            return info
        except Exception as e:
            print(f"❌ Error getting database info: {e}")
            return None

    def backup_database(self, backup_path=None):
        """Copy the database file to ``backup_path``.

        The connection stays open and usable afterwards.

        Args:
            backup_path: Destination file; defaults to
                ``backup_<YYYYmmdd_HHMMSS>.db`` in the working directory.

        Returns:
            The backup path, or None on error.
        """
        try:
            if not backup_path:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                backup_path = f"backup_{timestamp}.db"

            # Flush pending writes so the file on disk holds everything this
            # connection has done, then copy it without closing the
            # connection (a closed connection cannot be reopened).
            if self.connection:
                self.connection.commit()

            import shutil
            shutil.copy2(self.db_path, backup_path)

            print(f"✅ Database backed up to: {backup_path}")
            return backup_path
        except Exception as e:
            print(f"❌ Error backing up database: {e}")
            return None

# Example usage
def main():
    """Run an end-to-end demo against ``blog.db`` in the working directory.

    Creates the tables, inserts two users and three posts, exercises the
    read/search/info/backup/update helpers, then deletes the sample users
    again. The connection is always closed on exit.
    """
    # Initialize database manager
    db = SQLiteBasicOperations('blog.db')

    # Connect to database
    if not db.connect():
        return

    try:
        # Create tables
        print("\n📝 Creating tables...")
        db.create_users_table()
        db.create_posts_table()

        # Insert sample users. The addresses must differ: users.email is
        # UNIQUE, so a shared address makes the second insert fail.
        print("\n👤 Inserting sample users...")
        user1_id = db.insert_user(
            username='johndoe',
            email='john@example.com',
            password_hash='hashed_password_1',
            first_name='John',
            last_name='Doe',
            date_of_birth='1990-01-15'
        )

        user2_id = db.insert_user(
            username='janedoe',
            email='jane@example.com',
            password_hash='hashed_password_2',
            first_name='Jane',
            last_name='Doe',
            date_of_birth='1992-05-20'
        )

        # Insert sample posts
        print("\n📝 Inserting sample posts...")
        if user1_id:
            db.insert_post(
                user_id=user1_id,
                title='My First Blog Post',
                content='This is the content of my first blog post.',
                category='Technology',
                tags=['python', 'database', 'sqlite']
            )

            db.insert_post(
                user_id=user1_id,
                title='SQLite Best Practices',
                content='Here are some best practices for using SQLite...',
                category='Database',
                tags=['sqlite', 'best-practices', 'tutorial']
            )

        if user2_id:
            db.insert_post(
                user_id=user2_id,
                title='Getting Started with Python',
                content='Python is a great programming language for beginners...',
                category='Programming',
                tags=['python', 'beginner', 'tutorial']
            )

        # List all users
        print("\n👥 Listing all users:")
        db.list_users()

        # Get user details and posts
        if user1_id:
            print("\n👤 Getting user details:")
            db.get_user(user1_id)

            print("\n📝 Getting user posts:")
            db.get_user_posts(user1_id)

        # Search posts
        print("\n🔍 Searching posts:")
        db.search_posts('python')

        # Get database info
        print("\n📊 Database information:")
        db.get_database_info()

        # Create backup
        print("\n💾 Creating backup:")
        db.backup_database()

        # Update user
        if user1_id:
            print("\n✏️ Updating user:")
            db.update_user(user1_id, first_name='Johnathan', last_name='Doe-Smith')

        # Clean up
        print("\n🧹 Cleaning up...")
        if user1_id:
            db.delete_user(user1_id)
        if user2_id:
            db.delete_user(user2_id)

    finally:
        # Disconnect from database
        db.disconnect()

if __name__ == "__main__":
    main()

💻 Consultas Avanzadas SQLite sql

🟡 intermediate ⭐⭐⭐

Consultas complejas, joins, agregaciones, funciones de ventana y optimización

⏱️ 45 min 🏷️ sqlite, advanced sql, database, queries
Prerequisites: SQLite 3.x, Intermediate SQL knowledge, Understanding of window functions
-- Advanced SQLite Queries
-- SQL - advanced_queries.sql

-- Sample data setup (run this first)
-- DROP TABLE IF EXISTS employees;
-- DROP TABLE IF EXISTS departments;
-- DROP TABLE IF EXISTS salaries;
-- DROP TABLE IF EXISTS projects;
-- DROP TABLE IF EXISTS project_assignments;

-- Departments table
-- One row per department; name is mandatory and unique.
-- manager_id carries no FOREIGN KEY constraint; the employee/department
-- join later in this script matches it against employees.id.
CREATE TABLE departments (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name VARCHAR(100) NOT NULL UNIQUE,
    manager_id INTEGER,
    budget DECIMAL(12, 2),
    location VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Employees table
-- One row per employee. email is unique, hire_date is mandatory, and
-- department_id optionally references departments(id).
-- is_active defaults to 1; most queries in this script filter on it.
CREATE TABLE employees (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    department_id INTEGER,
    position VARCHAR(100),
    salary DECIMAL(10, 2),
    hire_date DATE NOT NULL,
    birth_date DATE,
    phone VARCHAR(20),
    address TEXT,
    is_active BOOLEAN DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (department_id) REFERENCES departments (id)
);

-- Salaries table (for tracking salary history)
-- One row per salary period of an employee, starting at from_date.
-- to_date may be NULL; the "current salaries" query in this script treats
-- a NULL to_date as the period still in effect.
CREATE TABLE salaries (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    employee_id INTEGER NOT NULL,
    salary DECIMAL(10, 2) NOT NULL,
    from_date DATE NOT NULL,
    to_date DATE,
    FOREIGN KEY (employee_id) REFERENCES employees (id)
);

-- Projects table
-- One row per project, optionally owned by a department.
-- status is free text defaulting to 'planning'; queries in this script
-- also compare it against 'in_progress' and 'completed'.
CREATE TABLE projects (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name VARCHAR(200) NOT NULL,
    description TEXT,
    start_date DATE,
    end_date DATE,
    budget DECIMAL(12, 2),
    status VARCHAR(20) DEFAULT 'planning',
    department_id INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (department_id) REFERENCES departments (id)
);

-- Project assignments table
-- Links employees to projects with allocated and worked hours.
-- UNIQUE(employee_id, project_id) allows at most one assignment row per
-- employee and project.
CREATE TABLE project_assignments (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    employee_id INTEGER NOT NULL,
    project_id INTEGER NOT NULL,
    role VARCHAR(50),
    assignment_date DATE DEFAULT CURRENT_DATE,
    hours_allocated INTEGER,
    hours_worked INTEGER DEFAULT 0,
    FOREIGN KEY (employee_id) REFERENCES employees (id),
    FOREIGN KEY (project_id) REFERENCES projects (id),
    UNIQUE(employee_id, project_id)
);

-- 1. Complex JOIN Queries
-- Get all active employees with their department information.
-- Both joins are LEFT JOINs, so employees without a department (and
-- departments without a manager) are kept; manager_name is NULL in those
-- cases because || with a NULL operand yields NULL.
SELECT
    e.id,
    e.first_name,
    e.last_name,
    e.email,
    e.position,
    e.salary,
    d.name as department_name,
    d.location as department_location,
    m.first_name || ' ' || m.last_name as manager_name
FROM employees e
LEFT JOIN departments d ON e.department_id = d.id
LEFT JOIN employees m ON d.manager_id = m.id
WHERE e.is_active = 1
ORDER BY d.name, e.last_name, e.first_name;

-- Get employees with their current salaries, highest first.
-- "Current" means the salaries row whose to_date IS NULL. Employees with
-- no such row are omitted (inner join), and the salary shown comes from
-- the salaries table, not employees.salary.
SELECT
    e.id,
    e.first_name,
    e.last_name,
    e.position,
    s.salary,
    s.from_date as salary_effective_date
FROM employees e
JOIN salaries s ON e.id = s.employee_id
WHERE s.to_date IS NULL
ORDER BY s.salary DESC;

-- Get department budget utilization.
-- One row per department; only active employees count towards the totals.
-- Departments without active employees still appear (LEFT JOIN) with a
-- count of 0 and NULL sums.
SELECT
    d.name as department_name,
    d.budget as allocated_budget,
    COUNT(e.id) as employee_count,
    SUM(e.salary) as total_salaries,
    (d.budget - SUM(e.salary)) as remaining_budget,
    -- Multiply by 100.0 first: when salaries and budget are stored as whole
    -- numbers, SUM / budget would be integer division and truncate to 0.
    ROUND(100.0 * SUM(e.salary) / d.budget, 2) as budget_utilization_percent
FROM departments d
LEFT JOIN employees e ON d.id = e.department_id AND e.is_active = 1
GROUP BY d.id, d.name, d.budget
ORDER BY budget_utilization_percent DESC;

-- 2. Window Functions
-- Employee salary ranking within departments (active employees only).
-- Rows are ordered by salary DESC inside each department, so prev_salary is
-- the salary of the row ranked just above (NULL for the first row) and
-- salary_diff is therefore zero or negative.
SELECT
    first_name,
    last_name,
    department_id,
    salary,
    RANK() OVER (PARTITION BY department_id ORDER BY salary DESC) as dept_salary_rank,
    LAG(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) as prev_salary,
    salary - LAG(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) as salary_diff
FROM employees e
WHERE e.is_active = 1;

-- Running totals for project work hours.
-- For each project with status 'in_progress', accumulates hours_worked
-- across its assignments in assignment_date order; the frame runs from the
-- first row of the project up to the current row.
SELECT
    p.name as project_name,
    pa.employee_id,
    e.first_name || ' ' || e.last_name as employee_name,
    pa.hours_worked,
    SUM(pa.hours_worked) OVER (
        PARTITION BY pa.project_id
        ORDER BY pa.assignment_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as cumulative_hours
FROM project_assignments pa
JOIN employees e ON pa.employee_id = e.id
JOIN projects p ON pa.project_id = p.id
WHERE p.status = 'in_progress'
ORDER BY p.name, cumulative_hours;

-- Moving averages for salary changes.
-- Per employee, averages the current salary record and the two preceding
-- ones (ordered by from_date). The window is three ROWS, not three calendar
-- months, despite the "_3_months" column alias.
SELECT
    e.first_name,
    e.last_name,
    s.salary,
    s.from_date,
    AVG(s.salary) OVER (
        PARTITION BY e.id
        ORDER BY s.from_date
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) as moving_avg_salary_3_months,
    s.salary - AVG(s.salary) OVER (
        PARTITION BY e.id
        ORDER BY s.from_date
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) as salary_vs_avg_diff
FROM salaries s
JOIN employees e ON s.employee_id = e.id;

-- 3. Subqueries and CTEs
-- Find departments with above average salaries.
-- dept_avg_salaries: average salary of active employees per department.
-- overall_avg_salary: single-row average over all active employees.
-- The CROSS JOIN attaches that single row to every department so the two
-- averages can be compared.
WITH dept_avg_salaries AS (
    SELECT
        d.id,
        d.name,
        AVG(e.salary) as avg_salary
    FROM departments d
    JOIN employees e ON d.id = e.department_id
    WHERE e.is_active = 1
    GROUP BY d.id, d.name
),
overall_avg_salary AS (
    SELECT AVG(salary) as overall_avg
    FROM employees
    WHERE is_active = 1
)
SELECT
    das.name,
    das.avg_salary,
    oas.overall_avg,
    ROUND((das.avg_salary / oas.overall_avg - 1) * 100, 2) as percent_above_avg
FROM dept_avg_salaries das
CROSS JOIN overall_avg_salary oas
WHERE das.avg_salary > oas.overall_avg
ORDER BY percent_above_avg DESC;

-- Find employees who have completed all assigned projects.
-- An assignment counts as completed when hours_worked >= hours_allocated.
-- Because the final WHERE keeps only employees whose completed count equals
-- their total, completion_rate is 100 for every returned row.
WITH employee_projects AS (
    SELECT
        e.id as employee_id,
        e.first_name || ' ' || e.last_name as employee_name,
        p.id as project_id,
        p.name as project_name,
        p.end_date,
        pa.hours_allocated,
        pa.hours_worked
    FROM employees e
    JOIN project_assignments pa ON e.id = pa.employee_id
    JOIN projects p ON pa.project_id = p.id
),
completed_projects AS (
    SELECT
        employee_id,
        employee_name,
        COUNT(*) as total_projects,
        SUM(CASE WHEN hours_worked >= hours_allocated THEN 1 ELSE 0 END) as completed_projects
    FROM employee_projects
    GROUP BY employee_id, employee_name
)
SELECT
    employee_name,
    total_projects,
    completed_projects,
    -- 100.0 keeps the division in floating point.
    ROUND((completed_projects * 100.0 / total_projects), 2) as completion_rate
FROM completed_projects
WHERE completed_projects = total_projects
ORDER BY completion_rate DESC;

-- 4. Aggregation and Grouping
-- Department performance metrics.
-- One row per department. The join has no is_active filter, so the counts
-- and salary statistics cover active and inactive employees alike.
-- new_hires counts employees hired within the last year; avg_tenure_days is
-- the mean number of days since hire_date.
SELECT
    d.name as department,
    COUNT(e.id) as total_employees,
    COUNT(CASE WHEN e.hire_date >= date('now', '-1 year') THEN 1 END) as new_hires,
    COUNT(CASE WHEN e.is_active = 0 THEN 1 END) as inactive_employees,
    AVG(e.salary) as avg_salary,
    MAX(e.salary) as max_salary,
    MIN(e.salary) as min_salary,
    ROUND(AVG(JULIANDAY('now') - JULIANDAY(e.hire_date))) as avg_tenure_days
FROM departments d
LEFT JOIN employees e ON d.id = e.department_id
GROUP BY d.id, d.name
ORDER BY avg_salary DESC;

-- Project timeline analysis.
-- One row per project with planned duration, team size and hour totals.
-- status_indicator is evaluated top-down: 'On Track' once worked hours
-- reach allocated hours, otherwise 'Overdue' when end_date is in the past,
-- otherwise 'In Progress'.
SELECT
    p.name as project_name,
    p.status,
    p.start_date,
    p.end_date,
    julianday(p.end_date) - julianday(p.start_date) as planned_duration_days,
    COUNT(pa.employee_id) as team_size,
    SUM(pa.hours_allocated) as total_hours_allocated,
    SUM(pa.hours_worked) as total_hours_worked,
    CASE
        WHEN SUM(pa.hours_worked) >= SUM(pa.hours_allocated) THEN 'On Track'
        WHEN julianday('now') > julianday(p.end_date) THEN 'Overdue'
        ELSE 'In Progress'
    END as status_indicator
FROM projects p
LEFT JOIN project_assignments pa ON p.id = pa.project_id
GROUP BY p.id, p.name, p.status, p.start_date, p.end_date
ORDER BY status_indicator, p.end_date;

-- 5. Recursive Queries (Hierarchical Data)
-- Department hierarchy (assuming self-referencing structure).
-- Walks from rows whose manager_id IS NULL (level 0) to rows whose
-- manager_id equals the id of a department already in the result, building
-- a "A > B > C" path along the way.
-- NOTE(review): the recursive join compares departments.manager_id with a
-- department id, whereas the employee/department join earlier in this
-- script matches manager_id against employees.id. Confirm which meaning
-- the column has before relying on this output.
WITH RECURSIVE dept_hierarchy AS (
    -- Base case: top-level departments (no manager)
    SELECT
        id,
        name,
        manager_id,
        0 as level,
        name as hierarchy_path
    FROM departments
    WHERE manager_id IS NULL

    UNION ALL

    -- Recursive case: departments with managers
    SELECT
        d.id,
        d.name,
        d.manager_id,
        dh.level + 1,
        dh.hierarchy_path || ' > ' || d.name
    FROM departments d
    JOIN dept_hierarchy dh ON d.manager_id = dh.id
)
SELECT
    level,
    name,
    hierarchy_path,
    manager_id
FROM dept_hierarchy
ORDER BY level, hierarchy_path;

-- 6. Performance Optimization Queries
-- Create indexes for better performance.
-- They cover the join and filter columns used by the queries in this
-- script; IF NOT EXISTS makes the statements safe to re-run.
CREATE INDEX IF NOT EXISTS idx_employees_department_id ON employees(department_id);
CREATE INDEX IF NOT EXISTS idx_employees_hire_date ON employees(hire_date);
CREATE INDEX IF NOT EXISTS idx_employees_active ON employees(is_active);
CREATE INDEX IF NOT EXISTS idx_salaries_employee_id ON salaries(employee_id);
CREATE INDEX IF NOT EXISTS idx_salaries_from_date ON salaries(from_date);
CREATE INDEX IF NOT EXISTS idx_project_assignments_project_id ON project_assignments(project_id);

-- Analyze query performance.
-- EXPLAIN QUERY PLAN returns the planner's strategy (scans, index use,
-- sorting) for the SELECT that follows instead of the query's own rows.
EXPLAIN QUERY PLAN
SELECT e.first_name, e.last_name, d.name as department
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > 50000
ORDER BY e.salary DESC;

-- Get slow queries
-- NOTE: sqlite_master has no `time` column and SQLite keeps no log of
-- executed statements, so per-query timings cannot be selected from the
-- catalog. Measure statements from the client instead, for example with
-- `.timer on` in the sqlite3 shell, and inspect their plans with
-- EXPLAIN QUERY PLAN.
-- What the catalog does provide is the list of tables and indexes, which
-- helps when checking whether a slow query has an index to use:
SELECT
    type,
    name,
    tbl_name,
    sql
FROM sqlite_master
WHERE type IN ('table', 'index')
  AND sql IS NOT NULL
ORDER BY tbl_name, type, name;

-- 7. Date and Time Functions
-- Employee age analysis (active employees only).
-- Ages and years of service are day differences divided by 365.25.
-- service_life_percentage is days employed as a percentage of days lived.
-- birth_date is nullable, so age and the percentage are NULL without it.
SELECT
    first_name,
    last_name,
    birth_date,
    ROUND((julianday('now') - julianday(birth_date)) / 365.25, 1) as age,
    ROUND((julianday('now') - julianday(hire_date)) / 365.25, 1) as years_of_service,
    ROUND(
        (julianday('now') - julianday(hire_date)) /
        (julianday('now') - julianday(birth_date)) * 100,
        1
    ) as service_life_percentage
FROM employees
WHERE is_active = 1
ORDER BY years_of_service DESC;

-- Monthly hiring trends.
-- Groups all employees by hire month ('YYYY-MM') and returns the 24 most
-- recent months that had at least one hire; months without hires produce
-- no row.
SELECT
    strftime('%Y-%m', hire_date) as hire_month,
    COUNT(*) as hires_count,
    AVG(salary) as avg_salary_hired
FROM employees
GROUP BY strftime('%Y-%m', hire_date)
ORDER BY hire_month DESC
LIMIT 24;

-- 8. Conditional Logic and Case Statements
-- Employee performance categorization (active employees only).
-- Each CASE is evaluated top-down and the first matching branch wins.
-- Tenure thresholds are in days since hire_date: 365 (1 year),
-- 730 (2 years) and 1825 (5 years).
SELECT
    first_name,
    last_name,
    salary,
    CASE
        WHEN salary < 30000 THEN 'Entry Level'
        WHEN salary < 60000 THEN 'Junior'
        WHEN salary < 100000 THEN 'Mid Level'
        WHEN salary < 150000 THEN 'Senior'
        ELSE 'Executive'
    END as salary_level,
    CASE
        WHEN julianday('now') - julianday(hire_date) < 365 THEN 'New Hire'
        WHEN julianday('now') - julianday(hire_date) < 1825 THEN 'Experienced'
        ELSE 'Veteran'
    END as experience_level,
    CASE
        WHEN julianday('now') - julianday(hire_date) < 365 AND salary < 50000 THEN 'Underpaid New Hire'
        WHEN julianday('now') - julianday(hire_date) >= 1825 AND salary < 60000 THEN 'Underpaid Veteran'
        WHEN salary > 100000 AND julianday('now') - julianday(hire_date) < 730 THEN 'Rapid Advancement'
        ELSE 'Appropriate Compensation'
    END as compensation_analysis
FROM employees
WHERE is_active = 1
ORDER BY salary DESC;

-- 9. String Functions and Pattern Matching
-- Search for employees with specific patterns.
-- Classifies e-mail and phone values with LIKE and reports the combined
-- name length. In the WHERE clause AND binds tighter than OR, so a row
-- qualifies when (first_name has 'a' AND last_name has 's'), or the e-mail
-- contains 'test', or a phone number is present.
SELECT
    first_name,
    last_name,
    email,
    phone,
    CASE
        WHEN email LIKE '%@company.com' THEN 'Internal Email'
        WHEN email LIKE '%@gmail.com' THEN 'Gmail User'
        WHEN email LIKE '%@yahoo.com' THEN 'Yahoo User'
        ELSE 'Other Email Provider'
    END as email_category,
    CASE
        WHEN phone LIKE '%555-%' THEN 'Local Number'
        WHEN phone LIKE '1-%' THEN 'US Number'
        WHEN phone LIKE '+44-%' THEN 'UK Number'
        ELSE 'International Number'
    END as phone_category,
    LENGTH(first_name) + LENGTH(last_name) as name_length
FROM employees
WHERE (
    first_name LIKE '%a%' AND last_name LIKE '%s%'  -- Names containing 'a' and 's'
    OR email LIKE '%test%'  -- Email containing 'test'
    OR phone IS NOT NULL
)
ORDER BY name_length DESC;

-- 10. JSON Operations (SQLite 3.38+)
-- Store additional employee metadata as JSON.
-- The JSON document is kept in a plain TEXT column added to employees.
ALTER TABLE employees ADD COLUMN metadata TEXT;

-- Update metadata with JSON data.
-- Builds one document with two arrays and a nested object for employee 1.
UPDATE employees SET metadata = json_object(
    'skills', json_array('Python', 'SQL', 'JavaScript'),
    'certifications', json_array('AWS Certified', 'Google Cloud'),
    'preferences', json_object('remote_work', true, 'team_size', 'small')
) WHERE id = 1;

-- Query JSON data.
-- Returns only employees whose metadata has a 'certifications' entry;
-- rows with NULL metadata are filtered out by the IS NOT NULL test.
SELECT
    first_name,
    last_name,
    json_extract(metadata, '$.skills') as skills,
    json_array_length(metadata, '$.certifications') as certification_count,
    json_extract(metadata, '$.preferences.remote_work') as remote_work_preference
FROM employees
WHERE json_extract(metadata, '$.certifications') IS NOT NULL;

-- 11. Full-Text Search
-- Create FTS virtual table for employee search.
-- This is an external-content FTS5 table: it stores only the search index
-- and reads column values from employees, matching rowid to employees.id.
CREATE VIRTUAL TABLE employees_fts USING fts5(
    first_name,
    last_name,
    email,
    position,
    content='employees',
    content_rowid='id'
);

-- Enable triggers for FTS.
-- An external-content index is not maintained automatically; these
-- triggers mirror every INSERT, DELETE and UPDATE on employees into it.
CREATE TRIGGER employees_fts_insert AFTER INSERT ON employees BEGIN
    INSERT INTO employees_fts(rowid, first_name, last_name, email, position)
    VALUES (new.id, new.first_name, new.last_name, new.email, new.position);
END;

-- The special 'delete' command removes the old row's terms from the index;
-- it must be given the values that were originally indexed.
CREATE TRIGGER employees_fts_delete AFTER DELETE ON employees BEGIN
    INSERT INTO employees_fts(employees_fts, rowid, first_name, last_name, email, position)
    VALUES ('delete', old.id, old.first_name, old.last_name, old.email, old.position);
END;

-- An update is a delete of the old values followed by an insert of the new.
CREATE TRIGGER employees_fts_update AFTER UPDATE ON employees BEGIN
    INSERT INTO employees_fts(employees_fts, rowid, first_name, last_name, email, position)
    VALUES ('delete', old.id, old.first_name, old.last_name, old.email, old.position);
    INSERT INTO employees_fts(rowid, first_name, last_name, email, position)
    VALUES (new.id, new.first_name, new.last_name, new.email, new.position);
END;

-- Index the rows that already exist in employees. The triggers only see
-- changes made after they were created, so without this step existing
-- employees are unsearchable and later 'delete' commands would refer to
-- rows that were never indexed.
INSERT INTO employees_fts(employees_fts) VALUES ('rebuild');

-- Full-text search.
-- Returns the ten best matches for the term 'python' or the phrase
-- "software engineer" across the indexed employee columns.
SELECT
    e.first_name,
    e.last_name,
    e.email,
    e.position,
    rank
FROM employees_fts efts
JOIN employees e ON efts.rowid = e.id
WHERE employees_fts MATCH 'python OR "software engineer"'
-- FTS5 rank is lower (more negative) for better matches, so ascending
-- order puts the most relevant rows first.
ORDER BY rank
LIMIT 10;

-- 12. Data Analysis and Reporting
-- Generate quarterly reports.
-- Groups projects started within the last two years by year and quarter.
-- The quarter is ((month - 1) / 3) + 1: strftime returns text, the
-- subtraction turns it into an integer, and the integer division maps
-- months 1-3 to 1, 4-6 to 2, 7-9 to 3 and 10-12 to 4.
-- estimated_profit is a flat 10% of the summed budgets.
SELECT
    strftime('%Y', p.start_date) as year,
    ((strftime('%m', p.start_date) - 1) / 3) + 1 as quarter,
    COUNT(*) as projects_started,
    SUM(CASE WHEN p.status = 'completed' THEN 1 ELSE 0 END) as projects_completed,
    SUM(p.budget) as total_budget,
    AVG(p.budget) as avg_budget,
    SUM(p.budget) * 0.1 as estimated_profit
FROM projects p
WHERE p.start_date >= date('now', '-2 years')
GROUP BY strftime('%Y', p.start_date),
         ((strftime('%m', p.start_date) - 1) / 3) + 1
ORDER BY year DESC, quarter DESC;

💻 SQLite con SQLAlchemy ORM python

🟡 intermediate ⭐⭐⭐⭐

Mapeo Objeto-Relacional con SQLAlchemy para operaciones de base de datos Python

⏱️ 40 min 🏷️ sqlite, sqlalchemy, orm, python, database
Prerequisites: Python 3.6+, Basic SQL knowledge, Object-oriented programming
# SQLite with SQLAlchemy ORM
# Python - sqlalchemy_orm.py

from datetime import datetime, date
import json

from sqlalchemy import (
    create_engine, Column, Integer, String, Text, Boolean,
    Date, DateTime, Float, ForeignKey, Table, MetaData,
    event, inspect
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, scoped_session
from sqlalchemy.sql import func
from sqlalchemy.schema import FetchedValue

# Create base class for declarative models
Base = declarative_base()

# Association table for the many-to-many relationship between posts and tags.
# It has no model class of its own; each row pairs one post id with one tag
# id, and the two columns together form the composite primary key, so a
# given pair can be stored only once.
post_tags = Table(
    'post_tags',
    Base.metadata,
    Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
    Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)

class User(Base):
    """User account stored in the ``users`` table.

    ``username``, ``email`` and ``password_hash`` are required; ``username``
    and ``email`` are unique and indexed. All profile fields are optional.
    """
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    # Required identity/credential columns
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(100), unique=True, nullable=False, index=True)
    password_hash = Column(String(255), nullable=False)
    # Optional profile columns
    first_name = Column(String(50))
    last_name = Column(String(50))
    bio = Column(Text)
    avatar_url = Column(String(255))
    birth_date = Column(Date)
    location = Column(String(100))
    website = Column(String(255))
    # Account state flags and timestamps
    is_active = Column(Boolean, default=True)
    is_verified = Column(Boolean, default=False)
    email_verified_at = Column(DateTime)
    last_login_at = Column(DateTime)
    # Timestamps are filled in Python with naive UTC values at insert time;
    # updated_at is additionally refreshed on every UPDATE.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    # Deleting a user also deletes that user's posts and comments
    # (cascade="all, delete-orphan").
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
    comments = relationship("Comment", back_populates="author", cascade="all, delete-orphan")
    # Follower rows where this user is the one being followed
    followers = relationship("Follower", foreign_keys="Follower.following_id", back_populates="following")
    # Follower rows where this user is the one doing the following
    following = relationship("Follower", foreign_keys="Follower.follower_id", back_populates="follower")

    def __repr__(self):
        """Return a short debugging representation with id, username and email."""
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"

class Post(Base):
    """Blog post stored in the ``posts`` table.

    ``title``, ``slug``, ``content`` and ``author_id`` are required. ``slug``
    is unique and has no default, so it must be supplied when creating a post.
    """
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False, index=True)
    slug = Column(String(200), unique=True, nullable=False, index=True)
    content = Column(Text, nullable=False)
    excerpt = Column(Text)
    status = Column(String(20), default='draft', index=True)  # draft, published, archived
    comment_status = Column(String(20), default='open')  # open, closed
    password = Column(String(255))  # For password-protected posts
    featured_image = Column(String(255))
    # Denormalized counters; nothing in this model keeps them in sync
    # automatically, they are plain integer columns defaulting to 0.
    view_count = Column(Integer, default=0)
    like_count = Column(Integer, default=0)
    comment_count = Column(Integer, default=0)
    published_at = Column(DateTime, index=True)
    # Naive UTC timestamps set in Python; updated_at is refreshed on UPDATE.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Foreign keys
    author_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    category_id = Column(Integer, ForeignKey('categories.id'))  # optional

    # Relationships
    author = relationship("User", back_populates="posts")
    category = relationship("Category", back_populates="posts")
    # Deleting a post also deletes its comments.
    comments = relationship("Comment", back_populates="post", cascade="all, delete-orphan")
    # Many-to-many through the post_tags association table.
    tags = relationship("Tag", secondary=post_tags, back_populates="posts")

    def __repr__(self):
        """Return a short debugging representation with id, title and status."""
        return f"<Post(id={self.id}, title='{self.title}', status='{self.status}')>"

class Category(Base):
    """Post category stored in the ``categories`` table.

    Categories can be nested: ``parent_id`` points at another category row.
    """
    __tablename__ = 'categories'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False, index=True)
    slug = Column(String(50), unique=True, nullable=False, index=True)
    description = Column(Text)
    parent_id = Column(Integer, ForeignKey('categories.id'))  # NULL for top-level categories
    color = Column(String(7))  # Hex color code
    post_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Self-referential relationship for hierarchical categories
    # remote_side=[id] marks `parent` as the many-to-one side of the pair.
    parent = relationship("Category", remote_side=[id], back_populates="children")
    children = relationship("Category", back_populates="parent")

    # Relationships
    posts = relationship("Post", back_populates="category")

    def __repr__(self):
        """Return a short debugging representation with id and name."""
        return f"<Category(id={self.id}, name='{self.name}')>"

class Tag(Base):
    """Tag stored in the ``tags`` table; linked to posts through ``post_tags``."""
    __tablename__ = 'tags'

    id = Column(Integer, primary_key=True)
    # name and slug are both required, unique and indexed
    name = Column(String(50), unique=True, nullable=False, index=True)
    slug = Column(String(50), unique=True, nullable=False, index=True)
    description = Column(Text)
    color = Column(String(7))  # Hex color code
    post_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    # Many-to-many counterpart of Post.tags.
    posts = relationship("Post", secondary=post_tags, back_populates="tags")

    def __repr__(self):
        """Return a short debugging representation with id and name."""
        return f"<Tag(id={self.id}, name='{self.name}')>"

class Comment(Base):
    """Comment on a post, stored in the ``comments`` table.

    A comment always belongs to a post. It may belong to a registered user
    (``author_id``) or be a guest comment described by the ``author_*``
    text columns. Comments can be nested through ``parent_id``.
    """
    __tablename__ = 'comments'

    id = Column(Integer, primary_key=True)
    content = Column(Text, nullable=False)
    author_name = Column(String(100))  # For guest comments
    author_email = Column(String(100))  # For guest comments
    author_ip = Column(String(45))  # For guest comments
    is_approved = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Foreign keys
    post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)
    author_id = Column(Integer, ForeignKey('users.id'))  # null for guest comments
    parent_id = Column(Integer, ForeignKey('comments.id'))  # For nested comments

    # Relationships
    post = relationship("Post", back_populates="comments")
    author = relationship("User", back_populates="comments")
    # Self-referential pair: remote_side=[id] makes `parent` the many-to-one
    # side. Without it both sides resolve as one-to-many and SQLAlchemy
    # refuses to configure the mappers.
    parent = relationship("Comment", remote_side=[id], back_populates="children")
    children = relationship("Comment", back_populates="parent")

    def __repr__(self):
        """Return a short debugging representation with id, post_id and author_id."""
        return f"<Comment(id={self.id}, post_id={self.post_id}, author_id={self.author_id})>"

class Follower(Base):
    """Follower model for user relationships.

    One row means "user ``follower_id`` follows user ``following_id``".
    """
    __tablename__ = 'followers'

    id = Column(Integer, primary_key=True)
    follower_id = Column(Integer, ForeignKey('users.id'), nullable=False)  # the user who follows
    following_id = Column(Integer, ForeignKey('users.id'), nullable=False)  # the user being followed
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    # Both columns reference users.id, so each relationship names the foreign
    # key it uses explicitly.
    follower = relationship("User", foreign_keys=[follower_id], back_populates="following")
    following = relationship("User", foreign_keys=[following_id], back_populates="followers")

    # Ensure no duplicate follows
    # (unique composite index over the follower/following pair)
    __table_args__ = (Index('idx_follower_following', 'follower_id', 'following_id', unique=True),)

class DatabaseManager:
    """Database manager class for SQLAlchemy operations.

    Owns the engine, the session factory and a ``scoped_session`` registry
    for one database URL. Methods report problems by printing a message and
    returning ``False`` rather than raising, except ``get_session`` which
    raises when no connection can be established.
    """

    def __init__(self, db_url='sqlite:///blog.db'):
        """Remember the database URL; nothing is opened until ``connect()``."""
        self.db_url = db_url
        self.engine = None
        self.Session = None
        self.session = None

    def connect(self):
        """Create database engine and session.

        Returns:
            True on success, False if the engine or session setup failed.
        """
        try:
            self.engine = create_engine(
                self.db_url,
                echo=False,  # Set to True for SQL logging
                pool_pre_ping=True
            )

            # Create session factory
            self.Session = sessionmaker(bind=self.engine)
            self.session = scoped_session(self.Session)

            print(f"✅ Connected to database: {self.db_url}")
            return True
        except Exception as e:
            print(f"❌ Error connecting to database: {e}")
            return False

    def create_tables(self):
        """Create all tables registered on ``Base.metadata``.

        Returns:
            True on success, False if table creation failed.
        """
        try:
            Base.metadata.create_all(self.engine)
            print("✅ Database tables created successfully")
            return True
        except Exception as e:
            print(f"❌ Error creating tables: {e}")
            return False

    def drop_tables(self):
        """Drop all tables registered on ``Base.metadata``.

        Returns:
            True on success, False if dropping failed.
        """
        try:
            Base.metadata.drop_all(self.engine)
            print("✅ Database tables dropped successfully")
            return True
        except Exception as e:
            print(f"❌ Error dropping tables: {e}")
            return False

    def get_session(self):
        """Get database session, connecting first if necessary.

        Returns:
            The session object produced by the scoped-session registry.

        Raises:
            RuntimeError: if no connection exists and ``connect()`` fails.
        """
        if self.session is None and not self.connect():
            # Previously this fell through and tried to call None.
            raise RuntimeError(f"Could not connect to database: {self.db_url}")
        return self.session()

    def commit_and_close(self):
        """Commit current session and close.

        On a failed commit the session is rolled back and closed instead.
        ``self.session`` is always reset to None afterwards, even if the
        rollback or close itself raises.
        """
        if self.session is None:
            return

        try:
            self.session.commit()
            self.session.close()
            print("✅ Session committed and closed")
        except Exception as e:
            print(f"❌ Error committing session: {e}")
            self.session.rollback()
            self.session.close()
        finally:
            self.session = None

class UserService:
    """Service class for user operations.

    Every method works on the session obtained from the database manager at
    construction time. Failures are printed and reported through the return
    value (``None``, ``False`` or ``[]``) instead of being raised.
    """

    def __init__(self, db_manager):
        """Keep the manager and fetch the session used by all operations."""
        self.db = db_manager
        self.session = db_manager.get_session()

    def create_user(self, username, email, password_hash, **kwargs):
        """Create a new user.

        Args:
            username: Unique login name.
            email: Unique e-mail address.
            password_hash: Already-hashed password.
            **kwargs: Additional ``User`` column values.

        Returns:
            The persisted ``User``, or None if the insert failed.
        """
        try:
            user = User(
                username=username,
                email=email,
                password_hash=password_hash,
                **kwargs
            )

            self.session.add(user)
            self.session.commit()
            # Reload so database-generated values (e.g. the id) are present.
            self.session.refresh(user)

            print(f"✅ User '{username}' created with ID: {user.id}")
            return user
        except Exception as e:
            self.session.rollback()
            print(f"❌ Error creating user: {e}")
            return None

    def get_user_by_id(self, user_id):
        """Get user by ID; returns None when not found or on error."""
        try:
            user = self.session.query(User).filter(User.id == user_id).first()
            if user:
                print(f"✅ Found user: {user.username}")
            else:
                print(f"❌ User with ID {user_id} not found")
            return user
        except Exception as e:
            print(f"❌ Error getting user: {e}")
            return None

    def get_user_by_username(self, username):
        """Get user by username; returns None when not found or on error."""
        try:
            user = self.session.query(User).filter(User.username == username).first()
            return user
        except Exception as e:
            print(f"❌ Error getting user by username: {e}")
            return None

    def get_user_by_email(self, email):
        """Get user by email; returns None when not found or on error."""
        try:
            user = self.session.query(User).filter(User.email == email).first()
            return user
        except Exception as e:
            print(f"❌ Error getting user by email: {e}")
            return None

    def update_user(self, user_id, **kwargs):
        """Update user information.

        Keyword arguments that do not name an existing attribute of the user
        are ignored.

        Returns:
            The updated ``User``, or None if the user does not exist or the
            update failed.
        """
        try:
            user = self.get_user_by_id(user_id)
            if not user:
                return None

            for key, value in kwargs.items():
                if hasattr(user, key):
                    setattr(user, key, value)

            user.updated_at = datetime.utcnow()
            self.session.commit()

            print(f"✅ User {user.username} updated successfully")
            return user
        except Exception as e:
            self.session.rollback()
            print(f"❌ Error updating user: {e}")
            return None

    def delete_user(self, user_id):
        """Delete a user.

        Returns:
            True if the user was deleted, False if it does not exist or the
            delete failed.
        """
        try:
            user = self.get_user_by_id(user_id)
            if not user:
                return False

            # Read the name while the row still exists; the success message
            # must not depend on the state of a deleted object.
            username = user.username

            self.session.delete(user)
            self.session.commit()

            print(f"✅ User {username} deleted successfully")
            return True
        except Exception as e:
            self.session.rollback()
            print(f"❌ Error deleting user: {e}")
            return False

    def get_users(self, active_only=True, limit=None, offset=None):
        """Get list of users, ordered by id.

        Args:
            active_only: When true, only users with ``is_active`` set.
            limit: Maximum number of rows; None means no limit, 0 returns
                an empty list.
            offset: Number of rows to skip; None or 0 skips nothing.

        Returns:
            A list of ``User`` objects, or [] on error.
        """
        try:
            query = self.session.query(User)

            if active_only:
                query = query.filter(User.is_active == True)

            # A fixed order keeps limit/offset pages stable between calls.
            query = query.order_by(User.id)

            # Compare with None so that limit=0 is honoured rather than
            # being treated as "no limit".
            if limit is not None:
                query = query.limit(limit)

            if offset:
                query = query.offset(offset)

            users = query.all()
            print(f"✅ Found {len(users)} users")
            return users
        except Exception as e:
            print(f"❌ Error getting users: {e}")
            return []

    def search_users(self, query, active_only=True):
        """Search users by name or email.

        Performs a case-insensitive substring match of ``query`` against
        first name, last name and e-mail.

        Returns:
            A list of matching ``User`` objects, or [] on error.
        """
        try:
            search_pattern = f"%{query}%"
            db_query = self.session.query(User).filter(
                (User.first_name.ilike(search_pattern) |
                 User.last_name.ilike(search_pattern) |
                 User.email.ilike(search_pattern))
            )

            if active_only:
                db_query = db_query.filter(User.is_active == True)

            users = db_query.all()
            print(f"✅ Found {len(users)} users matching '{query}'")
            return users
        except Exception as e:
            print(f"❌ Error searching users: {e}")
            return []

class PostService:
    """Service class for post operations.

    Every method works on the session obtained from the database manager at
    construction time. Failures are printed and reported through the return
    value (``None``, ``False`` or ``[]``) instead of being raised.
    """

    def __init__(self, db_manager):
        """Keep the manager and fetch the session used by all operations."""
        self.db = db_manager
        self.session = db_manager.get_session()

    def create_post(self, title, content, author_id, **kwargs):
        """Create a new post.

        Args:
            title: Post title.
            content: Post body.
            author_id: Id of the authoring user.
            **kwargs: Additional ``Post`` column values.

        Returns:
            The persisted ``Post``, or None if the insert failed.
        """
        try:
            post = Post(
                title=title,
                content=content,
                author_id=author_id,
                **kwargs
            )

            self.session.add(post)
            self.session.commit()
            # Reload so database-generated values (e.g. the id) are present.
            self.session.refresh(post)

            print(f"✅ Post '{title}' created with ID: {post.id}")
            return post
        except Exception as e:
            self.session.rollback()
            print(f"❌ Error creating post: {e}")
            return None

    def get_post_by_id(self, post_id):
        """Get post by ID with author, category, tags and comments eager-loaded.

        Returns:
            The ``Post``, or None when not found or on error.
        """
        try:
            post = self.session.query(Post).options(
                joinedload(Post.author),
                joinedload(Post.category),
                joinedload(Post.tags),
                joinedload(Post.comments)
            ).filter(Post.id == post_id).first()

            if post:
                print(f"✅ Found post: {post.title}")
            else:
                print(f"❌ Post with ID {post_id} not found")
            return post
        except Exception as e:
            print(f"❌ Error getting post: {e}")
            return None

    def get_posts(self, status=None, category_id=None, author_id=None, limit=None, offset=None):
        """Get posts with filters, newest first.

        Args:
            status: Only posts with this status, if given.
            category_id: Only posts in this category, if given.
            author_id: Only posts by this author, if given.
            limit: Maximum number of posts; None means no limit, 0 returns
                an empty list.
            offset: Number of posts to skip; None or 0 skips nothing.

        Returns:
            A list of ``Post`` objects, or [] on error.
        """
        try:
            query = self.session.query(Post).options(
                joinedload(Post.author),
                joinedload(Post.category),
                joinedload(Post.tags),
                joinedload(Post.comments)
            )

            if status:
                query = query.filter(Post.status == status)

            if category_id:
                query = query.filter(Post.category_id == category_id)

            if author_id:
                query = query.filter(Post.author_id == author_id)

            query = query.order_by(Post.created_at.desc())

            # Compare with None so that limit=0 is honoured rather than
            # being treated as "no limit".
            if limit is not None:
                query = query.limit(limit)

            if offset:
                query = query.offset(offset)

            posts = query.all()
            print(f"✅ Found {len(posts)} posts")
            return posts
        except Exception as e:
            print(f"❌ Error getting posts: {e}")
            return []

    def update_post(self, post_id, **kwargs):
        """Update post.

        Keyword arguments that do not name an existing attribute of the post
        are ignored.

        Returns:
            The updated ``Post``, or None if the post does not exist or the
            update failed.
        """
        try:
            post = self.get_post_by_id(post_id)
            if not post:
                return None

            for key, value in kwargs.items():
                if hasattr(post, key):
                    setattr(post, key, value)

            post.updated_at = datetime.utcnow()
            self.session.commit()

            print(f"✅ Post '{post.title}' updated successfully")
            return post
        except Exception as e:
            self.session.rollback()
            print(f"❌ Error updating post: {e}")
            return None

    def delete_post(self, post_id):
        """Delete a post.

        Returns:
            True if the post was deleted, False if it does not exist or the
            delete failed.
        """
        try:
            post = self.get_post_by_id(post_id)
            if not post:
                return False

            # Read the title while the row still exists; the success message
            # must not depend on the state of a deleted object.
            title = post.title

            self.session.delete(post)
            self.session.commit()

            print(f"✅ Post '{title}' deleted successfully")
            return True
        except Exception as e:
            self.session.rollback()
            print(f"❌ Error deleting post: {e}")
            return False

    def get_popular_posts(self, limit=10):
        """Get popular posts based on view count and comment count.

        Published posts are ranked by the sum of their view, like and
        comment counters, highest first.

        Returns:
            A list of at most ``limit`` ``Post`` objects, or [] on error.
        """
        try:
            # .all() executes the query; without it the result is a Query
            # object, which has no len(), so the method always fell into the
            # error path and returned [].
            posts = self.session.query(Post).options(
                joinedload(Post.author),
                joinedload(Post.category),
                joinedload(Post.tags)
            ).filter(
                Post.status == 'published'
            ).order_by(
                (Post.view_count + Post.like_count + Post.comment_count).desc()
            ).limit(limit).all()

            print(f"✅ Found {len(posts)} popular posts")
            return posts
        except Exception as e:
            print(f"❌ Error getting popular posts: {e}")
            return []

    def search_posts(self, query, limit=None):
        """Search posts by title, content, or tags.

        Performs a case-insensitive substring match of ``query`` against
        post title, post content and tag names; results are newest first.

        Args:
            query: Text to search for.
            limit: Maximum number of posts; None or 0 means no limit.

        Returns:
            A list of matching ``Post`` objects, or [] on error.
        """
        try:
            search_pattern = f"%{query}%"
            db_query = self.session.query(Post).options(
                joinedload(Post.author),
                joinedload(Post.category),
                joinedload(Post.tags)
            ).filter(
                (Post.title.ilike(search_pattern) |
                 Post.content.ilike(search_pattern))
            )

            # Also search in tags
            tag_query = self.session.query(Post).options(
                joinedload(Post.author),
                joinedload(Post.category),
                joinedload(Post.tags)
            ).join(Post.tags).filter(
                Tag.name.ilike(search_pattern)
            )

            # Combine results
            posts = db_query.union(tag_query).distinct().order_by(Post.created_at.desc())

            if limit:
                posts = posts.limit(limit)

            # Execute once and reuse the rows for both the message and the
            # return value (the query used to be run twice).
            matches = posts.all()
            print(f"✅ Found {len(matches)} posts matching '{query}'")
            return matches
        except Exception as e:
            print(f"❌ Error searching posts: {e}")
            return []

class AnalyticsService:
    """Service class for database analytics.

    Each public method runs a set of aggregate queries, prints a short
    summary and returns the figures in a dict. On any error the method
    prints a message and returns None.
    """

    def __init__(self, db_manager):
        """Keep the manager and fetch the session used by all queries."""
        self.db = db_manager
        self.session = db_manager.get_session()

    @staticmethod
    def _month_start(months_back=0, now=None):
        """Return midnight on the first day of a month relative to ``now``.

        Args:
            months_back: How many whole months to go back (0 = current month).
            now: Reference datetime; defaults to the current naive UTC time.

        Returns:
            A datetime on day 1 at 00:00:00 of the target month. Year
            boundaries are handled, e.g. 5 months before March 2024 is
            October 2023.
        """
        if now is None:
            now = datetime.utcnow()
        # Work in "months since year 0" so that going back across January
        # borrows from the year instead of producing month <= 0.
        year, month_index = divmod(now.year * 12 + (now.month - 1) - months_back, 12)
        return now.replace(
            year=year, month=month_index + 1, day=1,
            hour=0, minute=0, second=0, microsecond=0
        )

    def get_user_statistics(self):
        """Get user statistics.

        Returns:
            A dict with ``total_users``, ``active_users``,
            ``new_users_month`` (created since the start of the current
            month) and ``user_registrations`` (per-month counts for the
            current and five preceding months), or None on error.
        """
        try:
            total_users = self.session.query(User).count()
            active_users = self.session.query(User).filter(User.is_active == True).count()
            # Start at midnight of day 1 so users created early on the 1st
            # are not excluded.
            new_users_month = self.session.query(User).filter(
                User.created_at >= self._month_start(0)
            ).count()

            # User registration by month for the last 6 months
            user_registrations = self.session.query(
                func.strftime('%Y-%m', User.created_at).label('month'),
                func.count(User.id).label('count')
            ).filter(
                User.created_at >= self._month_start(5)
            ).group_by(func.strftime('%Y-%m', User.created_at)).all()

            stats = {
                'total_users': total_users,
                'active_users': active_users,
                'new_users_month': new_users_month,
                'user_registrations': user_registrations
            }

            print("📊 User Statistics:")
            print(f"   Total Users: {total_users}")
            print(f"   Active Users: {active_users}")
            print(f"   New Users (This Month): {new_users_month}")

            return stats
        except Exception as e:
            print(f"❌ Error getting user statistics: {e}")
            return None

    def get_post_statistics(self):
        """Get post statistics.

        Returns:
            A dict with ``total_posts``, ``published_posts``,
            ``draft_posts``, ``posts_by_month`` (count and total views per
            month for the current and five preceding months),
            ``category_distribution`` (post count per category name) and
            ``top_posts`` (ten most viewed published posts), or None on
            error.
        """
        try:
            total_posts = self.session.query(Post).count()
            published_posts = self.session.query(Post).filter(Post.status == 'published').count()
            draft_posts = self.session.query(Post).filter(Post.status == 'draft').count()

            # Posts by month for the last 6 months
            posts_by_month = self.session.query(
                func.strftime('%Y-%m', Post.created_at).label('month'),
                func.count(Post.id).label('count'),
                func.sum(Post.view_count).label('total_views')
            ).filter(
                Post.created_at >= self._month_start(5)
            ).group_by(func.strftime('%Y-%m', Post.created_at)).all()

            # Category distribution
            category_distribution = self.session.query(
                Category.name,
                func.count(Post.id).label('post_count')
            ).join(Post).group_by(Category.name).all()

            # Top posts
            top_posts = self.session.query(
                Post.title,
                Post.view_count,
                Post.like_count,
                Post.comment_count
            ).filter(Post.status == 'published').order_by(
                Post.view_count.desc()
            ).limit(10).all()

            stats = {
                'total_posts': total_posts,
                'published_posts': published_posts,
                'draft_posts': draft_posts,
                'posts_by_month': posts_by_month,
                'category_distribution': category_distribution,
                'top_posts': top_posts
            }

            print("📊 Post Statistics:")
            print(f"   Total Posts: {total_posts}")
            print(f"   Published: {published_posts}")
            print(f"   Drafts: {draft_posts}")

            return stats
        except Exception as e:
            print(f"❌ Error getting post statistics: {e}")
            return None

    def get_author_analytics(self, author_id):
        """Get analytics for a specific author.

        Returns:
            A dict with the ``author`` object, post/view/comment totals,
            follower and following counts and an ``engagement_rate``
            ((views + 2 * comments) / posts, 0 when there are no posts), or
            None if the author does not exist or an error occurred.
        """
        try:
            author = self.session.query(User).filter(User.id == author_id).first()
            if not author:
                return None

            total_posts = self.session.query(Post).filter(Post.author_id == author_id).count()
            published_posts = self.session.query(Post).filter(
                Post.author_id == author_id,
                Post.status == 'published'
            ).count()

            # SUM() yields NULL when the author has no posts; treat as 0.
            total_views = self.session.query(func.sum(Post.view_count)).filter(
                Post.author_id == author_id
            ).scalar() or 0

            total_comments = self.session.query(func.sum(Post.comment_count)).filter(
                Post.author_id == author_id
            ).scalar() or 0

            # Follower count
            follower_count = self.session.query(Follower).filter(
                Follower.following_id == author_id
            ).count()

            following_count = self.session.query(Follower).filter(
                Follower.follower_id == author_id
            ).count()

            analytics = {
                'author': author,
                'total_posts': total_posts,
                'published_posts': published_posts,
                'total_views': total_views,
                'total_comments': total_comments,
                'follower_count': follower_count,
                'following_count': following_count,
                'engagement_rate': (total_views + total_comments * 2) / total_posts if total_posts > 0 else 0
            }

            print(f"📊 Analytics for {author.username}:")
            print(f"   Posts: {total_posts} ({published_posts} published)")
            print(f"   Total Views: {total_views}")
            print(f"   Total Comments: {total_comments}")
            print(f"   Followers: {follower_count}")
            print(f"   Following: {following_count}")
            print(f"   Engagement Rate: {analytics['engagement_rate']:.2f}")

            return analytics
        except Exception as e:
            print(f"❌ Error getting author analytics: {e}")
            return None

# Example usage
def main():
    """Run an end-to-end demo against the default database.

    Connects, creates the tables, inserts two sample users and three sample
    posts, then prints statistics, search results and the popular-post
    ranking. The session is committed and closed at the end even if a step
    fails.
    """
    # Initialize database
    db_manager = DatabaseManager()

    # Connect and create tables
    if db_manager.connect():
        db_manager.create_tables()

        # Create service instances
        user_service = UserService(db_manager)
        post_service = PostService(db_manager)
        analytics_service = AnalyticsService(db_manager)

        try:
            # Create sample users
            # The e-mail column is unique, so each user needs a distinct address.
            print("\n👤 Creating sample users...")
            user1 = user_service.create_user(
                username='johndoe',
                email='john.doe@example.com',
                password_hash='hashed_password_1',
                first_name='John',
                last_name='Doe',
                bio='Software developer and tech enthusiast.',
                location='San Francisco, CA'
            )

            user2 = user_service.create_user(
                username='janedoe',
                email='jane.doe@example.com',
                password_hash='hashed_password_2',
                first_name='Jane',
                last_name='Doe',
                bio='Data scientist and AI researcher.',
                location='New York, NY'
            )

            # Create sample posts
            # Post.slug is NOT NULL and unique with no default, so every
            # post must be given one explicitly.
            print("\n📝 Creating sample posts...")
            if user1 and user2:
                post1 = post_service.create_post(
                    title='Getting Started with SQLite',
                    slug='getting-started-with-sqlite',
                    content='SQLite is a lightweight, serverless database engine...',
                    author_id=user1.id,
                    status='published',
                    view_count=150,
                    like_count=25,
                    comment_count=8,
                    published_at=datetime(2024, 1, 15)
                )

                post2 = post_service.create_post(
                    title='Python Best Practices',
                    slug='python-best-practices',
                    content='Here are some best practices for Python development...',
                    author_id=user1.id,
                    status='published',
                    view_count=300,
                    like_count=45,
                    comment_count=12,
                    published_at=datetime(2024, 2, 1)
                )

                post3 = post_service.create_post(
                    title='Machine Learning Fundamentals',
                    slug='machine-learning-fundamentals',
                    content='Machine learning is a subset of artificial intelligence...',
                    author_id=user2.id,
                    status='published',
                    view_count=500,
                    like_count=75,
                    comment_count=20,
                    published_at=datetime(2024, 1, 20)
                )

            # Get analytics
            print("\n📊 Generating analytics...")
            user_stats = analytics_service.get_user_statistics()
            post_stats = analytics_service.get_post_statistics()

            # Get author analytics
            if user1:
                author_analytics = analytics_service.get_author_analytics(user1.id)

            # Search examples
            print("\n🔍 Search examples...")
            search_users = user_service.search_users('john')
            search_posts = post_service.search_posts('python')

            # Get popular posts
            print("\n⭐ Popular posts:")
            popular_posts = post_service.get_popular_posts(5)

        except Exception as e:
            print(f"❌ Error in main execution: {e}")

        finally:
            # Close database connection
            db_manager.commit_and_close()

if __name__ == "__main__":
    main()

💻 Scripts de Migración de Base de Datos python

🔴 complex ⭐⭐⭐⭐

Sistema de migración automatizada y versionamiento de esquemas

⏱️ 50 min 🏷️ sqlite, migrations, versioning, database
Prerequisites: Python 3.6+, SQLite knowledge, Database concepts
# SQLite Migration System
# Python - migration_system.py

import os
import sqlite3
import json
import hashlib
from datetime import datetime
from typing import List, Dict, Any, Optional
import logging

# Configure logging
# Messages at INFO level and above are emitted, each prefixed with the
# timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by the migration classes below.
logger = logging.getLogger(__name__)

class Migration:
    """A single schema change: forward SQL, reverse SQL and a checksum.

    The checksum is taken over the concatenated up/down SQL at construction
    time so later edits to either statement can be detected.
    """

    def __init__(self, version: str, description: str, up_sql: str, down_sql: str):
        """Store the migration fields and fingerprint its SQL."""
        self.version, self.description = version, description
        self.up_sql, self.down_sql = up_sql, down_sql
        self.checksum = self._calculate_checksum(up_sql + down_sql)

    def _calculate_checksum(self, content: str) -> str:
        """Return the SHA-256 hex digest of ``content``."""
        digest = hashlib.sha256()
        digest.update(content.encode())
        return digest.hexdigest()

    def validate(self):
        """Return True if the SQL still matches the stored checksum."""
        recomputed = self._calculate_checksum(self.up_sql + self.down_sql)
        return recomputed == self.checksum

class MigrationSystem:
    """Database migration system"""

    def __init__(self, db_path: str = 'database.db', migrations_dir: str = 'migrations'):
        self.db_path = db_path
        self.migrations_dir = migrations_dir
        self.connection = None
        self.cursor = None

        # Ensure migrations directory exists
        os.makedirs(migrations_dir, exist_ok=True)

        # Create migration lock table
        self.migrations_table = '''
        CREATE TABLE IF NOT EXISTS schema_migrations (
            version VARCHAR(50) PRIMARY KEY,
            description TEXT,
            checksum VARCHAR(64),
            applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            rollback_sql TEXT
        );
        '''

    def connect(self):
        """Connect to database and make sure the migrations table exists.

        Returns:
            True on success. On failure the error is logged, any partially
            opened connection is closed, and False is returned.
        """
        try:
            self.connection = sqlite3.connect(self.db_path)

            # Row factory for dictionaries
            # A cursor takes its row factory from the connection when the
            # cursor is created, so this must be set before cursor() is
            # called; otherwise self.cursor keeps returning plain tuples.
            self.connection.row_factory = sqlite3.Row
            self.cursor = self.connection.cursor()

            # Enable foreign keys
            self.cursor.execute("PRAGMA foreign_keys = ON")

            # Create the table that tracks applied migrations
            self.cursor.executescript(self.migrations_table)
            self.connection.commit()

            logger.info(f"Connected to database: {self.db_path}")
            return True
        except Exception as e:
            logger.error(f"Error connecting to database: {e}")
            # Do not leave a half-initialised connection behind.
            if self.connection is not None:
                try:
                    self.connection.close()
                except Exception:
                    pass
            self.connection = None
            self.cursor = None
            return False

    def disconnect(self):
        """Close the database connection if one is open; otherwise do nothing."""
        conn = self.connection
        if not conn:
            return
        conn.close()
        logger.info("Disconnected from database")

    def lock_database(self):
        """Start a write transaction so other writers are kept out.

        Issues ``BEGIN IMMEDIATE`` on the cursor.

        Returns:
            True if the statement succeeded, False (after logging the
            error) otherwise.
        """
        acquired = False
        try:
            # SQLite's BEGIN IMMEDIATE starts a transaction and immediately acquires a write lock
            self.cursor.execute("BEGIN IMMEDIATE")
            logger.debug("Database locked for migration")
            acquired = True
        except Exception as e:
            logger.error(f"Error locking database: {e}")
        return acquired

    def unlock_database(self):
        """Commit the open transaction, releasing the lock it holds.

        Returns:
            True if the commit succeeded. If it fails, the error is logged,
            the transaction is rolled back and False is returned.
        """
        released = False
        try:
            self.connection.commit()
            logger.debug("Database unlocked")
            released = True
        except Exception as e:
            logger.error(f"Error unlocking database: {e}")
            self.connection.rollback()
        return released

    def create_migration(self, version: str, description: str, up_sql: str, down_sql: str) -> Migration:
        """Create a new migration file"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{version}.py"
        filepath = os.path.join(self.migrations_dir, filename)

        migration_content = f'''"""
# Migration: {version}
# Description: {description}
# Created: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

UP_SQL = """
{up_sql.strip()}
"""

DOWN_SQL = """
{down_sql.strip()}
"""

# Import migration base
from migration_system import Migration

# Create migration instance
migration = Migration(
    version="{version}",
    description="{description}",
    up_sql=UP_SQL,
    down_sql=DOWN_SQL
)
"""

        with open(filepath, 'w') as f:
            f.write(migration_content)

        logger.info(f"Created migration file: {filename}")
        return Migration(version, description, up_sql, down_sql)

    def load_migrations(self) -> List[Migration]:
        """Load every migration defined in the migrations directory.

        Each ``.py`` file (visited in sorted file-name order) is executed in a
        fresh namespace; if it defines a ``migration`` name, that object is
        collected. Files that fail to load are logged and skipped.

        Returns:
            The collected migration objects, or an empty list when the
            migrations directory does not exist.
        """
        loaded = []

        if not os.path.exists(self.migrations_dir):
            return loaded

        python_files = [
            name for name in sorted(os.listdir(self.migrations_dir))
            if name.endswith('.py')
        ]

        for filename in python_files:
            path = os.path.join(self.migrations_dir, filename)
            try:
                scope = {}
                with open(path, 'r') as handle:
                    # NOTE(review): exec() runs the file with full privileges;
                    # the migrations directory must only contain trusted code.
                    exec(handle.read(), scope)

                if 'migration' not in scope:
                    continue

                found = scope['migration']
                loaded.append(found)
                logger.debug(f"Loaded migration: {found.version}")
            except Exception as exc:
                logger.error(f"Error loading migration {filename}: {exc}")

        return loaded

    def get_applied_migrations(self) -> List[Dict[str, Any]]:
        """Return the rows of ``schema_migrations`` ordered by ``applied_at``.

        Rows are converted to dicts using the column names reported by the
        cursor, so the result does not depend on the cursor's row factory.
        A cursor keeps the row factory that was in effect when it was created;
        if ``sqlite3.Row`` is installed on the connection afterwards, this
        cursor still yields plain tuples and ``dict(row)`` would fail.

        Returns:
            One dict per applied migration, keyed by column name. An empty
            list is returned (and the error logged) if the query fails.
        """
        try:
            self.cursor.execute("SELECT * FROM schema_migrations ORDER BY applied_at")
            columns = [column[0] for column in self.cursor.description]
            return [dict(zip(columns, row)) for row in self.cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error getting applied migrations: {e}")
            return []

    def get_pending_migrations(self) -> List[Migration]:
        """Return the migrations that still need to be applied.

        A migration is pending when its version is not recorded in
        ``schema_migrations``, or when it is recorded but fails
        ``validate()`` or its checksum differs from the stored one.

        Returns:
            Pending migrations in the order ``load_migrations`` returned them.
        """
        all_migrations = self.load_migrations()

        # Query the applied records once and index them by version. The first
        # record per version wins, matching a first-match scan of the rows.
        applied_by_version = {}
        for record in self.get_applied_migrations():
            applied_by_version.setdefault(record['version'], record)

        pending = []
        for migration in all_migrations:
            applied = applied_by_version.get(migration.version)
            if applied is None:
                pending.append(migration)
            elif not migration.validate() or migration.checksum != applied['checksum']:
                # NOTE(review): re-applying an already recorded version will try
                # to insert a second schema_migrations row for it; whether that
                # succeeds depends on the table's constraints, not visible here.
                logger.warning(f"Migration {migration.version} validation failed, marking as pending")
                pending.append(migration)

        return pending

    def apply_migration(self, migration: Migration) -> bool:
        """Apply a single migration atomically and record it.

        The migration's ``up_sql`` and the bookkeeping row in
        ``schema_migrations`` are committed together; if anything fails the
        whole transaction is rolled back.

        The migration SQL must not contain its own BEGIN/COMMIT statements,
        because it runs inside the transaction opened here.

        Args:
            migration: The migration to apply.

        Returns:
            True on success, False if the database could not be locked or any
            statement failed (the error is logged).
        """
        logger.info(f"Applying migration: {migration.version} - {migration.description}")

        try:
            # executescript() issues an implicit COMMIT for any transaction that
            # is already open, so a BEGIN IMMEDIATE executed beforehand would be
            # released before the script runs. Opening the transaction inside
            # the script keeps the write lock for the whole migration and lets
            # rollback() undo a partially executed script.
            self.cursor.executescript("BEGIN IMMEDIATE;\n" + migration.up_sql)

            # Record the migration in the same transaction as the schema change.
            self.cursor.execute("""
                INSERT INTO schema_migrations (version, description, checksum, rollback_sql)
                VALUES (?, ?, ?, ?)
            """, (
                migration.version,
                migration.description,
                migration.checksum,
                migration.down_sql
            ))

            self.connection.commit()
            logger.info(f"Successfully applied migration: {migration.version}")
            return True

        except Exception as e:
            logger.error(f"Error applying migration {migration.version}: {e}")
            self.connection.rollback()
            return False

    def rollback_migration(self, migration: Migration) -> bool:
        """Revert a single migration atomically and remove its record.

        The migration's ``down_sql`` and the deletion of its
        ``schema_migrations`` row are committed together; if anything fails
        the whole transaction is rolled back.

        The rollback SQL must not contain its own BEGIN/COMMIT statements,
        because it runs inside the transaction opened here.

        Args:
            migration: The migration to revert.

        Returns:
            True on success, False if the database could not be locked or any
            statement failed (the error is logged).
        """
        logger.info(f"Rolling back migration: {migration.version} - {migration.description}")

        try:
            # executescript() issues an implicit COMMIT for any transaction that
            # is already open, so the transaction has to be opened inside the
            # script for the lock and rollback() to cover the rollback SQL.
            self.cursor.executescript("BEGIN IMMEDIATE;\n" + migration.down_sql)

            # Remove the bookkeeping row in the same transaction.
            self.cursor.execute(
                "DELETE FROM schema_migrations WHERE version = ?",
                (migration.version,)
            )

            self.connection.commit()
            logger.info(f"Successfully rolled back migration: {migration.version}")
            return True

        except Exception as e:
            logger.error(f"Error rolling back migration {migration.version}: {e}")
            self.connection.rollback()
            return False

    def migrate_up(self, target_version: Optional[str] = None) -> bool:
        """Apply pending migrations, optionally stopping at ``target_version``.

        Args:
            target_version: When given, only pending migrations whose version
                compares ``<=`` this string are applied.

        Returns:
            True when there was nothing to do or every migration applied;
            False as soon as one migration fails (later ones are not tried).
        """
        logger.info("Starting migration up...")

        queue = self.get_pending_migrations()
        if target_version:
            # Versions are compared as plain strings.
            queue = [candidate for candidate in queue if candidate.version <= target_version]

        if not queue:
            logger.info("No pending migrations to apply")
            return True

        for migration in queue:
            if not self.apply_migration(migration):
                logger.error("Migration up failed")
                return False

        logger.info("Migration up completed successfully")
        return True

    def migrate_down(self, target_version: str) -> bool:
        """Roll back applied migrations newer than ``target_version``.

        Applied migrations whose version compares ``>`` ``target_version``
        (plain string comparison) are rolled back newest first, using the
        rollback SQL stored in ``schema_migrations``.

        Args:
            target_version: Version to migrate down to; it stays applied.

        Returns:
            True when there was nothing to do or every rollback succeeded;
            False as soon as one rollback fails.
        """
        logger.info(f"Starting migration down to version: {target_version}")

        applied_migrations = self.get_applied_migrations()
        newer_records = [
            record for record in reversed(applied_migrations)
            if record['version'] > target_version
        ]

        migrations_to_rollback = []
        if newer_records:
            # Load the migration files once instead of once per applied record.
            available = {}
            for loaded in self.load_migrations():
                available.setdefault(loaded.version, loaded)

            for record in newer_records:
                migration = available.get(record['version'])
                if not migration:
                    logger.warning(
                        f"No migration file found for applied version {record['version']}, skipping rollback"
                    )
                    continue
                # Use the rollback SQL stored at apply time; a NULL column
                # value is treated like a missing one.
                migration.down_sql = record.get('rollback_sql') or ''
                migrations_to_rollback.append(migration)

        if not migrations_to_rollback:
            logger.info("No migrations to rollback")
            return True

        for migration in migrations_to_rollback:
            if not self.rollback_migration(migration):
                logger.error("Migration down failed")
                return False

        logger.info("Migration down completed successfully")
        return True

    def get_migration_status(self) -> Dict[str, Any]:
        """Summarise applied, pending and known migrations.

        Returns:
            A dict with the keys ``total_migrations``, ``applied_migrations``,
            ``pending_migrations`` (counts), ``current_version`` (last applied
            record or None), ``latest_version`` (last loaded migration or
            None), ``applied_list`` and ``pending_list`` (version strings).
            An empty dict is returned if anything raises (the error is logged).
        """
        try:
            applied = self.get_applied_migrations()
            pending = self.get_pending_migrations()
            known = self.load_migrations()

            current_version = applied[-1]['version'] if applied else None
            latest_version = known[-1].version if known else None

            summary = {
                'total_migrations': len(known),
                'applied_migrations': len(applied),
                'pending_migrations': len(pending),
                'current_version': current_version,
                'latest_version': latest_version,
                'applied_list': [record['version'] for record in applied],
                'pending_list': [item.version for item in pending]
            }
            return summary
        except Exception as exc:
            logger.error(f"Error getting migration status: {exc}")
            return {}

class BackupManager:
    """Create, restore and list file-level copies of a SQLite database."""

    def __init__(self, db_path: str, backup_dir: str = 'backups'):
        """Remember the database path and make sure the backup directory exists.

        Args:
            db_path: Path of the database file to back up / restore.
            backup_dir: Directory that holds the backup files; created if
                it does not exist.
        """
        os.makedirs(backup_dir, exist_ok=True)
        self.db_path = db_path
        self.backup_dir = backup_dir

    def create_backup(self, backup_name: Optional[str] = None) -> str:
        """Copy the database file into the backup directory.

        Args:
            backup_name: File name for the backup. When omitted, a
                ``backup_<timestamp>.db`` name is generated.

        Returns:
            The path of the new backup, or an empty string if copying failed
            (the error is logged).
        """
        import shutil

        name = backup_name or f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
        backup_path = os.path.join(self.backup_dir, name)

        try:
            # NOTE(review): this is a plain file copy and does not close or
            # coordinate with open connections; callers should make sure no
            # write is in progress.
            shutil.copy2(self.db_path, backup_path)
            logger.info(f"Created backup: {backup_path}")
        except Exception as exc:
            logger.error(f"Error creating backup: {exc}")
            return ""
        return backup_path

    def restore_backup(self, backup_path: str, create_new: bool = False) -> bool:
        """Restore the database from a backup file.

        Args:
            backup_path: Path of the backup to restore from.
            create_new: When True, the backup is copied to
                ``<backup_path>.restored`` and the live database is left
                untouched; otherwise the backup overwrites ``db_path``.

        Returns:
            True on success, False if the backup is missing or copying failed.
        """
        if not os.path.exists(backup_path):
            logger.error(f"Backup file not found: {backup_path}")
            return False

        import shutil

        try:
            if create_new:
                restored_copy = f"{backup_path}.restored"
                shutil.copy2(backup_path, restored_copy)
                logger.info(f"Created restored database: {restored_copy}")
            else:
                shutil.copy2(backup_path, self.db_path)
                logger.info(f"Restored database from: {backup_path}")
        except Exception as exc:
            logger.error(f"Error restoring backup: {exc}")
            return False
        return True

    def list_backups(self) -> List[Dict[str, Any]]:
        """Describe every ``.db`` file in the backup directory.

        Returns:
            A list of dicts with ``filename``, ``filepath``, ``size`` (bytes),
            ``created`` (from ``st_ctime``, whose meaning is platform
            dependent) and ``modified`` (from ``st_mtime``), sorted by
            ``created`` with the newest first. Empty if the directory is gone.
        """
        if not os.path.exists(self.backup_dir):
            return []

        entries = []
        for filename in os.listdir(self.backup_dir):
            if not filename.endswith('.db'):
                continue

            filepath = os.path.join(self.backup_dir, filename)
            info = os.stat(filepath)
            entries.append({
                'filename': filename,
                'filepath': filepath,
                'size': info.st_size,
                'created': datetime.fromtimestamp(info.st_ctime),
                'modified': datetime.fromtimestamp(info.st_mtime)
            })

        # Newest first.
        return sorted(entries, key=lambda entry: entry['created'], reverse=True)

# Example migration template
class SampleMigrations:
    """Sample migration definitions used as templates."""

    @staticmethod
    def create_initial_schema():
        """Return the migration that creates the users, posts and categories tables.

        The rollback drops the indexes and then the tables, in reverse order
        of creation.
        """
        return Migration(
            version="001_initial_schema",
            description="Create initial database schema",
            up_sql='''
                -- Create users table
                CREATE TABLE users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    first_name VARCHAR(50),
                    last_name VARCHAR(50),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                -- Create posts table
                CREATE TABLE posts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title VARCHAR(200) NOT NULL,
                    content TEXT,
                    author_id INTEGER NOT NULL,
                    status VARCHAR(20) DEFAULT 'draft',
                    view_count INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (author_id) REFERENCES users (id)
                );

                -- Create categories table
                CREATE TABLE categories (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name VARCHAR(100) UNIQUE NOT NULL,
                    slug VARCHAR(100) UNIQUE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                -- Create indexes
                CREATE INDEX idx_users_username ON users(username);
                CREATE INDEX idx_users_email ON users(email);
                CREATE INDEX idx_posts_author_id ON posts(author_id);
                CREATE INDEX idx_posts_status ON posts(status);
                CREATE INDEX idx_posts_created_at ON posts(created_at);
                CREATE INDEX idx_categories_name ON categories(name);
            ''',
            down_sql='''
                DROP INDEX IF EXISTS idx_categories_name;
                DROP INDEX IF EXISTS idx_posts_created_at;
                DROP INDEX IF EXISTS idx_posts_status;
                DROP INDEX IF EXISTS idx_posts_author_id;
                DROP INDEX IF EXISTS idx_users_email;
                DROP INDEX IF EXISTS idx_users_username;
                DROP TABLE IF EXISTS categories;
                DROP TABLE IF EXISTS posts;
                DROP TABLE IF EXISTS users;
            '''
        )

    @staticmethod
    def add_user_profile():
        """Return the migration that adds profile and verification columns to users.

        The rollback removes the columns in reverse order of creation. SQLite's
        ALTER TABLE ... DROP COLUMN requires the table name, has no IF EXISTS
        form, and is only available from SQLite 3.35.0 onwards.
        """
        return Migration(
            version="002_add_user_profile",
            description="Add profile fields to users table",
            up_sql='''
                -- Add new columns to users table
                ALTER TABLE users ADD COLUMN bio TEXT;
                ALTER TABLE users ADD COLUMN avatar_url VARCHAR(255);
                ALTER TABLE users ADD COLUMN birth_date DATE;
                ALTER TABLE users ADD COLUMN phone VARCHAR(20);
                ALTER TABLE users ADD COLUMN address TEXT;
                ALTER TABLE users ADD COLUMN website VARCHAR(255);

                -- Add is_verified column
                ALTER TABLE users ADD COLUMN is_verified BOOLEAN DEFAULT 0;
                ALTER TABLE users ADD COLUMN email_verified_at TIMESTAMP;
            ''',
            down_sql='''
                -- Remove columns from users table
                ALTER TABLE users DROP COLUMN email_verified_at;
                ALTER TABLE users DROP COLUMN is_verified;
                ALTER TABLE users DROP COLUMN website;
                ALTER TABLE users DROP COLUMN address;
                ALTER TABLE users DROP COLUMN phone;
                ALTER TABLE users DROP COLUMN birth_date;
                ALTER TABLE users DROP COLUMN avatar_url;
                ALTER TABLE users DROP COLUMN bio;
            '''
        )

# Example usage
def main():
    """Run the demo: create a first migration if needed, migrate, then back up.

    Works against ``blog.db`` with migration files in ``migrations``. Nothing
    happens if the database connection cannot be opened. Errors raised during
    the run are logged, and the connection is always closed at the end.
    """
    # Initialize migration system
    migration_system = MigrationSystem('blog.db', 'migrations')

    # Connect to database
    if not migration_system.connect():
        return

    try:
        # Create the initial migration file only when no migration exists yet
        if not migration_system.load_migrations():
            migration_system.create_migration(
                version="001_initial_schema",
                description="Create initial database schema",
                up_sql='''
                    CREATE TABLE users (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        username VARCHAR(50) UNIQUE NOT NULL,
                        email VARCHAR(100) UNIQUE NOT NULL,
                        password_hash VARCHAR(255) NOT NULL,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    );
                ''',
                down_sql='''
                    DROP TABLE IF EXISTS users;
                '''
            )
            logger.info("Created initial migration file")

        # Get migration status; get_migration_status() returns {} on failure,
        # so only index into it when it is populated.
        status = migration_system.get_migration_status()
        print("\n📊 Migration Status:")
        if status:
            print(f"   Total Migrations: {status['total_migrations']}")
            print(f"   Applied: {status['applied_migrations']}")
            print(f"   Pending: {status['pending_migrations']}")
            print(f"   Current Version: {status['current_version']}")
            print(f"   Latest Version: {status['latest_version']}")
        else:
            print("   Status unavailable")

        # Migrate up to latest
        print("\n🔄 Running migrations...")
        if migration_system.migrate_up():
            print("✅ All migrations applied successfully")
        else:
            print("❌ Migration failed")

        # Create backup after migration
        backup_manager = BackupManager('blog.db')
        print("\n💾 Creating backup...")
        backup_path = backup_manager.create_backup()
        # create_backup() returns "" when the copy failed.
        if backup_path:
            print(f"   Backup written to: {backup_path}")
        else:
            print("❌ Backup failed")

        # List available backups
        print("\n📋 Available Backups:")
        for backup in backup_manager.list_backups():
            print(f"   {backup['filename']} ({backup['size']} bytes, created {backup['created']})")

    except Exception as e:
        logger.error(f"Error in migration system: {e}")

    finally:
        # Disconnect from database
        migration_system.disconnect()

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()