🎯 Recommended Examples
Balanced sample collections from various categories for you to explore
SQLite Database Examples
SQLite database examples covering basic operations, advanced queries, migrations, and management
💻 Basic SQLite Operations python
🟢 simple
⭐
Fundamental CRUD operations, table creation, and basic queries
⏱️ 20 min
🏷️ sqlite, database, crud, python
Prerequisites:
Python 3.6+, Basic SQL knowledge
# SQLite Basic Operations
# Python - basic_operations.py
import sqlite3
import os
from datetime import datetime
import json
class SQLiteBasicOperations:
def __init__(self, db_path='sample.db'):
self.db_path = db_path
self.connection = None
self.cursor = None
def connect(self):
"""Connect to SQLite database"""
try:
            self.connection = sqlite3.connect(self.db_path)
            # Set row factory so rows can be accessed like dictionaries
            # (must be set before the cursor is created)
            self.connection.row_factory = sqlite3.Row
            self.cursor = self.connection.cursor()
            # Enable foreign key enforcement
            self.cursor.execute("PRAGMA foreign_keys = ON")
print(f"✅ Connected to database: {self.db_path}")
return True
except Exception as e:
print(f"❌ Error connecting to database: {e}")
return False
def disconnect(self):
"""Close database connection"""
if self.connection:
self.connection.close()
print(f"✅ Disconnected from database: {self.db_path}")
def create_users_table(self):
"""Create users table"""
try:
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
first_name VARCHAR(50),
last_name VARCHAR(50),
date_of_birth DATE,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
self.connection.commit()
print("✅ Users table created successfully")
return True
except Exception as e:
print(f"❌ Error creating users table: {e}")
return False
def create_posts_table(self):
"""Create posts table with foreign key"""
try:
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
title VARCHAR(200) NOT NULL,
content TEXT,
category VARCHAR(50),
tags TEXT,
view_count INTEGER DEFAULT 0,
is_published BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
self.connection.commit()
print("✅ Posts table created successfully")
return True
except Exception as e:
print(f"❌ Error creating posts table: {e}")
return False
def insert_user(self, username, email, password_hash, first_name=None, last_name=None, date_of_birth=None):
"""Insert a new user"""
try:
self.cursor.execute('''
INSERT INTO users (username, email, password_hash, first_name, last_name, date_of_birth)
VALUES (?, ?, ?, ?, ?, ?)
''', (username, email, password_hash, first_name, last_name, date_of_birth))
user_id = self.cursor.lastrowid
self.connection.commit()
print(f"✅ User '{username}' created with ID: {user_id}")
return user_id
except sqlite3.IntegrityError as e:
print(f"❌ Error inserting user (duplicate data): {e}")
return None
except Exception as e:
print(f"❌ Error inserting user: {e}")
return None
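    def insert_users_bulk(self, users):
        """Bulk-insert helper.

        Hedged addition (not part of the original example): a sketch using
        cursor.executemany() so many rows share a single commit. `users` is
        assumed to be a list of (username, email, password_hash) tuples.
        """
        try:
            self.cursor.executemany('''
                INSERT INTO users (username, email, password_hash)
                VALUES (?, ?, ?)
            ''', users)
            self.connection.commit()
            print(f"✅ Inserted {self.cursor.rowcount} users in bulk")
            return True
        except Exception as e:
            self.connection.rollback()
            print(f"❌ Error bulk inserting users: {e}")
            return False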
def get_user(self, user_id):
"""Get user by ID"""
try:
self.cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
user = self.cursor.fetchone()
if user:
print(f"✅ Found user: {dict(user)}")
return dict(user)
else:
print(f"❌ User with ID {user_id} not found")
return None
except Exception as e:
print(f"❌ Error getting user: {e}")
return None
def update_user(self, user_id, **kwargs):
"""Update user information"""
if not kwargs:
print("❌ No fields to update")
return False
try:
# Build SET clause dynamically
set_clause = ", ".join([f"{key} = ?" for key in kwargs.keys()])
values = list(kwargs.values()) + [user_id]
# Add updated_at timestamp
set_clause += ", updated_at = CURRENT_TIMESTAMP"
query = f"UPDATE users SET {set_clause} WHERE id = ?"
self.cursor.execute(query, values)
if self.cursor.rowcount > 0:
self.connection.commit()
print(f"✅ User {user_id} updated successfully")
return True
else:
print(f"❌ No changes made to user {user_id}")
return False
except Exception as e:
print(f"❌ Error updating user: {e}")
return False
def delete_user(self, user_id):
"""Delete a user (cascade deletes related posts)"""
try:
self.cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
if self.cursor.rowcount > 0:
self.connection.commit()
print(f"✅ User {user_id} deleted successfully")
return True
else:
print(f"❌ User {user_id} not found")
return False
except Exception as e:
print(f"❌ Error deleting user: {e}")
return False
def list_users(self, active_only=True, limit=None):
"""List users with optional filters"""
try:
query = "SELECT * FROM users"
params = []
if active_only:
query += " WHERE is_active = 1"
query += " ORDER BY created_at DESC"
if limit:
query += " LIMIT ?"
params.append(limit)
self.cursor.execute(query, params)
users = self.cursor.fetchall()
print(f"✅ Found {len(users)} users:")
for user in users:
user_dict = dict(user)
print(f" ID: {user_dict['id']}, Username: {user_dict['username']}, Email: {user_dict['email']}")
return [dict(user) for user in users]
except Exception as e:
print(f"❌ Error listing users: {e}")
return []
def insert_post(self, user_id, title, content=None, category=None, tags=None):
"""Insert a new post"""
try:
# Convert tags list to JSON string
tags_json = json.dumps(tags) if tags else None
self.cursor.execute('''
INSERT INTO posts (user_id, title, content, category, tags)
VALUES (?, ?, ?, ?, ?)
''', (user_id, title, content, category, tags_json))
post_id = self.cursor.lastrowid
self.connection.commit()
print(f"✅ Post '{title}' created with ID: {post_id}")
return post_id
except Exception as e:
print(f"❌ Error inserting post: {e}")
return None
def get_user_posts(self, user_id, published_only=False):
"""Get all posts by a user"""
try:
query = '''
SELECT p.*, u.username as author_username
FROM posts p
JOIN users u ON p.user_id = u.id
WHERE p.user_id = ?
'''
params = [user_id]
if published_only:
query += " AND p.is_published = 1"
query += " ORDER BY p.created_at DESC"
self.cursor.execute(query, params)
posts = self.cursor.fetchall()
print(f"✅ Found {len(posts)} posts by user {user_id}:")
for post in posts:
post_dict = dict(post)
# Parse tags from JSON
if post_dict['tags']:
                    try:
                        post_dict['tags'] = json.loads(post_dict['tags'])
                    except (json.JSONDecodeError, TypeError):
                        post_dict['tags'] = []
print(f" ID: {post_dict['id']}, Title: {post_dict['title']}")
return [dict(post) for post in posts]
except Exception as e:
print(f"❌ Error getting user posts: {e}")
return []
def search_posts(self, keyword, category=None, limit=None):
"""Search posts by keyword and/or category"""
try:
query = '''
SELECT p.*, u.username as author_username
FROM posts p
JOIN users u ON p.user_id = u.id
WHERE (p.title LIKE ? OR p.content LIKE ?)
'''
params = [f'%{keyword}%', f'%{keyword}%']
if category:
query += " AND p.category = ?"
params.append(category)
query += " ORDER BY p.created_at DESC"
if limit:
query += " LIMIT ?"
params.append(limit)
self.cursor.execute(query, params)
posts = self.cursor.fetchall()
print(f"✅ Found {len(posts)} posts matching '{keyword}':")
for post in posts:
post_dict = dict(post)
print(f" ID: {post_dict['id']}, Title: {post_dict['title']}, Author: {post_dict['author_username']}")
return [dict(post) for post in posts]
except Exception as e:
print(f"❌ Error searching posts: {e}")
return []
def get_database_info(self):
"""Get database information and statistics"""
try:
info = {}
# Get database size
size_bytes = os.path.getsize(self.db_path)
info['size_bytes'] = size_bytes
info['size_mb'] = round(size_bytes / (1024 * 1024), 2)
# Get table info
self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in self.cursor.fetchall()]
info['tables'] = tables
# Get record counts
info['record_counts'] = {}
for table in tables:
self.cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = self.cursor.fetchone()[0]
info['record_counts'][table] = count
print("📊 Database Information:")
print(f" Size: {info['size_mb']} MB")
print(f" Tables: {', '.join(info['tables'])}")
print(f" Record counts: {info['record_counts']}")
return info
except Exception as e:
print(f"❌ Error getting database info: {e}")
return None
def backup_database(self, backup_path=None):
"""Create a backup of the database"""
try:
if not backup_path:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = f"backup_{timestamp}.db"
            # Close the current connection so the file copy sees a consistent state
            self.disconnect()
            # Copy database file
            import shutil
            shutil.copy2(self.db_path, backup_path)
            print(f"✅ Database backed up to: {backup_path}")
            # Reconnect (the old connection was closed and cannot be reused)
            self.connect()
            return backup_path
except Exception as e:
print(f"❌ Error backing up database: {e}")
return None
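    def backup_database_online(self, backup_path):
        """Alternative backup sketch (hedged addition, not part of the original
        example): uses the sqlite3.Connection.backup() API (Python 3.7+), which
        copies a live database safely without closing the current connection."""
        try:
            backup_conn = sqlite3.connect(backup_path)
            try:
                self.connection.backup(backup_conn)
            finally:
                backup_conn.close()
            print(f"✅ Online backup written to: {backup_path}")
            return backup_path
        except Exception as e:
            print(f"❌ Error creating online backup: {e}")
            return None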
# Example usage
def main():
# Initialize database manager
db = SQLiteBasicOperations('blog.db')
# Connect to database
if not db.connect():
return
try:
# Create tables
print("\n📝 Creating tables...")
db.create_users_table()
db.create_posts_table()
# Insert sample users
print("\n👤 Inserting sample users...")
user1_id = db.insert_user(
username='johndoe',
        email='johndoe@example.com',
password_hash='hashed_password_1',
first_name='John',
last_name='Doe',
date_of_birth='1990-01-15'
)
user2_id = db.insert_user(
username='janedoe',
        email='janedoe@example.com',
password_hash='hashed_password_2',
first_name='Jane',
last_name='Doe',
date_of_birth='1992-05-20'
)
# Insert sample posts
print("\n📝 Inserting sample posts...")
if user1_id:
db.insert_post(
user_id=user1_id,
title='My First Blog Post',
content='This is the content of my first blog post.',
category='Technology',
tags=['python', 'database', 'sqlite']
)
db.insert_post(
user_id=user1_id,
title='SQLite Best Practices',
content='Here are some best practices for using SQLite...',
category='Database',
tags=['sqlite', 'best-practices', 'tutorial']
)
if user2_id:
db.insert_post(
user_id=user2_id,
title='Getting Started with Python',
content='Python is a great programming language for beginners...',
category='Programming',
tags=['python', 'beginner', 'tutorial']
)
# List all users
print("\n👥 Listing all users:")
users = db.list_users()
# Get user details
if user1_id:
print("\n👤 Getting user details:")
user = db.get_user(user1_id)
# Get user posts
if user1_id:
print("\n📝 Getting user posts:")
posts = db.get_user_posts(user1_id)
# Search posts
print("\n🔍 Searching posts:")
search_results = db.search_posts('python')
# Get database info
print("\n📊 Database information:")
db_info = db.get_database_info()
# Create backup
print("\n💾 Creating backup:")
backup_path = db.backup_database()
# Update user
if user1_id:
print("\n✏️ Updating user:")
db.update_user(user1_id, first_name='Johnathan', last_name='Doe-Smith')
# Clean up
print("\n🧹 Cleaning up...")
if user1_id:
db.delete_user(user1_id)
if user2_id:
db.delete_user(user2_id)
finally:
# Disconnect from database
db.disconnect()
if __name__ == "__main__":
main()
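As a brief aside (a hedged addition, not part of the original listing): the sqlite3 connection object can also be used as a context manager, which commits on success and rolls back if the block raises (it does not close the connection). The function name and sample row below are illustrative only.

def transactional_insert_demo(db_path='blog.db'):
    # Minimal sketch: wrap a write in a transaction via the connection's
    # context manager, then close the connection explicitly.
    conn = sqlite3.connect(db_path)
    try:
        with conn:
            conn.execute(
                "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
                ('demo', 'demo@example.com', 'hashed_password'),
            )
    finally:
        conn.close()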
💻 Advanced SQLite Queries sql
🟡 intermediate
⭐⭐⭐
Complex queries, joins, aggregates, window functions, and optimization
⏱️ 45 min
🏷️ sqlite, advanced sql, database, queries
Prerequisites:
SQLite 3.x, Intermediate SQL knowledge, Understanding of window functions
-- Advanced SQLite Queries
-- SQL - advanced_queries.sql
-- Sample data setup (run this first)
-- DROP TABLE IF EXISTS employees;
-- DROP TABLE IF EXISTS departments;
-- DROP TABLE IF EXISTS salaries;
-- DROP TABLE IF EXISTS projects;
-- DROP TABLE IF EXISTS project_assignments;
-- Departments table
CREATE TABLE departments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) NOT NULL UNIQUE,
manager_id INTEGER,
budget DECIMAL(12, 2),
location VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Employees table
CREATE TABLE employees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(50) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
department_id INTEGER,
position VARCHAR(100),
salary DECIMAL(10, 2),
hire_date DATE NOT NULL,
birth_date DATE,
phone VARCHAR(20),
address TEXT,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (department_id) REFERENCES departments (id)
);
-- Salaries table (for tracking salary history)
CREATE TABLE salaries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
employee_id INTEGER NOT NULL,
salary DECIMAL(10, 2) NOT NULL,
from_date DATE NOT NULL,
to_date DATE,
FOREIGN KEY (employee_id) REFERENCES employees (id)
);
-- Projects table
CREATE TABLE projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(200) NOT NULL,
description TEXT,
start_date DATE,
end_date DATE,
budget DECIMAL(12, 2),
status VARCHAR(20) DEFAULT 'planning',
department_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (department_id) REFERENCES departments (id)
);
-- Project assignments table
CREATE TABLE project_assignments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
employee_id INTEGER NOT NULL,
project_id INTEGER NOT NULL,
role VARCHAR(50),
assignment_date DATE DEFAULT CURRENT_DATE,
hours_allocated INTEGER,
hours_worked INTEGER DEFAULT 0,
FOREIGN KEY (employee_id) REFERENCES employees (id),
FOREIGN KEY (project_id) REFERENCES projects (id),
UNIQUE(employee_id, project_id)
);
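-- Illustrative sample rows (assumed values, not from the original) so the
-- queries below return data; adjust freely.
INSERT INTO departments (name, budget, location) VALUES
    ('Engineering', 500000.00, 'Paris'),
    ('Marketing', 200000.00, 'Lyon');

INSERT INTO employees (first_name, last_name, email, department_id, position, salary, hire_date, birth_date) VALUES
    ('Alice', 'Martin', 'alice.martin@example.com', 1, 'Software Engineer', 65000.00, '2021-03-01', '1990-06-12'),
    ('Bob', 'Durand', 'bob.durand@example.com', 1, 'Senior Engineer', 85000.00, '2018-07-15', '1985-02-03'),
    ('Chloe', 'Bernard', 'chloe.bernard@example.com', 2, 'Marketing Manager', 70000.00, '2019-01-10', '1988-11-25');

INSERT INTO salaries (employee_id, salary, from_date) VALUES
    (1, 65000.00, '2021-03-01'),
    (2, 85000.00, '2023-01-01'),
    (3, 70000.00, '2022-06-01');

INSERT INTO projects (name, start_date, end_date, budget, status, department_id) VALUES
    ('Website Redesign', '2024-01-01', '2024-06-30', 120000.00, 'in_progress', 1);

INSERT INTO project_assignments (employee_id, project_id, role, hours_allocated, hours_worked) VALUES
    (1, 1, 'Developer', 400, 180),
    (2, 1, 'Lead', 300, 310);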
-- 1. Complex JOIN Queries
-- Get all employees with their department information
SELECT
e.id,
e.first_name,
e.last_name,
e.email,
e.position,
e.salary,
d.name as department_name,
d.location as department_location,
m.first_name || ' ' || m.last_name as manager_name
FROM employees e
LEFT JOIN departments d ON e.department_id = d.id
LEFT JOIN employees m ON d.manager_id = m.id
WHERE e.is_active = 1
ORDER BY d.name, e.last_name, e.first_name;
-- Get employees with their current salaries
SELECT
e.id,
e.first_name,
e.last_name,
e.position,
s.salary,
s.from_date as salary_effective_date
FROM employees e
JOIN salaries s ON e.id = s.employee_id
WHERE s.to_date IS NULL
ORDER BY s.salary DESC;
-- Get department budget utilization
SELECT
d.name as department_name,
d.budget as allocated_budget,
COUNT(e.id) as employee_count,
SUM(e.salary) as total_salaries,
(d.budget - SUM(e.salary)) as remaining_budget,
ROUND((SUM(e.salary) / d.budget) * 100, 2) as budget_utilization_percent
FROM departments d
LEFT JOIN employees e ON d.id = e.department_id AND e.is_active = 1
GROUP BY d.id, d.name, d.budget
ORDER BY budget_utilization_percent DESC;
-- 2. Window Functions
-- Employee salary ranking within departments
SELECT
first_name,
last_name,
department_id,
salary,
RANK() OVER (PARTITION BY department_id ORDER BY salary DESC) as dept_salary_rank,
LAG(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) as prev_salary,
salary - LAG(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) as salary_diff
FROM employees e
WHERE e.is_active = 1;
-- Running totals for project work hours
SELECT
p.name as project_name,
pa.employee_id,
e.first_name || ' ' || e.last_name as employee_name,
pa.hours_worked,
SUM(pa.hours_worked) OVER (
PARTITION BY pa.project_id
ORDER BY pa.assignment_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as cumulative_hours
FROM project_assignments pa
JOIN employees e ON pa.employee_id = e.id
JOIN projects p ON pa.project_id = p.id
WHERE p.status = 'in_progress'
ORDER BY p.name, cumulative_hours;
-- Moving averages for salary changes
SELECT
e.first_name,
e.last_name,
s.salary,
s.from_date,
AVG(s.salary) OVER (
PARTITION BY e.id
ORDER BY s.from_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg_salary_3_months,
s.salary - AVG(s.salary) OVER (
PARTITION BY e.id
ORDER BY s.from_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as salary_vs_avg_diff
FROM salaries s
JOIN employees e ON s.employee_id = e.id;
-- 3. Subqueries and CTEs
-- Find departments with above average salaries
WITH dept_avg_salaries AS (
SELECT
d.id,
d.name,
AVG(e.salary) as avg_salary
FROM departments d
JOIN employees e ON d.id = e.department_id
WHERE e.is_active = 1
GROUP BY d.id, d.name
),
overall_avg_salary AS (
SELECT AVG(salary) as overall_avg
FROM employees
WHERE is_active = 1
)
SELECT
das.name,
das.avg_salary,
oas.overall_avg,
ROUND((das.avg_salary / oas.overall_avg - 1) * 100, 2) as percent_above_avg
FROM dept_avg_salaries das
CROSS JOIN overall_avg_salary oas
WHERE das.avg_salary > oas.overall_avg
ORDER BY percent_above_avg DESC;
-- Find employees who have completed all assigned projects
WITH employee_projects AS (
SELECT
e.id as employee_id,
e.first_name || ' ' || e.last_name as employee_name,
p.id as project_id,
p.name as project_name,
p.end_date,
pa.hours_allocated,
pa.hours_worked
FROM employees e
JOIN project_assignments pa ON e.id = pa.employee_id
JOIN projects p ON pa.project_id = p.id
),
completed_projects AS (
SELECT
employee_id,
employee_name,
COUNT(*) as total_projects,
SUM(CASE WHEN hours_worked >= hours_allocated THEN 1 ELSE 0 END) as completed_projects
FROM employee_projects
GROUP BY employee_id, employee_name
)
SELECT
employee_name,
total_projects,
completed_projects,
ROUND((completed_projects * 100.0 / total_projects), 2) as completion_rate
FROM completed_projects
WHERE completed_projects = total_projects
ORDER BY completion_rate DESC;
-- 4. Aggregation and Grouping
-- Department performance metrics
SELECT
d.name as department,
COUNT(e.id) as total_employees,
COUNT(CASE WHEN e.hire_date >= date('now', '-1 year') THEN 1 END) as new_hires,
COUNT(CASE WHEN e.is_active = 0 THEN 1 END) as inactive_employees,
AVG(e.salary) as avg_salary,
MAX(e.salary) as max_salary,
MIN(e.salary) as min_salary,
ROUND(AVG(JULIANDAY('now') - JULIANDAY(e.hire_date))) as avg_tenure_days
FROM departments d
LEFT JOIN employees e ON d.id = e.department_id
GROUP BY d.id, d.name
ORDER BY avg_salary DESC;
-- Project timeline analysis
SELECT
p.name as project_name,
p.status,
p.start_date,
p.end_date,
julianday(p.end_date) - julianday(p.start_date) as planned_duration_days,
COUNT(pa.employee_id) as team_size,
SUM(pa.hours_allocated) as total_hours_allocated,
SUM(pa.hours_worked) as total_hours_worked,
CASE
WHEN SUM(pa.hours_worked) >= SUM(pa.hours_allocated) THEN 'On Track'
WHEN julianday('now') > julianday(p.end_date) THEN 'Overdue'
ELSE 'In Progress'
END as status_indicator
FROM projects p
LEFT JOIN project_assignments pa ON p.id = pa.project_id
GROUP BY p.id, p.name, p.status, p.start_date, p.end_date
ORDER BY status_indicator, p.end_date;
-- 5. Recursive Queries (Hierarchical Data)
-- Department hierarchy (this example treats departments.manager_id as a parent-department reference)
WITH RECURSIVE dept_hierarchy AS (
-- Base case: top-level departments (no manager)
SELECT
id,
name,
manager_id,
0 as level,
name as hierarchy_path
FROM departments
WHERE manager_id IS NULL
UNION ALL
-- Recursive case: departments with managers
SELECT
d.id,
d.name,
d.manager_id,
dh.level + 1,
dh.hierarchy_path || ' > ' || d.name
FROM departments d
JOIN dept_hierarchy dh ON d.manager_id = dh.id
)
SELECT
level,
name,
hierarchy_path,
manager_id
FROM dept_hierarchy
ORDER BY level, hierarchy_path;
-- 6. Performance Optimization Queries
-- Create indexes for better performance
CREATE INDEX IF NOT EXISTS idx_employees_department_id ON employees(department_id);
CREATE INDEX IF NOT EXISTS idx_employees_hire_date ON employees(hire_date);
CREATE INDEX IF NOT EXISTS idx_employees_active ON employees(is_active);
CREATE INDEX IF NOT EXISTS idx_salaries_employee_id ON salaries(employee_id);
CREATE INDEX IF NOT EXISTS idx_salaries_from_date ON salaries(from_date);
CREATE INDEX IF NOT EXISTS idx_project_assignments_project_id ON project_assignments(project_id);
-- Analyze query performance
EXPLAIN QUERY PLAN
SELECT e.first_name, e.last_name, d.name as department
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > 50000
ORDER BY e.salary DESC;
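-- Additional sketch (not in the original): refresh the planner's statistics so
-- the optimizer can choose effectively among the indexes created above.
ANALYZE;
PRAGMA optimize;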
-- Note: SQLite has no built-in slow-query log, so there is no system table with
-- per-query timing data. Time statements at the application level (for example
-- ".timer on" in the sqlite3 shell) and inspect the schema's indexes instead:
SELECT name, tbl_name, sql
FROM sqlite_master
WHERE type = 'index'
  AND sql IS NOT NULL
ORDER BY tbl_name, name;
-- 7. Date and Time Functions
-- Employee age analysis
SELECT
first_name,
last_name,
birth_date,
ROUND((julianday('now') - julianday(birth_date)) / 365.25, 1) as age,
ROUND((julianday('now') - julianday(hire_date)) / 365.25, 1) as years_of_service,
ROUND(
(julianday('now') - julianday(hire_date)) /
(julianday('now') - julianday(birth_date)) * 100,
1
) as service_life_percentage
FROM employees
WHERE is_active = 1
ORDER BY years_of_service DESC;
-- Monthly hiring trends
SELECT
strftime('%Y-%m', hire_date) as hire_month,
COUNT(*) as hires_count,
AVG(salary) as avg_salary_hired
FROM employees
GROUP BY strftime('%Y-%m', hire_date)
ORDER BY hire_month DESC
LIMIT 24;
-- 8. Conditional Logic and Case Statements
-- Employee performance categorization
SELECT
first_name,
last_name,
salary,
CASE
WHEN salary < 30000 THEN 'Entry Level'
WHEN salary < 60000 THEN 'Junior'
WHEN salary < 100000 THEN 'Mid Level'
WHEN salary < 150000 THEN 'Senior'
ELSE 'Executive'
END as salary_level,
CASE
WHEN julianday('now') - julianday(hire_date) < 365 THEN 'New Hire'
WHEN julianday('now') - julianday(hire_date) < 1825 THEN 'Experienced'
ELSE 'Veteran'
END as experience_level,
CASE
WHEN julianday('now') - julianday(hire_date) < 365 AND salary < 50000 THEN 'Underpaid New Hire'
WHEN julianday('now') - julianday(hire_date) >= 1825 AND salary < 60000 THEN 'Underpaid Veteran'
WHEN salary > 100000 AND julianday('now') - julianday(hire_date) < 730 THEN 'Rapid Advancement'
ELSE 'Appropriate Compensation'
END as compensation_analysis
FROM employees
WHERE is_active = 1
ORDER BY salary DESC;
-- 9. String Functions and Pattern Matching
-- Search for employees with specific patterns
SELECT
first_name,
last_name,
email,
phone,
CASE
WHEN email LIKE '%@company.com' THEN 'Internal Email'
WHEN email LIKE '%@gmail.com' THEN 'Gmail User'
WHEN email LIKE '%@yahoo.com' THEN 'Yahoo User'
ELSE 'Other Email Provider'
END as email_category,
CASE
WHEN phone LIKE '%555-%' THEN 'Local Number'
WHEN phone LIKE '1-%' THEN 'US Number'
WHEN phone LIKE '+44-%' THEN 'UK Number'
ELSE 'International Number'
END as phone_category,
LENGTH(first_name) + LENGTH(last_name) as name_length
FROM employees
WHERE (
first_name LIKE '%a%' AND last_name LIKE '%s%' -- Names containing 'a' and 's'
OR email LIKE '%test%' -- Email containing 'test'
OR phone IS NOT NULL
)
ORDER BY name_length DESC;
-- 10. JSON Operations (SQLite 3.38+)
-- Store additional employee metadata as JSON
ALTER TABLE employees ADD COLUMN metadata TEXT;
-- Update metadata with JSON data
UPDATE employees SET metadata = json_object(
'skills', json_array('Python', 'SQL', 'JavaScript'),
'certifications', json_array('AWS Certified', 'Google Cloud'),
'preferences', json_object('remote_work', true, 'team_size', 'small')
) WHERE id = 1;
-- Query JSON data
SELECT
first_name,
last_name,
json_extract(metadata, '$.skills') as skills,
json_array_length(metadata, '$.certifications') as certification_count,
json_extract(metadata, '$.preferences.remote_work') as remote_work_preference
FROM employees
WHERE json_extract(metadata, '$.certifications') IS NOT NULL;
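-- Additional sketch (not in the original): expand the JSON skills array into
-- one row per skill using the json_each() table-valued function.
SELECT
    e.first_name,
    e.last_name,
    skill.value AS skill
FROM employees e, json_each(e.metadata, '$.skills') AS skill
WHERE e.metadata IS NOT NULL;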
-- 11. Full-Text Search
-- Create FTS virtual table for employee search
CREATE VIRTUAL TABLE employees_fts USING fts5(
first_name,
last_name,
email,
position,
content='employees',
content_rowid='id'
);
-- Enable triggers for FTS
CREATE TRIGGER employees_fts_insert AFTER INSERT ON employees BEGIN
INSERT INTO employees_fts(rowid, first_name, last_name, email, position)
VALUES (new.id, new.first_name, new.last_name, new.email, new.position);
END;
CREATE TRIGGER employees_fts_delete AFTER DELETE ON employees BEGIN
INSERT INTO employees_fts(employees_fts, rowid, first_name, last_name, email, position)
VALUES ('delete', old.id, old.first_name, old.last_name, old.email, old.position);
END;
CREATE TRIGGER employees_fts_update AFTER UPDATE ON employees BEGIN
INSERT INTO employees_fts(employees_fts, rowid, first_name, last_name, email, position)
VALUES ('delete', old.id, old.first_name, old.last_name, old.email, old.position);
INSERT INTO employees_fts(rowid, first_name, last_name, email, position)
VALUES (new.id, new.first_name, new.last_name, new.email, new.position);
END;
-- Full-text search
SELECT
e.first_name,
e.last_name,
e.email,
e.position,
rank
FROM employees_fts efts
JOIN employees e ON efts.rowid = e.id
WHERE employees_fts MATCH 'python OR "software engineer"'
ORDER BY rank  -- FTS5 rank (bm25) is negative; ascending order puts best matches first
LIMIT 10;
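-- Additional note (not in the original): for rows inserted before the triggers
-- existed, the external-content index can be (re)built in one statement.
INSERT INTO employees_fts(employees_fts) VALUES ('rebuild');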
-- 12. Data Analysis and Reporting
-- Generate quarterly reports
SELECT
strftime('%Y', p.start_date) as year,
((strftime('%m', p.start_date) - 1) / 3) + 1 as quarter,
COUNT(*) as projects_started,
SUM(CASE WHEN p.status = 'completed' THEN 1 ELSE 0 END) as projects_completed,
SUM(p.budget) as total_budget,
AVG(p.budget) as avg_budget,
SUM(p.budget) * 0.1 as estimated_profit
FROM projects p
WHERE p.start_date >= date('now', '-2 years')
GROUP BY strftime('%Y', p.start_date),
((strftime('%m', p.start_date) - 1) / 3) + 1
ORDER BY year DESC, quarter DESC;
💻 SQLite with SQLAlchemy ORM python
🟡 intermediate
⭐⭐⭐⭐
Object-Relational Mapping with SQLAlchemy for Python database operations
⏱️ 40 min
🏷️ sqlite, sqlalchemy, orm, python, database
Prerequisites:
Python 3.6+, Basic SQL knowledge, Object-oriented programming
# SQLite with SQLAlchemy ORM
# Python - sqlalchemy_orm.py
from sqlalchemy import (
    create_engine, Column, Integer, String, Text, Boolean,
    DateTime, Date, Float, ForeignKey, Table, Index, MetaData,
    event, inspect
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, scoped_session, joinedload
from sqlalchemy.sql import func
from sqlalchemy.schema import FetchedValue
from datetime import datetime, date, timedelta
import json
# Create base class for declarative models
Base = declarative_base()
# Association table for many-to-many relationship (posts and tags)
post_tags = Table(
'post_tags',
Base.metadata,
Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)
class User(Base):
"""User model"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False, index=True)
email = Column(String(100), unique=True, nullable=False, index=True)
password_hash = Column(String(255), nullable=False)
first_name = Column(String(50))
last_name = Column(String(50))
bio = Column(Text)
avatar_url = Column(String(255))
birth_date = Column(Date)
location = Column(String(100))
website = Column(String(255))
is_active = Column(Boolean, default=True)
is_verified = Column(Boolean, default=False)
email_verified_at = Column(DateTime)
last_login_at = Column(DateTime)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
comments = relationship("Comment", back_populates="author", cascade="all, delete-orphan")
followers = relationship("Follower", foreign_keys="Follower.following_id", back_populates="following")
following = relationship("Follower", foreign_keys="Follower.follower_id", back_populates="follower")
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"
class Post(Base):
"""Post model"""
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False, index=True)
slug = Column(String(200), unique=True, nullable=False, index=True)
content = Column(Text, nullable=False)
excerpt = Column(Text)
status = Column(String(20), default='draft', index=True) # draft, published, archived
comment_status = Column(String(20), default='open') # open, closed
password = Column(String(255)) # For password-protected posts
featured_image = Column(String(255))
view_count = Column(Integer, default=0)
like_count = Column(Integer, default=0)
comment_count = Column(Integer, default=0)
published_at = Column(DateTime, index=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Foreign keys
author_id = Column(Integer, ForeignKey('users.id'), nullable=False)
category_id = Column(Integer, ForeignKey('categories.id'))
# Relationships
author = relationship("User", back_populates="posts")
category = relationship("Category", back_populates="posts")
comments = relationship("Comment", back_populates="post", cascade="all, delete-orphan")
tags = relationship("Tag", secondary=post_tags, back_populates="posts")
def __repr__(self):
return f"<Post(id={self.id}, title='{self.title}', status='{self.status}')>"
class Category(Base):
"""Category model"""
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False, index=True)
slug = Column(String(50), unique=True, nullable=False, index=True)
description = Column(Text)
parent_id = Column(Integer, ForeignKey('categories.id'))
color = Column(String(7)) # Hex color code
post_count = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Self-referential relationship for hierarchical categories
parent = relationship("Category", remote_side=[id], back_populates="children")
children = relationship("Category", back_populates="parent")
# Relationships
posts = relationship("Post", back_populates="category")
def __repr__(self):
return f"<Category(id={self.id}, name='{self.name}')>"
class Tag(Base):
"""Tag model"""
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False, index=True)
slug = Column(String(50), unique=True, nullable=False, index=True)
description = Column(Text)
color = Column(String(7)) # Hex color code
post_count = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
posts = relationship("Post", secondary=post_tags, back_populates="tags")
def __repr__(self):
return f"<Tag(id={self.id}, name='{self.name}')>"
class Comment(Base):
"""Comment model"""
__tablename__ = 'comments'
id = Column(Integer, primary_key=True)
content = Column(Text, nullable=False)
author_name = Column(String(100)) # For guest comments
author_email = Column(String(100)) # For guest comments
author_ip = Column(String(45)) # For guest comments
is_approved = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Foreign keys
post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)
author_id = Column(Integer, ForeignKey('users.id')) # null for guest comments
parent_id = Column(Integer, ForeignKey('comments.id')) # For nested comments
# Relationships
post = relationship("Post", back_populates="comments")
author = relationship("User", back_populates="comments")
    parent = relationship("Comment", remote_side=[id], back_populates="children")
children = relationship("Comment", back_populates="parent")
def __repr__(self):
return f"<Comment(id={self.id}, post_id={self.post_id}, author_id={self.author_id})>"
class Follower(Base):
"""Follower model for user relationships"""
__tablename__ = 'followers'
id = Column(Integer, primary_key=True)
follower_id = Column(Integer, ForeignKey('users.id'), nullable=False)
following_id = Column(Integer, ForeignKey('users.id'), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
follower = relationship("User", foreign_keys=[follower_id], back_populates="following")
following = relationship("User", foreign_keys=[following_id], back_populates="followers")
# Ensure no duplicate follows
__table_args__ = (Index('idx_follower_following', 'follower_id', 'following_id', unique=True),)
class DatabaseManager:
"""Database manager class for SQLAlchemy operations"""
def __init__(self, db_url='sqlite:///blog.db'):
self.db_url = db_url
self.engine = None
self.Session = None
self.session = None
def connect(self):
"""Create database engine and session"""
try:
self.engine = create_engine(
self.db_url,
echo=False, # Set to True for SQL logging
pool_pre_ping=True
)
# Create session factory
self.Session = sessionmaker(bind=self.engine)
self.session = scoped_session(self.Session)
print(f"✅ Connected to database: {self.db_url}")
return True
except Exception as e:
print(f"❌ Error connecting to database: {e}")
return False
def create_tables(self):
"""Create all tables"""
try:
Base.metadata.create_all(self.engine)
print("✅ Database tables created successfully")
return True
except Exception as e:
print(f"❌ Error creating tables: {e}")
return False
def drop_tables(self):
"""Drop all tables"""
try:
Base.metadata.drop_all(self.engine)
print("✅ Database tables dropped successfully")
return True
except Exception as e:
print(f"❌ Error dropping tables: {e}")
return False
def get_session(self):
"""Get database session"""
if not self.session:
self.connect()
return self.session()
def commit_and_close(self):
"""Commit current session and close"""
if self.session:
try:
self.session.commit()
self.session.close()
print("✅ Session committed and closed")
except Exception as e:
print(f"❌ Error committing session: {e}")
self.session.rollback()
self.session.close()
self.session = None
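# Hedged addition (not part of the original example): SQLite only enforces
# foreign keys when the pragma is enabled on each new connection. With
# SQLAlchemy this is typically wired up with a connect-event listener, e.g.
# event.listen(db_manager.engine, "connect", _enable_sqlite_fks) right after
# the engine is created.
def _enable_sqlite_fks(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()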
class UserService:
"""Service class for user operations"""
def __init__(self, db_manager):
self.db = db_manager
self.session = db_manager.get_session()
def create_user(self, username, email, password_hash, **kwargs):
"""Create a new user"""
try:
user = User(
username=username,
email=email,
password_hash=password_hash,
**kwargs
)
self.session.add(user)
self.session.commit()
self.session.refresh(user)
print(f"✅ User '{username}' created with ID: {user.id}")
return user
except Exception as e:
self.session.rollback()
print(f"❌ Error creating user: {e}")
return None
def get_user_by_id(self, user_id):
"""Get user by ID"""
try:
user = self.session.query(User).filter(User.id == user_id).first()
if user:
print(f"✅ Found user: {user.username}")
else:
print(f"❌ User with ID {user_id} not found")
return user
except Exception as e:
print(f"❌ Error getting user: {e}")
return None
def get_user_by_username(self, username):
"""Get user by username"""
try:
user = self.session.query(User).filter(User.username == username).first()
return user
except Exception as e:
print(f"❌ Error getting user by username: {e}")
return None
def get_user_by_email(self, email):
"""Get user by email"""
try:
user = self.session.query(User).filter(User.email == email).first()
return user
except Exception as e:
print(f"❌ Error getting user by email: {e}")
return None
def update_user(self, user_id, **kwargs):
"""Update user information"""
try:
user = self.get_user_by_id(user_id)
if not user:
return None
for key, value in kwargs.items():
if hasattr(user, key):
setattr(user, key, value)
user.updated_at = datetime.utcnow()
self.session.commit()
print(f"✅ User {user.username} updated successfully")
return user
except Exception as e:
self.session.rollback()
print(f"❌ Error updating user: {e}")
return None
def delete_user(self, user_id):
"""Delete a user"""
try:
user = self.get_user_by_id(user_id)
if not user:
return False
self.session.delete(user)
self.session.commit()
print(f"✅ User {user.username} deleted successfully")
return True
except Exception as e:
self.session.rollback()
print(f"❌ Error deleting user: {e}")
return False
def get_users(self, active_only=True, limit=None, offset=None):
"""Get list of users"""
try:
query = self.session.query(User)
if active_only:
query = query.filter(User.is_active == True)
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
users = query.all()
print(f"✅ Found {len(users)} users")
return users
except Exception as e:
print(f"❌ Error getting users: {e}")
return []
def search_users(self, query, active_only=True):
"""Search users by name or email"""
try:
search_pattern = f"%{query}%"
db_query = self.session.query(User).filter(
(User.first_name.ilike(search_pattern) |
User.last_name.ilike(search_pattern) |
User.email.ilike(search_pattern))
)
if active_only:
db_query = db_query.filter(User.is_active == True)
users = db_query.all()
print(f"✅ Found {len(users)} users matching '{query}'")
return users
except Exception as e:
print(f"❌ Error searching users: {e}")
return []
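# Hedged helper sketch (not in the original): a transactional session scope,
# an alternative to committing inside every service method.
from contextlib import contextmanager

@contextmanager
def session_scope(db_manager):
    """Provide a session that commits on success and rolls back on error."""
    session = db_manager.get_session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()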
class PostService:
"""Service class for post operations"""
def __init__(self, db_manager):
self.db = db_manager
self.session = db_manager.get_session()
def create_post(self, title, content, author_id, **kwargs):
"""Create a new post"""
try:
post = Post(
title=title,
content=content,
author_id=author_id,
**kwargs
)
self.session.add(post)
self.session.commit()
self.session.refresh(post)
print(f"✅ Post '{title}' created with ID: {post.id}")
return post
except Exception as e:
self.session.rollback()
print(f"❌ Error creating post: {e}")
return None
def get_post_by_id(self, post_id):
"""Get post by ID"""
try:
post = self.session.query(Post).options(
joinedload(Post.author),
joinedload(Post.category),
joinedload(Post.tags),
joinedload(Post.comments)
).filter(Post.id == post_id).first()
if post:
print(f"✅ Found post: {post.title}")
else:
print(f"❌ Post with ID {post_id} not found")
return post
except Exception as e:
print(f"❌ Error getting post: {e}")
return None
def get_posts(self, status=None, category_id=None, author_id=None, limit=None, offset=None):
"""Get posts with filters"""
try:
query = self.session.query(Post).options(
joinedload(Post.author),
joinedload(Post.category),
joinedload(Post.tags),
joinedload(Post.comments)
)
if status:
query = query.filter(Post.status == status)
if category_id:
query = query.filter(Post.category_id == category_id)
if author_id:
query = query.filter(Post.author_id == author_id)
query = query.order_by(Post.created_at.desc())
if limit:
query = query.limit(limit)
if offset:
query = query.offset(offset)
posts = query.all()
print(f"✅ Found {len(posts)} posts")
return posts
except Exception as e:
print(f"❌ Error getting posts: {e}")
return []
def update_post(self, post_id, **kwargs):
"""Update post"""
try:
post = self.get_post_by_id(post_id)
if not post:
return None
for key, value in kwargs.items():
if hasattr(post, key):
setattr(post, key, value)
post.updated_at = datetime.utcnow()
self.session.commit()
print(f"✅ Post '{post.title}' updated successfully")
return post
except Exception as e:
self.session.rollback()
print(f"❌ Error updating post: {e}")
return None
def delete_post(self, post_id):
"""Delete a post"""
try:
post = self.get_post_by_id(post_id)
if not post:
return False
self.session.delete(post)
self.session.commit()
print(f"✅ Post '{post.title}' deleted successfully")
return True
except Exception as e:
self.session.rollback()
print(f"❌ Error deleting post: {e}")
return False
def get_popular_posts(self, limit=10):
"""Get popular posts based on view count and comment count"""
try:
posts = self.session.query(Post).options(
joinedload(Post.author),
joinedload(Post.category),
joinedload(Post.tags)
).filter(
Post.status == 'published'
).order_by(
(Post.view_count + Post.like_count + Post.comment_count).desc()
            ).limit(limit).all()
print(f"✅ Found {len(posts)} popular posts")
return posts
except Exception as e:
print(f"❌ Error getting popular posts: {e}")
return []
def search_posts(self, query, limit=None):
"""Search posts by title, content, or tags"""
try:
search_pattern = f"%{query}%"
db_query = self.session.query(Post).options(
joinedload(Post.author),
joinedload(Post.category),
joinedload(Post.tags)
).filter(
(Post.title.ilike(search_pattern) |
Post.content.ilike(search_pattern))
)
# Also search in tags
tag_query = self.session.query(Post).options(
joinedload(Post.author),
joinedload(Post.category),
joinedload(Post.tags)
).join(Post.tags).filter(
Tag.name.ilike(search_pattern)
)
# Combine results
posts = db_query.union(tag_query).distinct().order_by(Post.created_at.desc())
if limit:
posts = posts.limit(limit)
print(f"✅ Found {len(posts.all())} posts matching '{query}'")
return posts.all()
except Exception as e:
print(f"❌ Error searching posts: {e}")
return []
class AnalyticsService:
"""Service class for database analytics"""
def __init__(self, db_manager):
self.db = db_manager
self.session = db_manager.get_session()
def get_user_statistics(self):
"""Get user statistics"""
try:
total_users = self.session.query(User).count()
active_users = self.session.query(User).filter(User.is_active == True).count()
new_users_month = self.session.query(User).filter(
User.created_at >= datetime.utcnow().replace(day=1)
).count()
# User registration by month for the last 6 months
user_registrations = self.session.query(
func.strftime('%Y-%m', User.created_at).label('month'),
func.count(User.id).label('count')
).filter(
                User.created_at >= datetime.utcnow() - timedelta(days=180)  # roughly the last 6 months
).group_by(func.strftime('%Y-%m', User.created_at)).all()
stats = {
'total_users': total_users,
'active_users': active_users,
'new_users_month': new_users_month,
'user_registrations': user_registrations
}
print("📊 User Statistics:")
print(f" Total Users: {total_users}")
print(f" Active Users: {active_users}")
print(f" New Users (This Month): {new_users_month}")
return stats
except Exception as e:
print(f"❌ Error getting user statistics: {e}")
return None
def get_post_statistics(self):
"""Get post statistics"""
try:
total_posts = self.session.query(Post).count()
published_posts = self.session.query(Post).filter(Post.status == 'published').count()
draft_posts = self.session.query(Post).filter(Post.status == 'draft').count()
# Posts by month for the last 6 months
posts_by_month = self.session.query(
func.strftime('%Y-%m', Post.created_at).label('month'),
func.count(Post.id).label('count'),
func.sum(Post.view_count).label('total_views')
).filter(
                Post.created_at >= datetime.utcnow() - timedelta(days=180)  # roughly the last 6 months
).group_by(func.strftime('%Y-%m', Post.created_at)).all()
# Category distribution
category_distribution = self.session.query(
Category.name,
func.count(Post.id).label('post_count')
).join(Post).group_by(Category.name).all()
# Top posts
top_posts = self.session.query(
Post.title,
Post.view_count,
Post.like_count,
Post.comment_count
).filter(Post.status == 'published').order_by(
Post.view_count.desc()
).limit(10).all()
stats = {
'total_posts': total_posts,
'published_posts': published_posts,
'draft_posts': draft_posts,
'posts_by_month': posts_by_month,
'category_distribution': category_distribution,
'top_posts': top_posts
}
print("📊 Post Statistics:")
print(f" Total Posts: {total_posts}")
print(f" Published: {published_posts}")
print(f" Drafts: {draft_posts}")
return stats
except Exception as e:
print(f"❌ Error getting post statistics: {e}")
return None
def get_author_analytics(self, author_id):
"""Get analytics for a specific author"""
try:
author = self.session.query(User).filter(User.id == author_id).first()
if not author:
return None
total_posts = self.session.query(Post).filter(Post.author_id == author_id).count()
published_posts = self.session.query(Post).filter(
Post.author_id == author_id,
Post.status == 'published'
).count()
total_views = self.session.query(func.sum(Post.view_count)).filter(
Post.author_id == author_id
).scalar() or 0
total_comments = self.session.query(func.sum(Post.comment_count)).filter(
Post.author_id == author_id
).scalar() or 0
# Follower count
follower_count = self.session.query(Follower).filter(
Follower.following_id == author_id
).count()
following_count = self.session.query(Follower).filter(
Follower.follower_id == author_id
).count()
analytics = {
'author': author,
'total_posts': total_posts,
'published_posts': published_posts,
'total_views': total_views,
'total_comments': total_comments,
'follower_count': follower_count,
'following_count': following_count,
'engagement_rate': (total_views + total_comments * 2) / total_posts if total_posts > 0 else 0
}
print(f"📊 Analytics for {author.username}:")
print(f" Posts: {total_posts} ({published_posts} published)")
print(f" Total Views: {total_views}")
print(f" Total Comments: {total_comments}")
print(f" Followers: {follower_count}")
print(f" Following: {following_count}")
print(f" Engagement Rate: {analytics['engagement_rate']:.2f}")
return analytics
except Exception as e:
print(f"❌ Error getting author analytics: {e}")
return None
# Example usage
def main():
# Initialize database
db_manager = DatabaseManager()
# Connect and create tables
if db_manager.connect():
db_manager.create_tables()
# Create service instances
user_service = UserService(db_manager)
post_service = PostService(db_manager)
analytics_service = AnalyticsService(db_manager)
try:
# Create sample users
print("\n👤 Creating sample users...")
user1 = user_service.create_user(
username='johndoe',
            email='johndoe@example.com',
password_hash='hashed_password_1',
first_name='John',
last_name='Doe',
bio='Software developer and tech enthusiast.',
location='San Francisco, CA'
)
user2 = user_service.create_user(
username='janedoe',
            email='janedoe@example.com',
password_hash='hashed_password_2',
first_name='Jane',
last_name='Doe',
bio='Data scientist and AI researcher.',
location='New York, NY'
)
# Create sample posts
print("\n📝 Creating sample posts...")
if user1 and user2:
post1 = post_service.create_post(
title='Getting Started with SQLite',
content='SQLite is a lightweight, serverless database engine...',
author_id=user1.id,
status='published',
view_count=150,
like_count=25,
comment_count=8,
published_at=datetime(2024, 1, 15)
)
post2 = post_service.create_post(
title='Python Best Practices',
content='Here are some best practices for Python development...',
author_id=user1.id,
status='published',
view_count=300,
like_count=45,
comment_count=12,
published_at=datetime(2024, 2, 1)
)
post3 = post_service.create_post(
title='Machine Learning Fundamentals',
content='Machine learning is a subset of artificial intelligence...',
author_id=user2.id,
status='published',
view_count=500,
like_count=75,
comment_count=20,
published_at=datetime(2024, 1, 20)
)
# Get analytics
print("\n📊 Generating analytics...")
user_stats = analytics_service.get_user_statistics()
post_stats = analytics_service.get_post_statistics()
# Get author analytics
if user1:
author_analytics = analytics_service.get_author_analytics(user1.id)
# Search examples
print("\n🔍 Search examples...")
search_users = user_service.search_users('john')
search_posts = post_service.search_posts('python')
# Get popular posts
print("\n⭐ Popular posts:")
popular_posts = post_service.get_popular_posts(5)
except Exception as e:
print(f"❌ Error in main execution: {e}")
finally:
# Close database connection
db_manager.commit_and_close()
if __name__ == "__main__":
main()
💻 Database Migration Scripts python
🔴 complex
⭐⭐⭐⭐
Automated migration system and schema versioning
⏱️ 50 min
🏷️ sqlite, migrations, versioning, database
Prerequisites:
Python 3.6+, SQLite knowledge, Database concepts
# SQLite Migration System
# Python - migration_system.py
import os
import sqlite3
import json
import hashlib
from datetime import datetime
from typing import List, Dict, Any, Optional
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class Migration:
"""Represents a single migration"""
def __init__(self, version: str, description: str, up_sql: str, down_sql: str):
self.version = version
self.description = description
self.up_sql = up_sql
self.down_sql = down_sql
self.checksum = self._calculate_checksum(up_sql + down_sql)
def _calculate_checksum(self, content: str) -> str:
"""Calculate checksum for migration content"""
return hashlib.sha256(content.encode()).hexdigest()
def validate(self):
"""Validate migration integrity"""
current_checksum = self._calculate_checksum(self.up_sql + self.down_sql)
return current_checksum == self.checksum
class MigrationSystem:
"""Database migration system"""
def __init__(self, db_path: str = 'database.db', migrations_dir: str = 'migrations'):
self.db_path = db_path
self.migrations_dir = migrations_dir
self.connection = None
self.cursor = None
# Ensure migrations directory exists
os.makedirs(migrations_dir, exist_ok=True)
# Create migration lock table
self.migrations_table = '''
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(50) PRIMARY KEY,
description TEXT,
checksum VARCHAR(64),
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
rollback_sql TEXT
);
'''
def connect(self):
"""Connect to database"""
try:
self.connection = sqlite3.connect(self.db_path)
self.cursor = self.connection.cursor()
# Enable foreign keys
self.cursor.execute("PRAGMA foreign_keys = ON")
# Row factory for dictionaries
self.connection.row_factory = sqlite3.Row
# Create migration lock table
self.cursor.executescript(self.migrations_table)
self.connection.commit()
logger.info(f"Connected to database: {self.db_path}")
return True
except Exception as e:
logger.error(f"Error connecting to database: {e}")
return False
def disconnect(self):
"""Close database connection"""
if self.connection:
self.connection.close()
logger.info("Disconnected from database")
def lock_database(self):
"""Acquire exclusive database lock for migration"""
try:
# SQLite's BEGIN IMMEDIATE starts a transaction and immediately acquires a write lock
self.cursor.execute("BEGIN IMMEDIATE")
logger.debug("Database locked for migration")
return True
except Exception as e:
logger.error(f"Error locking database: {e}")
return False
def unlock_database(self):
"""Release database lock"""
try:
self.connection.commit()
logger.debug("Database unlocked")
return True
except Exception as e:
logger.error(f"Error unlocking database: {e}")
self.connection.rollback()
return False
def create_migration(self, version: str, description: str, up_sql: str, down_sql: str) -> Migration:
"""Create a new migration file"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_{version}.py"
filepath = os.path.join(self.migrations_dir, filename)
        migration_content = f'''"""
Migration: {version}
Description: {description}
Created: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""

UP_SQL = """
{up_sql.strip()}
"""

DOWN_SQL = """
{down_sql.strip()}
"""

# Import the migration base class
from migration_system import Migration

# Create the migration instance picked up by load_migrations()
migration = Migration(
    version="{version}",
    description="{description}",
    up_sql=UP_SQL,
    down_sql=DOWN_SQL
)
'''
with open(filepath, 'w') as f:
f.write(migration_content)
logger.info(f"Created migration file: {filename}")
return Migration(version, description, up_sql, down_sql)
def load_migrations(self) -> List[Migration]:
"""Load all migration files"""
migrations = []
if not os.path.exists(self.migrations_dir):
return migrations
for filename in sorted(os.listdir(self.migrations_dir)):
if filename.endswith('.py'):
filepath = os.path.join(self.migrations_dir, filename)
try:
# Load and execute migration file
namespace = {}
with open(filepath, 'r') as f:
exec(f.read(), namespace)
if 'migration' in namespace:
migration = namespace['migration']
migrations.append(migration)
logger.debug(f"Loaded migration: {migration.version}")
except Exception as e:
logger.error(f"Error loading migration {filename}: {e}")
return migrations
def get_applied_migrations(self) -> List[Dict[str, Any]]:
"""Get list of applied migrations"""
try:
self.cursor.execute("SELECT * FROM schema_migrations ORDER BY applied_at")
return [dict(row) for row in self.cursor.fetchall()]
except Exception as e:
logger.error(f"Error getting applied migrations: {e}")
return []
def get_pending_migrations(self) -> List[Migration]:
"""Get list of pending migrations"""
all_migrations = self.load_migrations()
applied_migrations = {m['version'] for m in self.get_applied_migrations()}
pending = []
for migration in all_migrations:
if migration.version not in applied_migrations:
pending.append(migration)
else:
# Validate applied migration checksum
applied = next(m for m in self.get_applied_migrations() if m['version'] == migration.version)
if not migration.validate() or migration.checksum != applied['checksum']:
logger.warning(f"Migration {migration.version} validation failed, marking as pending")
pending.append(migration)
return pending
def apply_migration(self, migration: Migration) -> bool:
"""Apply a single migration"""
logger.info(f"Applying migration: {migration.version} - {migration.description}")
if not self.lock_database():
return False
try:
# Execute migration SQL
if migration.up_sql.strip():
self.cursor.executescript(migration.up_sql)
# Record migration in schema_migrations table
self.cursor.execute("""
INSERT INTO schema_migrations (version, description, checksum, rollback_sql)
VALUES (?, ?, ?, ?)
""", (
migration.version,
migration.description,
migration.checksum,
migration.down_sql
))
self.connection.commit()
logger.info(f"Successfully applied migration: {migration.version}")
return True
except Exception as e:
logger.error(f"Error applying migration {migration.version}: {e}")
self.connection.rollback()
return False
finally:
self.unlock_database()
def rollback_migration(self, migration: Migration) -> bool:
"""Rollback a migration"""
logger.info(f"Rolling back migration: {migration.version} - {migration.description}")
if not self.lock_database():
return False
try:
# Execute rollback SQL
if migration.down_sql.strip():
self.cursor.executescript(migration.down_sql)
# Remove migration from schema_migrations table
self.cursor.execute(
"DELETE FROM schema_migrations WHERE version = ?",
(migration.version,)
)
self.connection.commit()
logger.info(f"Successfully rolled back migration: {migration.version}")
return True
except Exception as e:
logger.error(f"Error rolling back migration {migration.version}: {e}")
self.connection.rollback()
return False
finally:
self.unlock_database()
def migrate_up(self, target_version: Optional[str] = None) -> bool:
"""Migrate up to target version"""
logger.info("Starting migration up...")
pending_migrations = self.get_pending_migrations()
if target_version:
pending_migrations = [m for m in pending_migrations if m.version <= target_version]
if not pending_migrations:
logger.info("No pending migrations to apply")
return True
success = True
for migration in pending_migrations:
if not self.apply_migration(migration):
success = False
break
if success:
logger.info("Migration up completed successfully")
else:
logger.error("Migration up failed")
return success
def migrate_down(self, target_version: str) -> bool:
"""Migrate down to target version"""
logger.info(f"Starting migration down to version: {target_version}")
applied_migrations = self.get_applied_migrations()
# Get migrations to rollback (in reverse order)
migrations_to_rollback = []
for applied in reversed(applied_migrations):
if applied['version'] > target_version:
# Load migration to get rollback SQL
all_migrations = self.load_migrations()
migration = next((m for m in all_migrations if m.version == applied['version']), None)
if migration:
# Set rollback SQL from stored data
migration.down_sql = applied.get('rollback_sql', '')
migrations_to_rollback.append(migration)
if not migrations_to_rollback:
logger.info("No migrations to rollback")
return True
success = True
for migration in migrations_to_rollback:
if not self.rollback_migration(migration):
success = False
break
if success:
logger.info("Migration down completed successfully")
else:
logger.error("Migration down failed")
return success
def get_migration_status(self) -> Dict[str, Any]:
"""Get current migration status"""
try:
applied_migrations = self.get_applied_migrations()
pending_migrations = self.get_pending_migrations()
all_migrations = self.load_migrations()
return {
'total_migrations': len(all_migrations),
'applied_migrations': len(applied_migrations),
'pending_migrations': len(pending_migrations),
'current_version': applied_migrations[-1]['version'] if applied_migrations else None,
'latest_version': all_migrations[-1].version if all_migrations else None,
'applied_list': [m['version'] for m in applied_migrations],
'pending_list': [m.version for m in pending_migrations]
}
except Exception as e:
logger.error(f"Error getting migration status: {e}")
return {}
class BackupManager:
"""Database backup and restore manager"""
def __init__(self, db_path: str, backup_dir: str = 'backups'):
self.db_path = db_path
self.backup_dir = backup_dir
os.makedirs(backup_dir, exist_ok=True)
def create_backup(self, backup_name: Optional[str] = None) -> str:
"""Create a database backup"""
if not backup_name:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"backup_{timestamp}.db"
backup_path = os.path.join(self.backup_dir, backup_name)
try:
            # Note: callers should close open write connections first so the copied file is consistent
import shutil
shutil.copy2(self.db_path, backup_path)
logger.info(f"Created backup: {backup_path}")
return backup_path
except Exception as e:
logger.error(f"Error creating backup: {e}")
return ""
def restore_backup(self, backup_path: str, create_new: bool = False) -> bool:
"""Restore database from backup"""
if not os.path.exists(backup_path):
logger.error(f"Backup file not found: {backup_path}")
return False
try:
import shutil
if create_new:
backup_path_new = f"{backup_path}.restored"
shutil.copy2(backup_path, backup_path_new)
logger.info(f"Created restored database: {backup_path_new}")
else:
shutil.copy2(backup_path, self.db_path)
logger.info(f"Restored database from: {backup_path}")
return True
except Exception as e:
logger.error(f"Error restoring backup: {e}")
return False
def list_backups(self) -> List[Dict[str, Any]]:
"""List all available backups"""
backups = []
if not os.path.exists(self.backup_dir):
return backups
for filename in os.listdir(self.backup_dir):
if filename.endswith('.db'):
filepath = os.path.join(self.backup_dir, filename)
stat = os.stat(filepath)
backups.append({
'filename': filename,
'filepath': filepath,
'size': stat.st_size,
'created': datetime.fromtimestamp(stat.st_ctime),
'modified': datetime.fromtimestamp(stat.st_mtime)
})
# Sort by creation date (newest first)
backups.sort(key=lambda x: x['created'], reverse=True)
return backups
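# Hedged addition (not part of the original): a minimal command-line entry
# point for the migration system, assuming the class names defined above.
def run_cli():
    import argparse
    parser = argparse.ArgumentParser(description='SQLite migration runner')
    parser.add_argument('command', choices=['status', 'up', 'down'])
    parser.add_argument('--target', help='Target version for up/down')
    parser.add_argument('--db', default='database.db')
    args = parser.parse_args()

    system = MigrationSystem(args.db)
    if not system.connect():
        return
    try:
        if args.command == 'status':
            print(json.dumps(system.get_migration_status(), indent=2, default=str))
        elif args.command == 'up':
            system.migrate_up(args.target)
        elif args.command == 'down':
            if not args.target:
                parser.error('--target is required for down')
            system.migrate_down(args.target)
    finally:
        system.disconnect()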
# Example migration template
class SampleMigrations:
"""Sample migration definitions"""
@staticmethod
def create_initial_schema():
"""Initial schema migration"""
return Migration(
version="001_initial_schema",
description="Create initial database schema",
up_sql='''
-- Create users table
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
first_name VARCHAR(50),
last_name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create posts table
CREATE TABLE posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title VARCHAR(200) NOT NULL,
content TEXT,
author_id INTEGER NOT NULL,
status VARCHAR(20) DEFAULT 'draft',
view_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (author_id) REFERENCES users (id)
);
-- Create categories table
CREATE TABLE categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) UNIQUE NOT NULL,
slug VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create indexes
CREATE INDEX idx_users_username ON users(username);
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_posts_author_id ON posts(author_id);
CREATE INDEX idx_posts_status ON posts(status);
CREATE INDEX idx_posts_created_at ON posts(created_at);
CREATE INDEX idx_categories_name ON categories(name);
''',
down_sql='''
DROP INDEX IF EXISTS idx_categories_name;
DROP INDEX IF EXISTS idx_posts_created_at;
DROP INDEX IF EXISTS idx_posts_status;
DROP INDEX IF EXISTS idx_posts_author_id;
DROP INDEX IF EXISTS idx_users_email;
DROP INDEX IF EXISTS idx_users_username;
DROP TABLE IF EXISTS categories;
DROP TABLE IF EXISTS posts;
DROP TABLE IF EXISTS users;
'''
)
@staticmethod
def add_user_profile():
"""Add user profile fields"""
return Migration(
version="002_add_user_profile",
description="Add profile fields to users table",
up_sql='''
-- Add new columns to users table
ALTER TABLE users ADD COLUMN bio TEXT;
ALTER TABLE users ADD COLUMN avatar_url VARCHAR(255);
ALTER TABLE users ADD COLUMN birth_date DATE;
ALTER TABLE users ADD COLUMN phone VARCHAR(20);
ALTER TABLE users ADD COLUMN address TEXT;
ALTER TABLE users ADD COLUMN website VARCHAR(255);
-- Add is_verified column
ALTER TABLE users ADD COLUMN is_verified BOOLEAN DEFAULT 0;
ALTER TABLE users ADD COLUMN email_verified_at TIMESTAMP;
''',
            down_sql='''
            -- Remove columns from users table
            -- (SQLite 3.35+ supports DROP COLUMN, but not IF EXISTS)
            ALTER TABLE users DROP COLUMN email_verified_at;
            ALTER TABLE users DROP COLUMN is_verified;
            ALTER TABLE users DROP COLUMN website;
            ALTER TABLE users DROP COLUMN address;
            ALTER TABLE users DROP COLUMN phone;
            ALTER TABLE users DROP COLUMN birth_date;
            ALTER TABLE users DROP COLUMN avatar_url;
            ALTER TABLE users DROP COLUMN bio;
            '''
)
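    @staticmethod
    def tighten_posts_status():
        """Hedged example (not in the original): SQLite cannot change a column's
        constraints in place, so the usual pattern is a table rebuild: create a
        new table, copy the rows, drop the old table, rename. Version number and
        column set are illustrative and follow the 001 schema above."""
        return Migration(
            version="003_posts_status_not_null",
            description="Make posts.status NOT NULL via table rebuild",
            up_sql='''
            CREATE TABLE posts_new (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title VARCHAR(200) NOT NULL,
                content TEXT,
                author_id INTEGER NOT NULL,
                status VARCHAR(20) NOT NULL DEFAULT 'draft',
                view_count INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (author_id) REFERENCES users (id)
            );
            INSERT INTO posts_new (id, title, content, author_id, status, view_count, created_at, updated_at)
                SELECT id, title, content, author_id, COALESCE(status, 'draft'),
                       view_count, created_at, updated_at
                FROM posts;
            DROP TABLE posts;
            ALTER TABLE posts_new RENAME TO posts;
            ''',
            down_sql='''
            -- Rebuild again without the NOT NULL constraint; data is preserved.
            CREATE TABLE posts_old (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title VARCHAR(200) NOT NULL,
                content TEXT,
                author_id INTEGER NOT NULL,
                status VARCHAR(20) DEFAULT 'draft',
                view_count INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (author_id) REFERENCES users (id)
            );
            INSERT INTO posts_old SELECT * FROM posts;
            DROP TABLE posts;
            ALTER TABLE posts_old RENAME TO posts;
            '''
        )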
# Example usage
def main():
# Initialize migration system
migration_system = MigrationSystem('blog.db', 'migrations')
# Connect to database
if migration_system.connect():
try:
# Create initial migration if it doesn't exist
initial_migration = SampleMigrations.create_initial_schema()
all_migrations = migration_system.load_migrations()
if not all_migrations:
initial_migration = migration_system.create_migration(
version="001_initial_schema",
description="Create initial database schema",
up_sql='''
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
''',
down_sql='''
DROP TABLE IF EXISTS users;
'''
)
logger.info("Created initial migration file")
# Get migration status
status = migration_system.get_migration_status()
print("\n📊 Migration Status:")
print(f" Total Migrations: {status['total_migrations']}")
print(f" Applied: {status['applied_migrations']}")
print(f" Pending: {status['pending_migrations']}")
print(f" Current Version: {status['current_version']}")
print(f" Latest Version: {status['latest_version']}")
# Migrate up to latest
print("\n🔄 Running migrations...")
if migration_system.migrate_up():
print("✅ All migrations applied successfully")
else:
print("❌ Migration failed")
# Create backup after migration
backup_manager = BackupManager('blog.db')
print("\n💾 Creating backup...")
backup_path = backup_manager.create_backup()
# List available backups
print("\n📋 Available Backups:")
backups = backup_manager.list_backups()
for backup in backups:
print(f" {backup['filename']} ({backup['size']} bytes, created {backup['created']})")
except Exception as e:
logger.error(f"Error in migration system: {e}")
finally:
# Disconnect from database
migration_system.disconnect()
if __name__ == "__main__":
main()