🎯 Recommended Examples
Balanced sample collections from various categories for you to explore
SQLite Database Examples
SQLite database examples covering basic operations, advanced queries, migrations, and management
💻 SQLite Basic Operations (python)
🟢 simple
⭐
Fundamental CRUD operations, table creation, and basic queries
⏱️ 20 min
🏷️ sqlite, database, crud, python
Prerequisites:
Python 3.6+, Basic SQL knowledge
# SQLite Basic Operations
# Python - basic_operations.py
import sqlite3
import os
from datetime import datetime
import json
class SQLiteBasicOperations:
def __init__(self, db_path='sample.db'):
self.db_path = db_path
self.connection = None
self.cursor = None
def connect(self):
"""Connect to SQLite database"""
try:
self.connection = sqlite3.connect(self.db_path)
# Set the row factory before creating the cursor so rows come back as sqlite3.Row
self.connection.row_factory = sqlite3.Row
self.cursor = self.connection.cursor()
# Enable foreign key enforcement (off by default in SQLite)
self.cursor.execute("PRAGMA foreign_keys = ON")
print(f"✅ Connected to database: {self.db_path}")
return True
except Exception as e:
print(f"❌ Error connecting to database: {e}")
return False
def disconnect(self):
"""Close database connection"""
if self.connection:
self.connection.close()
print(f"✅ Disconnected from database: {self.db_path}")
def create_users_table(self):
"""Create users table"""
try:
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
first_name VARCHAR(50),
last_name VARCHAR(50),
date_of_birth DATE,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
self.connection.commit()
print("✅ Users table created successfully")
return True
except Exception as e:
print(f"❌ Error creating users table: {e}")
return False
def create_posts_table(self):
"""Create posts table with foreign key"""
try:
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
title VARCHAR(200) NOT NULL,
content TEXT,
category VARCHAR(50),
tags TEXT,
view_count INTEGER DEFAULT 0,
is_published BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
self.connection.commit()
print("✅ Posts table created successfully")
return True
except Exception as e:
print(f"❌ Error creating posts table: {e}")
return False
def insert_user(self, username, email, password_hash, first_name=None, last_name=None, date_of_birth=None):
"""Insert a new user"""
try:
self.cursor.execute('''
INSERT INTO users (username, email, password_hash, first_name, last_name, date_of_birth)
VALUES (?, ?, ?, ?, ?, ?)
''', (username, email, password_hash, first_name, last_name, date_of_birth))
user_id = self.cursor.lastrowid
self.connection.commit()
print(f"✅ User '{username}' created with ID: {user_id}")
return user_id
except sqlite3.IntegrityError as e:
print(f"❌ Error inserting user (duplicate data): {e}")
return None
except Exception as e:
print(f"❌ Error inserting user: {e}")
return None
def get_user(self, user_id):
"""Get user by ID"""
try:
self.cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
user = self.cursor.fetchone()
if user:
print(f"✅ Found user: {dict(user)}")
return dict(user)
else:
print(f"❌ User with ID {user_id} not found")
return None
except Exception as e:
print(f"❌ Error getting user: {e}")
return None
def update_user(self, user_id, **kwargs):
"""Update user information"""
if not kwargs:
print("❌ No fields to update")
return False
try:
# Build SET clause dynamically
set_clause = ", ".join([f"{key} = ?" for key in kwargs.keys()])
values = list(kwargs.values()) + [user_id]
# Add updated_at timestamp
set_clause += ", updated_at = CURRENT_TIMESTAMP"
query = f"UPDATE users SET {set_clause} WHERE id = ?"
self.cursor.execute(query, values)
if self.cursor.rowcount > 0:
self.connection.commit()
print(f"✅ User {user_id} updated successfully")
return True
else:
print(f"❌ No changes made to user {user_id}")
return False
except Exception as e:
print(f"❌ Error updating user: {e}")
return False
def delete_user(self, user_id):
"""Delete a user (cascade deletes related posts)"""
try:
self.cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
if self.cursor.rowcount > 0:
self.connection.commit()
print(f"✅ User {user_id} deleted successfully")
return True
else:
print(f"❌ User {user_id} not found")
return False
except Exception as e:
print(f"❌ Error deleting user: {e}")
return False
def list_users(self, active_only=True, limit=None):
"""List users with optional filters"""
try:
query = "SELECT * FROM users"
params = []
if active_only:
query += " WHERE is_active = 1"
query += " ORDER BY created_at DESC"
if limit:
query += " LIMIT ?"
params.append(limit)
self.cursor.execute(query, params)
users = self.cursor.fetchall()
print(f"✅ Found {len(users)} users:")
for user in users:
user_dict = dict(user)
print(f" ID: {user_dict['id']}, Username: {user_dict['username']}, Email: {user_dict['email']}")
return [dict(user) for user in users]
except Exception as e:
print(f"❌ Error listing users: {e}")
return []
def insert_post(self, user_id, title, content=None, category=None, tags=None):
"""Insert a new post"""
try:
# Convert tags list to JSON string
tags_json = json.dumps(tags) if tags else None
self.cursor.execute('''
INSERT INTO posts (user_id, title, content, category, tags)
VALUES (?, ?, ?, ?, ?)
''', (user_id, title, content, category, tags_json))
post_id = self.cursor.lastrowid
self.connection.commit()
print(f"✅ Post '{title}' created with ID: {post_id}")
return post_id
except Exception as e:
print(f"❌ Error inserting post: {e}")
return None
def get_user_posts(self, user_id, published_only=False):
"""Get all posts by a user"""
try:
query = '''
SELECT p.*, u.username as author_username
FROM posts p
JOIN users u ON p.user_id = u.id
WHERE p.user_id = ?
'''
params = [user_id]
if published_only:
query += " AND p.is_published = 1"
query += " ORDER BY p.created_at DESC"
self.cursor.execute(query, params)
posts = self.cursor.fetchall()
print(f"✅ Found {len(posts)} posts by user {user_id}:")
for post in posts:
post_dict = dict(post)
# Parse tags from JSON
if post_dict['tags']:
try:
post_dict['tags'] = json.loads(post_dict['tags'])
except (json.JSONDecodeError, TypeError):
post_dict['tags'] = []
print(f" ID: {post_dict['id']}, Title: {post_dict['title']}")
return [dict(post) for post in posts]
except Exception as e:
print(f"❌ Error getting user posts: {e}")
return []
def search_posts(self, keyword, category=None, limit=None):
"""Search posts by keyword and/or category"""
try:
query = '''
SELECT p.*, u.username as author_username
FROM posts p
JOIN users u ON p.user_id = u.id
WHERE (p.title LIKE ? OR p.content LIKE ?)
'''
params = [f'%{keyword}%', f'%{keyword}%']
if category:
query += " AND p.category = ?"
params.append(category)
query += " ORDER BY p.created_at DESC"
if limit:
query += " LIMIT ?"
params.append(limit)
self.cursor.execute(query, params)
posts = self.cursor.fetchall()
print(f"✅ Found {len(posts)} posts matching '{keyword}':")
for post in posts:
post_dict = dict(post)
print(f" ID: {post_dict['id']}, Title: {post_dict['title']}, Author: {post_dict['author_username']}")
return [dict(post) for post in posts]
except Exception as e:
print(f"❌ Error searching posts: {e}")
return []
def get_database_info(self):
"""Get database information and statistics"""
try:
info = {}
# Get database size
size_bytes = os.path.getsize(self.db_path)
info['size_bytes'] = size_bytes
info['size_mb'] = round(size_bytes / (1024 * 1024), 2)
# Get table info
self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in self.cursor.fetchall()]
info['tables'] = tables
# Get record counts
info['record_counts'] = {}
for table in tables:
self.cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = self.cursor.fetchone()[0]
info['record_counts'][table] = count
print("📊 Database Information:")
print(f" Size: {info['size_mb']} MB")
print(f" Tables: {', '.join(info['tables'])}")
print(f" Record counts: {info['record_counts']}")
return info
except Exception as e:
print(f"❌ Error getting database info: {e}")
return None
def backup_database(self, backup_path=None):
"""Create a backup of the database"""
try:
if not backup_path:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = f"backup_{timestamp}.db"
# Close the current connection so the copied file is in a consistent state
self.disconnect()
# Copy database file
import shutil
shutil.copy2(self.db_path, backup_path)
print(f"✅ Database backed up to: {backup_path}")
# Reconnect
self.connect()
return backup_path
except Exception as e:
print(f"❌ Error backing up database: {e}")
return None
# Example usage
def main():
# Initialize database manager
db = SQLiteBasicOperations('blog.db')
# Connect to database
if not db.connect():
return
try:
# Create tables
print("\n📝 Creating tables...")
db.create_users_table()
db.create_posts_table()
# Insert sample users
print("\n👤 Inserting sample users...")
user1_id = db.insert_user(
username='johndoe',
email='[email protected]',
password_hash='hashed_password_1',
first_name='John',
last_name='Doe',
date_of_birth='1990-01-15'
)
user2_id = db.insert_user(
username='janedoe',
email='[email protected]',
password_hash='hashed_password_2',
first_name='Jane',
last_name='Doe',
date_of_birth='1992-05-20'
)
# Insert sample posts
print("\n📝 Inserting sample posts...")
if user1_id:
db.insert_post(
user_id=user1_id,
title='My First Blog Post',
content='This is the content of my first blog post.',
category='Technology',
tags=['python', 'database', 'sqlite']
)
db.insert_post(
user_id=user1_id,
title='SQLite Best Practices',
content='Here are some best practices for using SQLite...',
category='Database',
tags=['sqlite', 'best-practices', 'tutorial']
)
if user2_id:
db.insert_post(
user_id=user2_id,
title='Getting Started with Python',
content='Python is a great programming language for beginners...',
category='Programming',
tags=['python', 'beginner', 'tutorial']
)
# List all users
print("\n👥 Listing all users:")
users = db.list_users()
# Get user details
if user1_id:
print("\n👤 Getting user details:")
user = db.get_user(user1_id)
# Get user posts
if user1_id:
print("\n📝 Getting user posts:")
posts = db.get_user_posts(user1_id)
# Search posts
print("\n🔍 Searching posts:")
search_results = db.search_posts('python')
# Get database info
print("\n📊 Database information:")
db_info = db.get_database_info()
# Create backup
print("\n💾 Creating backup:")
backup_path = db.backup_database()
# Update user
if user1_id:
print("\n✏️ Updating user:")
db.update_user(user1_id, first_name='Johnathan', last_name='Doe-Smith')
# Clean up
print("\n🧹 Cleaning up...")
if user1_id:
db.delete_user(user1_id)
if user2_id:
db.delete_user(user2_id)
finally:
# Disconnect from database
db.disconnect()
if __name__ == "__main__":
main()
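For small one-off scripts, the sqlite3 module can also be used without a wrapper class: the connection object works as a context manager that commits on success and rolls back on error. A minimal sketch, assuming the blog.db schema created above (the sample values are placeholders):

# Minimal sketch: parameterized statements inside an automatic transaction
import sqlite3

conn = sqlite3.connect('blog.db')
conn.row_factory = sqlite3.Row                 # rows behave like dictionaries
conn.execute("PRAGMA foreign_keys = ON")

# Used as a context manager, the connection wraps a transaction:
# commit if the block succeeds, rollback if it raises.
with conn:
    conn.execute(
        "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
        ('demo_user', '[email protected]', 'hashed_pw'),
    )

for row in conn.execute("SELECT id, username, email FROM users"):
    print(dict(row))

conn.close()                                   # the with-block does not close the connection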
💻 Advanced SQLite Queries (sql)
🟡 intermediate
⭐⭐⭐
Complex queries, joins, aggregations, window functions, and optimization
⏱️ 45 min
🏷️ sqlite, advanced sql, database, queries
Prerequisites:
SQLite 3.x, Intermediate SQL knowledge, Understanding of window functions
-- Advanced SQLite Queries
-- SQL - advanced_queries.sql
-- Schema and sample data setup (run this first)
-- DROP TABLE IF EXISTS employees;
-- DROP TABLE IF EXISTS departments;
-- DROP TABLE IF EXISTS salaries;
-- DROP TABLE IF EXISTS projects;
-- DROP TABLE IF EXISTS project_assignments;
-- Departments table
CREATE TABLE departments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) NOT NULL UNIQUE,
manager_id INTEGER,
budget DECIMAL(12, 2),
location VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Employees table
CREATE TABLE employees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(50) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
department_id INTEGER,
position VARCHAR(100),
salary DECIMAL(10, 2),
hire_date DATE NOT NULL,
birth_date DATE,
phone VARCHAR(20),
address TEXT,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (department_id) REFERENCES departments (id)
);
-- Salaries table (for tracking salary history)
CREATE TABLE salaries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
employee_id INTEGER NOT NULL,
salary DECIMAL(10, 2) NOT NULL,
from_date DATE NOT NULL,
to_date DATE,
FOREIGN KEY (employee_id) REFERENCES employees (id)
);
-- Projects table
CREATE TABLE projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(200) NOT NULL,
description TEXT,
start_date DATE,
end_date DATE,
budget DECIMAL(12, 2),
status VARCHAR(20) DEFAULT 'planning',
department_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (department_id) REFERENCES departments (id)
);
-- Project assignments table
CREATE TABLE project_assignments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
employee_id INTEGER NOT NULL,
project_id INTEGER NOT NULL,
role VARCHAR(50),
assignment_date DATE DEFAULT CURRENT_DATE,
hours_allocated INTEGER,
hours_worked INTEGER DEFAULT 0,
FOREIGN KEY (employee_id) REFERENCES employees (id),
FOREIGN KEY (project_id) REFERENCES projects (id),
UNIQUE(employee_id, project_id)
);
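-- The queries in the sections below assume some rows exist. A small, purely
-- illustrative data set (all names and figures are made up) can be loaded first
-- so that each query returns output:
INSERT INTO departments (name, budget, location) VALUES
('Engineering', 500000.00, 'Building A'),
('Marketing', 200000.00, 'Building B');
INSERT INTO employees (first_name, last_name, email, department_id, position, salary, hire_date) VALUES
('Alice', 'Smith', '[email protected]', 1, 'Software Engineer', 85000.00, '2021-03-01'),
('Bob', 'Jones', '[email protected]', 1, 'Senior Engineer', 110000.00, '2019-07-15'),
('Carol', 'Diaz', '[email protected]', 2, 'Marketing Analyst', 60000.00, '2022-01-10');
INSERT INTO salaries (employee_id, salary, from_date) VALUES
(1, 85000.00, '2023-01-01'),
(2, 110000.00, '2023-01-01'),
(3, 60000.00, '2023-01-01');
INSERT INTO projects (name, start_date, end_date, budget, status, department_id) VALUES
('Website Redesign', '2024-01-01', '2024-06-30', 80000.00, 'in_progress', 1);
INSERT INTO project_assignments (employee_id, project_id, role, hours_allocated, hours_worked) VALUES
(1, 1, 'Developer', 200, 120),
(2, 1, 'Lead', 150, 150);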
-- 1. Complex JOIN Queries
-- Get all employees with their department information
SELECT
e.id,
e.first_name,
e.last_name,
e.email,
e.position,
e.salary,
d.name as department_name,
d.location as department_location,
m.first_name || ' ' || m.last_name as manager_name
FROM employees e
LEFT JOIN departments d ON e.department_id = d.id
LEFT JOIN employees m ON d.manager_id = m.id
WHERE e.is_active = 1
ORDER BY d.name, e.last_name, e.first_name;
-- Get employees with their current salaries
SELECT
e.id,
e.first_name,
e.last_name,
e.position,
s.salary,
s.from_date as salary_effective_date
FROM employees e
JOIN salaries s ON e.id = s.employee_id
WHERE s.to_date IS NULL
ORDER BY s.salary DESC;
-- Get department budget utilization
SELECT
d.name as department_name,
d.budget as allocated_budget,
COUNT(e.id) as employee_count,
SUM(e.salary) as total_salaries,
(d.budget - SUM(e.salary)) as remaining_budget,
ROUND((SUM(e.salary) / d.budget) * 100, 2) as budget_utilization_percent
FROM departments d
LEFT JOIN employees e ON d.id = e.department_id AND e.is_active = 1
GROUP BY d.id, d.name, d.budget
ORDER BY budget_utilization_percent DESC;
-- 2. Window Functions
-- Employee salary ranking within departments
SELECT
first_name,
last_name,
department_id,
salary,
RANK() OVER (PARTITION BY department_id ORDER BY salary DESC) as dept_salary_rank,
LAG(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) as prev_salary,
salary - LAG(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) as salary_diff
FROM employees e
WHERE e.is_active = 1;
-- Running totals for project work hours
SELECT
p.name as project_name,
pa.employee_id,
e.first_name || ' ' || e.last_name as employee_name,
pa.hours_worked,
SUM(pa.hours_worked) OVER (
PARTITION BY pa.project_id
ORDER BY pa.assignment_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as cumulative_hours
FROM project_assignments pa
JOIN employees e ON pa.employee_id = e.id
JOIN projects p ON pa.project_id = p.id
WHERE p.status = 'in_progress'
ORDER BY p.name, cumulative_hours;
-- Moving averages for salary changes
SELECT
e.first_name,
e.last_name,
s.salary,
s.from_date,
AVG(s.salary) OVER (
PARTITION BY e.id
ORDER BY s.from_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg_salary_3_months,
s.salary - AVG(s.salary) OVER (
PARTITION BY e.id
ORDER BY s.from_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as salary_vs_avg_diff
FROM salaries s
JOIN employees e ON s.employee_id = e.id;
-- 3. Subqueries and CTEs
-- Find departments with above average salaries
WITH dept_avg_salaries AS (
SELECT
d.id,
d.name,
AVG(e.salary) as avg_salary
FROM departments d
JOIN employees e ON d.id = e.department_id
WHERE e.is_active = 1
GROUP BY d.id, d.name
),
overall_avg_salary AS (
SELECT AVG(salary) as overall_avg
FROM employees
WHERE is_active = 1
)
SELECT
das.name,
das.avg_salary,
oas.overall_avg,
ROUND((das.avg_salary / oas.overall_avg - 1) * 100, 2) as percent_above_avg
FROM dept_avg_salaries das
CROSS JOIN overall_avg_salary oas
WHERE das.avg_salary > oas.overall_avg
ORDER BY percent_above_avg DESC;
-- Find employees who have completed all assigned projects
WITH employee_projects AS (
SELECT
e.id as employee_id,
e.first_name || ' ' || e.last_name as employee_name,
p.id as project_id,
p.name as project_name,
p.end_date,
pa.hours_allocated,
pa.hours_worked
FROM employees e
JOIN project_assignments pa ON e.id = pa.employee_id
JOIN projects p ON pa.project_id = p.id
),
completed_projects AS (
SELECT
employee_id,
employee_name,
COUNT(*) as total_projects,
SUM(CASE WHEN hours_worked >= hours_allocated THEN 1 ELSE 0 END) as completed_projects
FROM employee_projects
GROUP BY employee_id, employee_name
)
SELECT
employee_name,
total_projects,
completed_projects,
ROUND((completed_projects * 100.0 / total_projects), 2) as completion_rate
FROM completed_projects
WHERE completed_projects = total_projects
ORDER BY completion_rate DESC;
-- 4. Aggregation and Grouping
-- Department performance metrics
SELECT
d.name as department,
COUNT(e.id) as total_employees,
COUNT(CASE WHEN e.hire_date >= date('now', '-1 year') THEN 1 END) as new_hires,
COUNT(CASE WHEN e.is_active = 0 THEN 1 END) as inactive_employees,
AVG(e.salary) as avg_salary,
MAX(e.salary) as max_salary,
MIN(e.salary) as min_salary,
ROUND(AVG(JULIANDAY('now') - JULIANDAY(e.hire_date))) as avg_tenure_days
FROM departments d
LEFT JOIN employees e ON d.id = e.department_id
GROUP BY d.id, d.name
ORDER BY avg_salary DESC;
-- Project timeline analysis
SELECT
p.name as project_name,
p.status,
p.start_date,
p.end_date,
julianday(p.end_date) - julianday(p.start_date) as planned_duration_days,
COUNT(pa.employee_id) as team_size,
SUM(pa.hours_allocated) as total_hours_allocated,
SUM(pa.hours_worked) as total_hours_worked,
CASE
WHEN SUM(pa.hours_worked) >= SUM(pa.hours_allocated) THEN 'On Track'
WHEN julianday('now') > julianday(p.end_date) THEN 'Overdue'
ELSE 'In Progress'
END as status_indicator
FROM projects p
LEFT JOIN project_assignments pa ON p.id = pa.project_id
GROUP BY p.id, p.name, p.status, p.start_date, p.end_date
ORDER BY status_indicator, p.end_date;
-- 5. Recursive Queries (Hierarchical Data)
-- Department hierarchy (for illustration, manager_id is treated here as a parent-department reference)
WITH RECURSIVE dept_hierarchy AS (
-- Base case: top-level departments (no parent)
SELECT
id,
name,
manager_id,
0 as level,
name as hierarchy_path
FROM departments
WHERE manager_id IS NULL
UNION ALL
-- Recursive case: departments with managers
SELECT
d.id,
d.name,
d.manager_id,
dh.level + 1,
dh.hierarchy_path || ' > ' || d.name
FROM departments d
JOIN dept_hierarchy dh ON d.manager_id = dh.id
)
SELECT
level,
name,
hierarchy_path,
manager_id
FROM dept_hierarchy
ORDER BY level, hierarchy_path;
-- 6. Performance Optimization Queries
-- Create indexes for better performance
CREATE INDEX IF NOT EXISTS idx_employees_department_id ON employees(department_id);
CREATE INDEX IF NOT EXISTS idx_employees_hire_date ON employees(hire_date);
CREATE INDEX IF NOT EXISTS idx_employees_active ON employees(is_active);
CREATE INDEX IF NOT EXISTS idx_salaries_employee_id ON salaries(employee_id);
CREATE INDEX IF NOT EXISTS idx_salaries_from_date ON salaries(from_date);
CREATE INDEX IF NOT EXISTS idx_project_assignments_project_id ON project_assignments(project_id);
-- Analyze query performance
EXPLAIN QUERY PLAN
SELECT e.first_name, e.last_name, d.name as department
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > 50000
ORDER BY e.salary DESC;
-- SQLite has no built-in slow-query log; instead, collect planner statistics
-- so the query planner can choose good indexes for the workload
ANALYZE;
-- Inspect the statistics gathered by ANALYZE
SELECT * FROM sqlite_stat1;
-- Run SQLite's recommended maintenance (may re-run ANALYZE as needed)
PRAGMA optimize;
-- 7. Date and Time Functions
-- Employee age analysis
SELECT
first_name,
last_name,
birth_date,
ROUND((julianday('now') - julianday(birth_date)) / 365.25, 1) as age,
ROUND((julianday('now') - julianday(hire_date)) / 365.25, 1) as years_of_service,
ROUND(
(julianday('now') - julianday(hire_date)) /
(julianday('now') - julianday(birth_date)) * 100,
1
) as service_life_percentage
FROM employees
WHERE is_active = 1
ORDER BY years_of_service DESC;
-- Monthly hiring trends
SELECT
strftime('%Y-%m', hire_date) as hire_month,
COUNT(*) as hires_count,
AVG(salary) as avg_salary_hired
FROM employees
GROUP BY strftime('%Y-%m', hire_date)
ORDER BY hire_month DESC
LIMIT 24;
-- 8. Conditional Logic and Case Statements
-- Employee performance categorization
SELECT
first_name,
last_name,
salary,
CASE
WHEN salary < 30000 THEN 'Entry Level'
WHEN salary < 60000 THEN 'Junior'
WHEN salary < 100000 THEN 'Mid Level'
WHEN salary < 150000 THEN 'Senior'
ELSE 'Executive'
END as salary_level,
CASE
WHEN julianday('now') - julianday(hire_date) < 365 THEN 'New Hire'
WHEN julianday('now') - julianday(hire_date) < 1825 THEN 'Experienced'
ELSE 'Veteran'
END as experience_level,
CASE
WHEN julianday('now') - julianday(hire_date) < 365 AND salary < 50000 THEN 'Underpaid New Hire'
WHEN julianday('now') - julianday(hire_date) >= 1825 AND salary < 60000 THEN 'Underpaid Veteran'
WHEN salary > 100000 AND julianday('now') - julianday(hire_date) < 730 THEN 'Rapid Advancement'
ELSE 'Appropriate Compensation'
END as compensation_analysis
FROM employees
WHERE is_active = 1
ORDER BY salary DESC;
-- 9. String Functions and Pattern Matching
-- Search for employees with specific patterns
SELECT
first_name,
last_name,
email,
phone,
CASE
WHEN email LIKE '%@company.com' THEN 'Internal Email'
WHEN email LIKE '%@gmail.com' THEN 'Gmail User'
WHEN email LIKE '%@yahoo.com' THEN 'Yahoo User'
ELSE 'Other Email Provider'
END as email_category,
CASE
WHEN phone LIKE '%555-%' THEN 'Local Number'
WHEN phone LIKE '1-%' THEN 'US Number'
WHEN phone LIKE '+44-%' THEN 'UK Number'
ELSE 'International Number'
END as phone_category,
LENGTH(first_name) + LENGTH(last_name) as name_length
FROM employees
WHERE (
first_name LIKE '%a%' AND last_name LIKE '%s%' -- Names containing 'a' and 's'
OR email LIKE '%test%' -- Email containing 'test'
OR phone IS NOT NULL
)
ORDER BY name_length DESC;
-- 10. JSON Operations (SQLite 3.38+)
-- Store additional employee metadata as JSON
ALTER TABLE employees ADD COLUMN metadata TEXT;
-- Update metadata with JSON data
UPDATE employees SET metadata = json_object(
'skills', json_array('Python', 'SQL', 'JavaScript'),
'certifications', json_array('AWS Certified', 'Google Cloud'),
'preferences', json_object('remote_work', true, 'team_size', 'small')
) WHERE id = 1;
-- Query JSON data
SELECT
first_name,
last_name,
json_extract(metadata, '$.skills') as skills,
json_array_length(metadata, '$.certifications') as certification_count,
json_extract(metadata, '$.preferences.remote_work') as remote_work_preference
FROM employees
WHERE json_extract(metadata, '$.certifications') IS NOT NULL;
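-- Individual JSON fields can also be updated in place with json_set (a sketch
-- building on the metadata column added above), and arrays can be expanded
-- into rows with json_each:
UPDATE employees
SET metadata = json_set(metadata, '$.preferences.remote_work', json('false'))
WHERE id = 1;
SELECT
e.first_name,
e.last_name,
skill.value AS skill
FROM employees e, json_each(e.metadata, '$.skills') AS skill
WHERE e.metadata IS NOT NULL;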
-- 11. Full-Text Search
-- Create FTS virtual table for employee search
CREATE VIRTUAL TABLE employees_fts USING fts5(
first_name,
last_name,
email,
position,
content='employees',
content_rowid='id'
);
-- Enable triggers for FTS
CREATE TRIGGER employees_fts_insert AFTER INSERT ON employees BEGIN
INSERT INTO employees_fts(rowid, first_name, last_name, email, position)
VALUES (new.id, new.first_name, new.last_name, new.email, new.position);
END;
CREATE TRIGGER employees_fts_delete AFTER DELETE ON employees BEGIN
INSERT INTO employees_fts(employees_fts, rowid, first_name, last_name, email, position)
VALUES ('delete', old.id, old.first_name, old.last_name, old.email, old.position);
END;
CREATE TRIGGER employees_fts_update AFTER UPDATE ON employees BEGIN
INSERT INTO employees_fts(employees_fts, rowid, first_name, last_name, email, position)
VALUES ('delete', old.id, old.first_name, old.last_name, old.email, old.position);
INSERT INTO employees_fts(rowid, first_name, last_name, email, position)
VALUES (new.id, new.first_name, new.last_name, new.email, new.position);
END;
-- Full-text search
SELECT
e.first_name,
e.last_name,
e.email,
e.position,
efts.rank
FROM employees_fts efts
JOIN employees e ON efts.rowid = e.id
WHERE efts MATCH 'python OR "software engineer"'
ORDER BY efts.rank  -- FTS5 rank is negative, so ascending order lists the best matches first
LIMIT 10;
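-- If the external-content table and the FTS index ever drift apart (for example
-- after a bulk load performed with the triggers disabled), FTS5 offers special
-- commands to rebuild or compact the index:
INSERT INTO employees_fts(employees_fts) VALUES ('rebuild');
INSERT INTO employees_fts(employees_fts) VALUES ('optimize');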
-- 12. Data Analysis and Reporting
-- Generate quarterly reports
SELECT
strftime('%Y', p.start_date) as year,
((strftime('%m', p.start_date) - 1) / 3) + 1 as quarter,
COUNT(*) as projects_started,
SUM(CASE WHEN p.status = 'completed' THEN 1 ELSE 0 END) as projects_completed,
SUM(p.budget) as total_budget,
AVG(p.budget) as avg_budget,
SUM(p.budget) * 0.1 as estimated_profit
FROM projects p
WHERE p.start_date >= date('now', '-2 years')
GROUP BY strftime('%Y', p.start_date),
((strftime('%m', p.start_date) - 1) / 3) + 1
ORDER BY year DESC, quarter DESC;
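The full script can also be executed from Python against a database file; a minimal sketch (the file and database names here are placeholders):

# Minimal sketch: run the SQL script above against a SQLite database file
import sqlite3

with open('advanced_queries.sql', 'r', encoding='utf-8') as f:
    script = f.read()

conn = sqlite3.connect('company.db')
try:
    # executescript() commits any pending transaction, then runs every statement in order
    conn.executescript(script)
finally:
    conn.close()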
💻 SQLite with SQLAlchemy ORM (python)
🟡 intermediate
⭐⭐⭐⭐
Object-Relational Mapping with SQLAlchemy for Python database operations
⏱️ 40 min
🏷️ sqlite, sqlalchemy, orm, python, database
Prerequisites:
Python 3.6+, Basic SQL knowledge, Object-oriented programming
# SQLite with SQLAlchemy ORM
# Python - sqlalchemy_orm.py
from sqlalchemy import (
create_engine, Column, Integer, String, Text, Boolean,
Date, DateTime, Float, ForeignKey, Table, Index,
event, inspect
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, scoped_session, joinedload
from sqlalchemy.sql import func
from datetime import datetime, date, timedelta
import json
# Create base class for declarative models
Base = declarative_base()
# Association table for many-to-many relationship (posts and tags)
post_tags = Table(
'post_tags',
Base.metadata,
Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)
class User(Base):
"""User model"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False, index=True)
email = Column(String(100), unique=True, nullable=False, index=True)
password_hash = Column(String(255), nullable=False)
first_name = Column(String(50))
last_name = Column(String(50))
bio = Column(Text)
avatar_url = Column(String(255))
birth_date = Column(Date)
location = Column(String(100))
website = Column(String(255))
is_active = Column(Boolean, default=True)
is_verified = Column(Boolean, default=False)
email_verified_at = Column(DateTime)
last_login_at = Column(DateTime)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
comments = relationship("Comment", back_populates="author", cascade="all, delete-orphan")
followers = relationship("Follower", foreign_keys="Follower.following_id", back_populates="following")
following = relationship("Follower", foreign_keys="Follower.follower_id", back_populates="follower")
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"
class Post(Base):
"""Post model"""
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False, index=True)
slug = Column(String(200), unique=True, nullable=False, index=True)
content = Column(Text, nullable=False)
excerpt = Column(Text)
status = Column(String(20), default='draft', index=True) # draft, published, archived
comment_status = Column(String(20), default='open') # open, closed
password = Column(String(255)) # For password-protected posts
featured_image = Column(String(255))
view_count = Column(Integer, default=0)
like_count = Column(Integer, default=0)
comment_count = Column(Integer, default=0)
published_at = Column(DateTime, index=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Foreign keys
author_id = Column(Integer, ForeignKey('users.id'), nullable=False)
category_id = Column(Integer, ForeignKey('categories.id'))
# Relationships
author = relationship("User", back_populates="posts")
category = relationship("Category", back_populates="posts")
comments = relationship("Comment", back_populates="post", cascade="all, delete-orphan")
tags = relationship("Tag", secondary=post_tags, back_populates="posts")
def __repr__(self):
return f"<Post(id={self.id}, title='{self.title}', status='{self.status}')>"
class Category(Base):
"""Category model"""
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False, index=True)
slug = Column(String(50), unique=True, nullable=False, index=True)
description = Column(Text)
parent_id = Column(Integer, ForeignKey('categories.id'))
color = Column(String(7)) # Hex color code
post_count = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Self-referential relationship for hierarchical categories
parent = relationship("Category", remote_side=[id], back_populates="children")
children = relationship("Category", back_populates="parent")
# Relationships
posts = relationship("Post", back_populates="category")
def __repr__(self):
return f"<Category(id={self.id}, name='{self.name}')>"
class Tag(Base):
"""Tag model"""
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False, index=True)
slug = Column(String(50), unique=True, nullable=False, index=True)
description = Column(Text)
color = Column(String(7)) # Hex color code
post_count = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
posts = relationship("Post", secondary=post_tags, back_populates="tags")
def __repr__(self):
return f"<Tag(id={self.id}, name='{self.name}')>"
class Comment(Base):
"""Comment model"""
__tablename__ = 'comments'
id = Column(Integer, primary_key=True)
content = Column(Text, nullable=False)
author_name = Column(String(100)) # For guest comments
author_email = Column(String(100)) # For guest comments
author_ip = Column(String(45)) # For guest comments
is_approved = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Foreign keys
post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)
author_id = Column(Integer, ForeignKey('users.id')) # null for guest comments
parent_id = Column(Integer, ForeignKey('comments.id')) # For nested comments
# Relationships
post = relationship("Post", back_populates="comments")
author = relationship("User", back_populates="comments")
parent = relationship("Comment", remote_side=[id], back_populates="children")
children = relationship("Comment", back_populates="parent")
def __repr__(self):
return f"<Comment(id={self.id}, post_id={self.post_id}, author_id={self.author_id})>"
class Follower(Base):
"""Follower model for user relationships"""
__tablename__ = 'followers'
id = Column(Integer, primary_key=True)
follower_id = Column(Integer, ForeignKey('users.id'), nullable=False)
following_id = Column(Integer, ForeignKey('users.id'), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
follower = relationship("User", foreign_keys=[follower_id], back_populates="following")
following = relationship("User", foreign_keys=[following_id], back_populates="followers")
# Ensure no duplicate follows
__table_args__ = (Index('idx_follower_following', 'follower_id', 'following_id', unique=True),)
class DatabaseManager:
"""Database manager class for SQLAlchemy operations"""
def __init__(self, db_url='sqlite:///blog.db'):
self.db_url = db_url
self.engine = None
self.Session = None
self.session = None
def connect(self):
"""Create database engine and session"""
try:
self.engine = create_engine(
self.db_url,
echo=False, # Set to True for SQL logging
pool_pre_ping=True
)
# Create session factory
self.Session = sessionmaker(bind=self.engine)
self.session = scoped_session(self.Session)
print(f"✅ Connected to database: {self.db_url}")
return True
except Exception as e:
print(f"❌ Error connecting to database: {e}")
return False
def create_tables(self):
"""Create all tables"""
try:
Base.metadata.create_all(self.engine)
print("✅ Database tables created successfully")
return True
except Exception as e:
print(f"❌ Error creating tables: {e}")
return False
def drop_tables(self):
"""Drop all tables"""
try:
Base.metadata.drop_all(self.engine)
print("✅ Database tables dropped successfully")
return True
except Exception as e:
print(f"❌ Error dropping tables: {e}")
return False
def get_session(self):
"""Get database session"""
if not self.session:
self.connect()
return self.session()
def commit_and_close(self):
"""Commit current session and close"""
if self.session:
try:
self.session.commit()
self.session.close()
print("✅ Session committed and closed")
except Exception as e:
print(f"❌ Error committing session: {e}")
self.session.rollback()
self.session.close()
self.session = None
class UserService:
"""Service class for user operations"""
def __init__(self, db_manager):
self.db = db_manager
self.session = db_manager.get_session()
def create_user(self, username, email, password_hash, **kwargs):
"""Create a new user"""
try:
user = User(
username=username,
email=email,
password_hash=password_hash,
**kwargs
)
self.session.add(user)
self.session.commit()
self.session.refresh(user)
print(f"✅ User '{username}' created with ID: {user.id}")
return user
except Exception as e:
self.session.rollback()
print(f"❌ Error creating user: {e}")
return None
def get_user_by_id(self, user_id):
"""Get user by ID"""
try:
user = self.session.query(User).filter(User.id == user_id).first()
if user:
print(f"✅ Found user: {user.username}")
else:
print(f"❌ User with ID {user_id} not found")
return user
except Exception as e:
print(f"❌ Error getting user: {e}")
return None
def get_user_by_username(self, username):
"""Get user by username"""
try:
user = self.session.query(User).filter(User.username == username).first()
return user
except Exception as e:
print(f"❌ Error getting user by username: {e}")
return None
def get_user_by_email(self, email):
"""Get user by email"""
try:
user = self.session.query(User).filter(User.email == email).first()
return user
except Exception as e:
print(f"❌ Error getting user by email: {e}")
return None
def update_user(self, user_id, **kwargs):
"""Update user information"""
try:
user = self.get_user_by_id(user_id)
if not user:
return None
for key, value in kwargs.items():
if hasattr(user, key):
setattr(user, key, value)
user.updated_at = datetime.utcnow()
self.session.commit()
print(f"✅ User {user.username} updated successfully")
return user
except Exception as e:
self.session.rollback()
print(f"❌ Error updating user: {e}")
return None
def delete_user(self, user_id):
"""Delete a user"""
try:
user = self.get_user_by_id(user_id)
if not user:
return False
self.session.delete(user)
self.session.commit()
print(f"✅ User {user.username} deleted successfully")
return True
except Exception as e:
self.session.rollback()
print(f"❌ Error deleting user: {e}")
return False
def get_users(self, active_only=True, limit=None, offset=None):
"""Get list of users"""
try:
query = self.session.query(User)
if active_only:
query = query.filter(User.is_active == True)
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
users = query.all()
print(f"✅ Found {len(users)} users")
return users
except Exception as e:
print(f"❌ Error getting users: {e}")
return []
def search_users(self, query, active_only=True):
"""Search users by name or email"""
try:
search_pattern = f"%{query}%"
db_query = self.session.query(User).filter(
(User.first_name.ilike(search_pattern) |
User.last_name.ilike(search_pattern) |
User.email.ilike(search_pattern))
)
if active_only:
db_query = db_query.filter(User.is_active == True)
users = db_query.all()
print(f"✅ Found {len(users)} users matching '{query}'")
return users
except Exception as e:
print(f"❌ Error searching users: {e}")
return []
class PostService:
"""Service class for post operations"""
def __init__(self, db_manager):
self.db = db_manager
self.session = db_manager.get_session()
def create_post(self, title, content, author_id, **kwargs):
"""Create a new post"""
try:
post = Post(
title=title,
content=content,
author_id=author_id,
**kwargs
)
self.session.add(post)
self.session.commit()
self.session.refresh(post)
print(f"✅ Post '{title}' created with ID: {post.id}")
return post
except Exception as e:
self.session.rollback()
print(f"❌ Error creating post: {e}")
return None
def get_post_by_id(self, post_id):
"""Get post by ID"""
try:
post = self.session.query(Post).options(
joinedload(Post.author),
joinedload(Post.category),
joinedload(Post.tags),
joinedload(Post.comments)
).filter(Post.id == post_id).first()
if post:
print(f"✅ Found post: {post.title}")
else:
print(f"❌ Post with ID {post_id} not found")
return post
except Exception as e:
print(f"❌ Error getting post: {e}")
return None
def get_posts(self, status=None, category_id=None, author_id=None, limit=None, offset=None):
"""Get posts with filters"""
try:
query = self.session.query(Post).options(
joinedload(Post.author),
joinedload(Post.category),
joinedload(Post.tags),
joinedload(Post.comments)
)
if status:
query = query.filter(Post.status == status)
if category_id:
query = query.filter(Post.category_id == category_id)
if author_id:
query = query.filter(Post.author_id == author_id)
query = query.order_by(Post.created_at.desc())
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
posts = query.all()
print(f"✅ Found {len(posts)} posts")
return posts
except Exception as e:
print(f"❌ Error getting posts: {e}")
return []
def update_post(self, post_id, **kwargs):
"""Update post"""
try:
post = self.get_post_by_id(post_id)
if not post:
return None
for key, value in kwargs.items():
if hasattr(post, key):
setattr(post, key, value)
post.updated_at = datetime.utcnow()
self.session.commit()
print(f"✅ Post '{post.title}' updated successfully")
return post
except Exception as e:
self.session.rollback()
print(f"❌ Error updating post: {e}")
return None
def delete_post(self, post_id):
"""Delete a post"""
try:
post = self.get_post_by_id(post_id)
if not post:
return False
self.session.delete(post)
self.session.commit()
print(f"✅ Post '{post.title}' deleted successfully")
return True
except Exception as e:
self.session.rollback()
print(f"❌ Error deleting post: {e}")
return False
def get_popular_posts(self, limit=10):
"""Get popular posts based on view count and comment count"""
try:
posts = self.session.query(Post).options(
joinedload(Post.author),
joinedload(Post.category),
joinedload(Post.tags)
).filter(
Post.status == 'published'
).order_by(
(Post.view_count + Post.like_count + Post.comment_count).desc()
).limit(limit).all()
print(f"✅ Found {len(posts)} popular posts")
return posts
except Exception as e:
print(f"❌ Error getting popular posts: {e}")
return []
def search_posts(self, query, limit=None):
"""Search posts by title, content, or tags"""
try:
search_pattern = f"%{query}%"
db_query = self.session.query(Post).options(
joinedload(Post.author),
joinedload(Post.category),
joinedload(Post.tags)
).filter(
Post.title.ilike(search_pattern) |
Post.content.ilike(search_pattern) |
Post.tags.any(Tag.name.ilike(search_pattern))  # also match tag names
).order_by(Post.created_at.desc())
if limit:
db_query = db_query.limit(limit)
posts = db_query.all()
print(f"✅ Found {len(posts)} posts matching '{query}'")
return posts
except Exception as e:
print(f"❌ Error searching posts: {e}")
return []
class AnalyticsService:
"""Service class for database analytics"""
def __init__(self, db_manager):
self.db = db_manager
self.session = db_manager.get_session()
def get_user_statistics(self):
"""Get user statistics"""
try:
total_users = self.session.query(User).count()
active_users = self.session.query(User).filter(User.is_active == True).count()
new_users_month = self.session.query(User).filter(
User.created_at >= datetime.utcnow().replace(day=1)
).count()
# User registration by month for the last 6 months
user_registrations = self.session.query(
func.strftime('%Y-%m', User.created_at).label('month'),
func.count(User.id).label('count')
).filter(
User.created_at >= datetime.utcnow() - timedelta(days=180)  # roughly the last six months
).group_by(func.strftime('%Y-%m', User.created_at)).all()
stats = {
'total_users': total_users,
'active_users': active_users,
'new_users_month': new_users_month,
'user_registrations': user_registrations
}
print("📊 User Statistics:")
print(f" Total Users: {total_users}")
print(f" Active Users: {active_users}")
print(f" New Users (This Month): {new_users_month}")
return stats
except Exception as e:
print(f"❌ Error getting user statistics: {e}")
return None
def get_post_statistics(self):
"""Get post statistics"""
try:
total_posts = self.session.query(Post).count()
published_posts = self.session.query(Post).filter(Post.status == 'published').count()
draft_posts = self.session.query(Post).filter(Post.status == 'draft').count()
# Posts by month for the last 6 months
posts_by_month = self.session.query(
func.strftime('%Y-%m', Post.created_at).label('month'),
func.count(Post.id).label('count'),
func.sum(Post.view_count).label('total_views')
).filter(
Post.created_at >= datetime.utcnow() - timedelta(days=180)  # roughly the last six months
).group_by(func.strftime('%Y-%m', Post.created_at)).all()
# Category distribution
category_distribution = self.session.query(
Category.name,
func.count(Post.id).label('post_count')
).join(Post).group_by(Category.name).all()
# Top posts
top_posts = self.session.query(
Post.title,
Post.view_count,
Post.like_count,
Post.comment_count
).filter(Post.status == 'published').order_by(
Post.view_count.desc()
).limit(10).all()
stats = {
'total_posts': total_posts,
'published_posts': published_posts,
'draft_posts': draft_posts,
'posts_by_month': posts_by_month,
'category_distribution': category_distribution,
'top_posts': top_posts
}
print("📊 Post Statistics:")
print(f" Total Posts: {total_posts}")
print(f" Published: {published_posts}")
print(f" Drafts: {draft_posts}")
return stats
except Exception as e:
print(f"❌ Error getting post statistics: {e}")
return None
def get_author_analytics(self, author_id):
"""Get analytics for a specific author"""
try:
author = self.session.query(User).filter(User.id == author_id).first()
if not author:
return None
total_posts = self.session.query(Post).filter(Post.author_id == author_id).count()
published_posts = self.session.query(Post).filter(
Post.author_id == author_id,
Post.status == 'published'
).count()
total_views = self.session.query(func.sum(Post.view_count)).filter(
Post.author_id == author_id
).scalar() or 0
total_comments = self.session.query(func.sum(Post.comment_count)).filter(
Post.author_id == author_id
).scalar() or 0
# Follower count
follower_count = self.session.query(Follower).filter(
Follower.following_id == author_id
).count()
following_count = self.session.query(Follower).filter(
Follower.follower_id == author_id
).count()
analytics = {
'author': author,
'total_posts': total_posts,
'published_posts': published_posts,
'total_views': total_views,
'total_comments': total_comments,
'follower_count': follower_count,
'following_count': following_count,
'engagement_rate': (total_views + total_comments * 2) / total_posts if total_posts > 0 else 0
}
print(f"📊 Analytics for {author.username}:")
print(f" Posts: {total_posts} ({published_posts} published)")
print(f" Total Views: {total_views}")
print(f" Total Comments: {total_comments}")
print(f" Followers: {follower_count}")
print(f" Following: {following_count}")
print(f" Engagement Rate: {analytics['engagement_rate']:.2f}")
return analytics
except Exception as e:
print(f"❌ Error getting author analytics: {e}")
return None
# Example usage
def main():
# Initialize database
db_manager = DatabaseManager()
# Connect and create tables
if db_manager.connect():
db_manager.create_tables()
# Create service instances
user_service = UserService(db_manager)
post_service = PostService(db_manager)
analytics_service = AnalyticsService(db_manager)
try:
# Create sample users
print("\n👤 Creating sample users...")
user1 = user_service.create_user(
username='johndoe',
email='[email protected]',
password_hash='hashed_password_1',
first_name='John',
last_name='Doe',
bio='Software developer and tech enthusiast.',
location='San Francisco, CA'
)
user2 = user_service.create_user(
username='janedoe',
email='[email protected]',
password_hash='hashed_password_2',
first_name='Jane',
last_name='Doe',
bio='Data scientist and AI researcher.',
location='New York, NY'
)
# Create sample posts
print("\n📝 Creating sample posts...")
if user1 and user2:
post1 = post_service.create_post(
title='Getting Started with SQLite',
content='SQLite is a lightweight, serverless database engine...',
author_id=user1.id,
status='published',
view_count=150,
like_count=25,
comment_count=8,
published_at=datetime(2024, 1, 15)
)
post2 = post_service.create_post(
title='Python Best Practices',
content='Here are some best practices for Python development...',
author_id=user1.id,
status='published',
view_count=300,
like_count=45,
comment_count=12,
published_at=datetime(2024, 2, 1)
)
post3 = post_service.create_post(
title='Machine Learning Fundamentals',
content='Machine learning is a subset of artificial intelligence...',
author_id=user2.id,
status='published',
view_count=500,
like_count=75,
comment_count=20,
published_at=datetime(2024, 1, 20)
)
# Get analytics
print("\n📊 Generating analytics...")
user_stats = analytics_service.get_user_statistics()
post_stats = analytics_service.get_post_statistics()
# Get author analytics
if user1:
author_analytics = analytics_service.get_author_analytics(user1.id)
# Search examples
print("\n🔍 Search examples...")
search_users = user_service.search_users('john')
search_posts = post_service.search_posts('python')
# Get popular posts
print("\n⭐ Popular posts:")
popular_posts = post_service.get_popular_posts(5)
except Exception as e:
print(f"❌ Error in main execution: {e}")
finally:
# Close database connection
db_manager.commit_and_close()
if __name__ == "__main__":
main()
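One SQLite-specific caveat for the models above: foreign keys are not enforced unless PRAGMA foreign_keys is enabled on each connection. With SQLAlchemy this is typically done with a connect-time event listener; a small sketch that could sit next to DatabaseManager (it applies to every DBAPI connection the engine opens):

# Sketch: enforce SQLite foreign keys for every connection an engine opens
from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine

@event.listens_for(Engine, "connect")
def _enable_sqlite_foreign_keys(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()

engine = create_engine('sqlite:///blog.db')
with engine.connect() as connection:
    # every connection handed out by this engine now enforces foreign keys
    pass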
💻 Database Migration Scripts (python)
🔴 complex
⭐⭐⭐⭐
Automated migration system and schema versioning
⏱️ 50 min
🏷️ sqlite, migrations, versioning, database
Prerequisites:
Python 3.6+, SQLite knowledge, Database concepts
# SQLite Migration System
# Python - migration_system.py
import os
import sqlite3
import json
import hashlib
from datetime import datetime
from typing import List, Dict, Any, Optional
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class Migration:
"""Represents a single migration"""
def __init__(self, version: str, description: str, up_sql: str, down_sql: str):
self.version = version
self.description = description
self.up_sql = up_sql
self.down_sql = down_sql
self.checksum = self._calculate_checksum(up_sql + down_sql)
def _calculate_checksum(self, content: str) -> str:
"""Calculate checksum for migration content"""
return hashlib.sha256(content.encode()).hexdigest()
def validate(self):
"""Validate migration integrity"""
current_checksum = self._calculate_checksum(self.up_sql + self.down_sql)
return current_checksum == self.checksum
class MigrationSystem:
"""Database migration system"""
def __init__(self, db_path: str = 'database.db', migrations_dir: str = 'migrations'):
self.db_path = db_path
self.migrations_dir = migrations_dir
self.connection = None
self.cursor = None
# Ensure migrations directory exists
os.makedirs(migrations_dir, exist_ok=True)
# Create migration lock table
self.migrations_table = '''
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(50) PRIMARY KEY,
description TEXT,
checksum VARCHAR(64),
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
rollback_sql TEXT
);
'''
def connect(self):
"""Connect to database"""
try:
self.connection = sqlite3.connect(self.db_path)
# Row factory must be set before the cursor is created so rows come back as sqlite3.Row
self.connection.row_factory = sqlite3.Row
self.cursor = self.connection.cursor()
# Enable foreign keys
self.cursor.execute("PRAGMA foreign_keys = ON")
# Create migration lock table
self.cursor.executescript(self.migrations_table)
self.connection.commit()
logger.info(f"Connected to database: {self.db_path}")
return True
except Exception as e:
logger.error(f"Error connecting to database: {e}")
return False
def disconnect(self):
"""Close database connection"""
if self.connection:
self.connection.close()
logger.info("Disconnected from database")
def lock_database(self):
"""Acquire exclusive database lock for migration"""
try:
# SQLite's BEGIN IMMEDIATE starts a transaction and immediately acquires a write lock
self.cursor.execute("BEGIN IMMEDIATE")
logger.debug("Database locked for migration")
return True
except Exception as e:
logger.error(f"Error locking database: {e}")
return False
def unlock_database(self):
"""Release database lock"""
try:
self.connection.commit()
logger.debug("Database unlocked")
return True
except Exception as e:
logger.error(f"Error unlocking database: {e}")
self.connection.rollback()
return False
def create_migration(self, version: str, description: str, up_sql: str, down_sql: str) -> Migration:
"""Create a new migration file"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_{version}.py"
filepath = os.path.join(self.migrations_dir, filename)
migration_content = f'''"""
Migration: {version}
Description: {description}
Created: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
from migration_system import Migration

UP_SQL = """
{up_sql.strip()}
"""

DOWN_SQL = """
{down_sql.strip()}
"""

migration = Migration(
version="{version}",
description="{description}",
up_sql=UP_SQL,
down_sql=DOWN_SQL
)
'''
with open(filepath, 'w') as f:
f.write(migration_content)
logger.info(f"Created migration file: {filename}")
return Migration(version, description, up_sql, down_sql)
def load_migrations(self) -> List[Migration]:
"""Load all migration files"""
migrations = []
if not os.path.exists(self.migrations_dir):
return migrations
for filename in sorted(os.listdir(self.migrations_dir)):
if filename.endswith('.py'):
filepath = os.path.join(self.migrations_dir, filename)
try:
# Load and execute migration file
namespace = {}
with open(filepath, 'r') as f:
exec(f.read(), namespace)
if 'migration' in namespace:
migration = namespace['migration']
migrations.append(migration)
logger.debug(f"Loaded migration: {migration.version}")
except Exception as e:
logger.error(f"Error loading migration {filename}: {e}")
return migrations
def get_applied_migrations(self) -> List[Dict[str, Any]]:
"""Get list of applied migrations"""
try:
self.cursor.execute("SELECT * FROM schema_migrations ORDER BY applied_at")
return [dict(row) for row in self.cursor.fetchall()]
except Exception as e:
logger.error(f"Error getting applied migrations: {e}")
return []
def get_pending_migrations(self) -> List[Migration]:
"""Get list of pending migrations"""
all_migrations = self.load_migrations()
applied_migrations = {m['version'] for m in self.get_applied_migrations()}
pending = []
for migration in all_migrations:
if migration.version not in applied_migrations:
pending.append(migration)
else:
# Validate applied migration checksum
applied = next(m for m in self.get_applied_migrations() if m['version'] == migration.version)
if not migration.validate() or migration.checksum != applied['checksum']:
logger.warning(f"Migration {migration.version} validation failed, marking as pending")
pending.append(migration)
return pending
def apply_migration(self, migration: Migration) -> bool:
"""Apply a single migration"""
logger.info(f"Applying migration: {migration.version} - {migration.description}")
if not self.lock_database():
return False
try:
# Execute migration SQL
if migration.up_sql.strip():
self.cursor.executescript(migration.up_sql)
# Record migration in schema_migrations table
self.cursor.execute("""
INSERT INTO schema_migrations (version, description, checksum, rollback_sql)
VALUES (?, ?, ?, ?)
""", (
migration.version,
migration.description,
migration.checksum,
migration.down_sql
))
self.connection.commit()
logger.info(f"Successfully applied migration: {migration.version}")
return True
except Exception as e:
logger.error(f"Error applying migration {migration.version}: {e}")
self.connection.rollback()
return False
finally:
self.unlock_database()
def rollback_migration(self, migration: Migration) -> bool:
"""Rollback a migration"""
logger.info(f"Rolling back migration: {migration.version} - {migration.description}")
if not self.lock_database():
return False
try:
# Execute rollback SQL
if migration.down_sql.strip():
self.cursor.executescript(migration.down_sql)
# Remove migration from schema_migrations table
self.cursor.execute(
"DELETE FROM schema_migrations WHERE version = ?",
(migration.version,)
)
self.connection.commit()
logger.info(f"Successfully rolled back migration: {migration.version}")
return True
except Exception as e:
logger.error(f"Error rolling back migration {migration.version}: {e}")
self.connection.rollback()
return False
finally:
self.unlock_database()
def migrate_up(self, target_version: Optional[str] = None) -> bool:
"""Migrate up to target version"""
logger.info("Starting migration up...")
pending_migrations = self.get_pending_migrations()
if target_version:
pending_migrations = [m for m in pending_migrations if m.version <= target_version]
if not pending_migrations:
logger.info("No pending migrations to apply")
return True
success = True
for migration in pending_migrations:
if not self.apply_migration(migration):
success = False
break
if success:
logger.info("Migration up completed successfully")
else:
logger.error("Migration up failed")
return success
def migrate_down(self, target_version: str) -> bool:
"""Migrate down to target version"""
logger.info(f"Starting migration down to version: {target_version}")
applied_migrations = self.get_applied_migrations()
# Get migrations to rollback (in reverse order)
migrations_to_rollback = []
for applied in reversed(applied_migrations):
if applied['version'] > target_version:
# Load migration to get rollback SQL
all_migrations = self.load_migrations()
migration = next((m for m in all_migrations if m.version == applied['version']), None)
if migration:
# Set rollback SQL from stored data
migration.down_sql = applied.get('rollback_sql', '')
migrations_to_rollback.append(migration)
if not migrations_to_rollback:
logger.info("No migrations to rollback")
return True
success = True
for migration in migrations_to_rollback:
if not self.rollback_migration(migration):
success = False
break
if success:
logger.info("Migration down completed successfully")
else:
logger.error("Migration down failed")
return success
def get_migration_status(self) -> Dict[str, Any]:
"""Get current migration status"""
try:
applied_migrations = self.get_applied_migrations()
pending_migrations = self.get_pending_migrations()
all_migrations = self.load_migrations()
return {
'total_migrations': len(all_migrations),
'applied_migrations': len(applied_migrations),
'pending_migrations': len(pending_migrations),
'current_version': applied_migrations[-1]['version'] if applied_migrations else None,
'latest_version': all_migrations[-1].version if all_migrations else None,
'applied_list': [m['version'] for m in applied_migrations],
'pending_list': [m.version for m in pending_migrations]
}
except Exception as e:
logger.error(f"Error getting migration status: {e}")
return {}
class BackupManager:
"""Database backup and restore manager"""
def __init__(self, db_path: str, backup_dir: str = 'backups'):
self.db_path = db_path
self.backup_dir = backup_dir
os.makedirs(backup_dir, exist_ok=True)
def create_backup(self, backup_name: Optional[str] = None) -> str:
"""Create a database backup"""
if not backup_name:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"backup_{timestamp}.db"
backup_path = os.path.join(self.backup_dir, backup_name)
try:
# Plain file copy; for a hot backup of a busy database, prefer sqlite3's online backup API
import shutil
shutil.copy2(self.db_path, backup_path)
logger.info(f"Created backup: {backup_path}")
return backup_path
except Exception as e:
logger.error(f"Error creating backup: {e}")
return ""
def restore_backup(self, backup_path: str, create_new: bool = False) -> bool:
"""Restore database from backup"""
if not os.path.exists(backup_path):
logger.error(f"Backup file not found: {backup_path}")
return False
try:
import shutil
if create_new:
backup_path_new = f"{backup_path}.restored"
shutil.copy2(backup_path, backup_path_new)
logger.info(f"Created restored database: {backup_path_new}")
else:
shutil.copy2(backup_path, self.db_path)
logger.info(f"Restored database from: {backup_path}")
return True
except Exception as e:
logger.error(f"Error restoring backup: {e}")
return False
def list_backups(self) -> List[Dict[str, Any]]:
"""List all available backups"""
backups = []
if not os.path.exists(self.backup_dir):
return backups
for filename in os.listdir(self.backup_dir):
if filename.endswith('.db'):
filepath = os.path.join(self.backup_dir, filename)
stat = os.stat(filepath)
backups.append({
'filename': filename,
'filepath': filepath,
'size': stat.st_size,
'created': datetime.fromtimestamp(stat.st_ctime),
'modified': datetime.fromtimestamp(stat.st_mtime)
})
# Sort by creation date (newest first)
backups.sort(key=lambda x: x['created'], reverse=True)
return backups
# Example migration template
class SampleMigrations:
"""Sample migration definitions"""
@staticmethod
def create_initial_schema():
"""Initial schema migration"""
return Migration(
version="001_initial_schema",
description="Create initial database schema",
up_sql='''
-- Create users table
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
first_name VARCHAR(50),
last_name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create posts table
CREATE TABLE posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title VARCHAR(200) NOT NULL,
content TEXT,
author_id INTEGER NOT NULL,
status VARCHAR(20) DEFAULT 'draft',
view_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (author_id) REFERENCES users (id)
);
-- Create categories table
CREATE TABLE categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) UNIQUE NOT NULL,
slug VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create indexes
CREATE INDEX idx_users_username ON users(username);
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_posts_author_id ON posts(author_id);
CREATE INDEX idx_posts_status ON posts(status);
CREATE INDEX idx_posts_created_at ON posts(created_at);
CREATE INDEX idx_categories_name ON categories(name);
''',
down_sql='''
DROP INDEX IF EXISTS idx_categories_name;
DROP INDEX IF EXISTS idx_posts_created_at;
DROP INDEX IF EXISTS idx_posts_status;
DROP INDEX IF EXISTS idx_posts_author_id;
DROP INDEX IF EXISTS idx_users_email;
DROP INDEX IF EXISTS idx_users_username;
DROP TABLE IF EXISTS categories;
DROP TABLE IF EXISTS posts;
DROP TABLE IF EXISTS users;
'''
)
@staticmethod
def add_user_profile():
"""Add user profile fields"""
return Migration(
version="002_add_user_profile",
description="Add profile fields to users table",
up_sql='''
-- Add new columns to users table
ALTER TABLE users ADD COLUMN bio TEXT;
ALTER TABLE users ADD COLUMN avatar_url VARCHAR(255);
ALTER TABLE users ADD COLUMN birth_date DATE;
ALTER TABLE users ADD COLUMN phone VARCHAR(20);
ALTER TABLE users ADD COLUMN address TEXT;
ALTER TABLE users ADD COLUMN website VARCHAR(255);
-- Add is_verified column
ALTER TABLE users ADD COLUMN is_verified BOOLEAN DEFAULT 0;
ALTER TABLE users ADD COLUMN email_verified_at TIMESTAMP;
''',
down_sql='''
-- Remove columns from users table
-- (ALTER TABLE ... DROP COLUMN requires SQLite 3.35+; there is no IF EXISTS form)
ALTER TABLE users DROP COLUMN email_verified_at;
ALTER TABLE users DROP COLUMN is_verified;
ALTER TABLE users DROP COLUMN website;
ALTER TABLE users DROP COLUMN address;
ALTER TABLE users DROP COLUMN phone;
ALTER TABLE users DROP COLUMN birth_date;
ALTER TABLE users DROP COLUMN avatar_url;
ALTER TABLE users DROP COLUMN bio;
'''
)
# Example usage
def main():
# Initialize migration system
migration_system = MigrationSystem('blog.db', 'migrations')
# Connect to database
if migration_system.connect():
try:
# Create initial migration if it doesn't exist
initial_migration = SampleMigrations.create_initial_schema()
all_migrations = migration_system.load_migrations()
if not all_migrations:
initial_migration = migration_system.create_migration(
version="001_initial_schema",
description="Create initial database schema",
up_sql='''
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
''',
down_sql='''
DROP TABLE IF EXISTS users;
'''
)
logger.info("Created initial migration file")
# Get migration status
status = migration_system.get_migration_status()
print("\n📊 Migration Status:")
print(f" Total Migrations: {status['total_migrations']}")
print(f" Applied: {status['applied_migrations']}")
print(f" Pending: {status['pending_migrations']}")
print(f" Current Version: {status['current_version']}")
print(f" Latest Version: {status['latest_version']}")
# Migrate up to latest
print("\n🔄 Running migrations...")
if migration_system.migrate_up():
print("✅ All migrations applied successfully")
else:
print("❌ Migration failed")
# Create backup after migration
backup_manager = BackupManager('blog.db')
print("\n💾 Creating backup...")
backup_path = backup_manager.create_backup()
# List available backups
print("\n📋 Available Backups:")
backups = backup_manager.list_backups()
for backup in backups:
print(f" {backup['filename']} ({backup['size']} bytes, created {backup['created']})")
except Exception as e:
logger.error(f"Error in migration system: {e}")
finally:
# Disconnect from database
migration_system.disconnect()
if __name__ == "__main__":
main()
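In practice a migration system like this is usually driven from the command line. A hypothetical wrapper around the classes above (command and flag names are illustrative, not part of the original script):

# Hypothetical CLI wrapper around MigrationSystem; flags are illustrative only
import argparse
from migration_system import MigrationSystem

def cli():
    parser = argparse.ArgumentParser(description='Run SQLite schema migrations')
    parser.add_argument('command', choices=['status', 'up', 'down'])
    parser.add_argument('--db', default='blog.db', help='path to the SQLite database')
    parser.add_argument('--target', help='target version for up/down')
    args = parser.parse_args()

    system = MigrationSystem(args.db, 'migrations')
    if not system.connect():
        raise SystemExit(1)
    try:
        if args.command == 'status':
            print(system.get_migration_status())
        elif args.command == 'up':
            system.migrate_up(args.target)
        else:
            if not args.target:
                parser.error('--target is required for down')
            system.migrate_down(args.target)
    finally:
        system.disconnect()

if __name__ == '__main__':
    cli()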