🎯 Exemples recommandés
Balanced sample collections from various categories for you to explore
Exemples de Base de Données Web Python
Exemples de base de données Web Python incluant la connexion SQLite, les requêtes SQL et la gestion des transactions
💻 Connexion SQLite python
🟢 simple
⭐⭐
Se connecter à la base de données SQLite, créer des tables et gérer les connexions de base de données avec une gestion appropriée des erreurs
⏱️ 20 min
🏷️ python, web, database, sqlite
Prerequisites:
Basic Python, SQL basics
# Web Python SQLite Connection Examples
# Connecting to SQLite database and managing database connections
# 1. Basic Connection
import sqlite3
from typing import Optional, List, Tuple, Any
from contextlib import contextmanager
import os
def connect_to_database(db_path: str = ':memory:') -> sqlite3.Connection:
"""
Connect to SQLite database
Args:
db_path: Database file path (:memory: for in-memory)
Returns:
Connection object
"""
connection = sqlite3.connect(db_path)
return connection
def close_connection(connection: sqlite3.Connection) -> None:
"""
Close database connection
Args:
connection: Connection to close
"""
if connection:
connection.close()
def is_connection_open(connection: sqlite3.Connection) -> bool:
"""
Check if connection is open
Args:
connection: Connection to check
Returns:
True if open
"""
try:
connection.execute('SELECT 1')
return True
except:
return False
# 2. Connection with Context Manager
@contextmanager
def database_connection(db_path: str = ':memory:'):
"""
Context manager for database connection
Args:
db_path: Database file path
Yields:
Connection object
"""
conn = None
try:
conn = sqlite3.connect(db_path)
yield conn
finally:
if conn:
conn.close()
def with_connection(func):
"""
Decorator for database connection
Args:
func: Function to wrap
Returns:
Wrapped function
"""
def wrapper(*args, **kwargs):
db_path = kwargs.get('db_path', ':memory:')
with database_connection(db_path) as conn:
kwargs['conn'] = conn
return func(*args, **kwargs)
return wrapper
# 3. Connection Configuration
def configure_connection(conn: sqlite3.Connection, **settings) -> None:
"""
Configure connection settings
Args:
conn: Connection object
**settings: Configuration settings
"""
# Enable foreign keys
if settings.get('foreign_keys', True):
conn.execute('PRAGMA foreign_keys = ON')
# Set row factory
if settings.get('row_factory'):
conn.row_factory = settings['row_factory']
# Set timeout
if 'timeout' in settings:
conn.timeout = settings['timeout']
# Set isolation level
if 'isolation_level' in settings:
conn.isolation_level = settings['isolation_level']
def get_connection_info(conn: sqlite3.Connection) -> dict:
"""
Get connection information
Args:
conn: Connection object
Returns:
Connection info dictionary
"""
cursor = conn.cursor()
cursor.execute('PRAGMA database_list')
databases = cursor.fetchall()
cursor.execute('SELECT sqlite_version()')
version = cursor.fetchone()[0]
return {
'databases': databases,
'sqlite_version': version,
'isolation_level': conn.isolation_level,
'total_changes': conn.total_changes,
}
# 4. Table Operations
def create_table(conn: sqlite3.Connection, table_name: str, schema: str) -> bool:
"""
Create table
Args:
conn: Connection object
table_name: Table name
schema: Table schema (SQL)
Returns:
True if successful
"""
try:
cursor = conn.cursor()
cursor.execute(f'CREATE TABLE IF NOT EXISTS {table_name} ({schema})')
conn.commit()
return True
except Exception as e:
print(f"Error creating table: {e}")
return False
def drop_table(conn: sqlite3.Connection, table_name: str) -> bool:
"""
Drop table
Args:
conn: Connection object
table_name: Table name
Returns:
True if successful
"""
try:
cursor = conn.cursor()
cursor.execute(f'DROP TABLE IF EXISTS {table_name}')
conn.commit()
return True
except Exception as e:
print(f"Error dropping table: {e}")
return False
def table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
"""
Check if table exists
Args:
conn: Connection object
table_name: Table name
Returns:
True if exists
"""
cursor = conn.cursor()
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name=?
""", (table_name,))
return cursor.fetchone() is not None
def get_table_schema(conn: sqlite3.Connection, table_name: str) -> List[Tuple]:
"""
Get table schema
Args:
conn: Connection object
table_name: Table name
Returns:
List of column info
"""
cursor = conn.cursor()
cursor.execute(f'PRAGMA table_info({table_name})')
return cursor.fetchall()
def list_tables(conn: sqlite3.Connection) -> List[str]:
"""
List all tables
Args:
conn: Connection object
Returns:
List of table names
"""
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
return [row[0] for row in cursor.fetchall()]
# 5. Database File Operations
def create_database_file(file_path: str) -> bool:
"""
Create new database file
Args:
file_path: Path to database file
Returns:
True if successful
"""
try:
conn = sqlite3.connect(file_path)
conn.close()
return True
except Exception as e:
print(f"Error creating database: {e}")
return False
def delete_database_file(file_path: str) -> bool:
"""
Delete database file
Args:
file_path: Path to database file
Returns:
True if deleted
"""
try:
if os.path.exists(file_path):
os.remove(file_path)
return True
return False
except Exception as e:
print(f"Error deleting database: {e}")
return False
def database_file_exists(file_path: str) -> bool:
"""
Check if database file exists
Args:
file_path: Path to check
Returns:
True if exists
"""
return os.path.exists(file_path) and file_path.endswith('.db')
def get_database_size(file_path: str) -> int:
"""
Get database file size
Args:
file_path: Path to database
Returns:
Size in bytes
"""
if os.path.exists(file_path):
return os.path.getsize(file_path)
return 0
# 6. Row Factory Configuration
def set_dict_row_factory(conn: sqlite3.Connection) -> None:
"""
Set row factory to return dictionaries
Args:
conn: Connection object
"""
conn.row_factory = sqlite3.Row
def set_tuple_row_factory(conn: sqlite3.Connection) -> None:
"""
Set row factory to return tuples (default)
Args:
conn: Connection object
"""
conn.row_factory = None
# 7. Connection Pooling (Simple Implementation)
class ConnectionPool:
"""Simple connection pool"""
def __init__(self, db_path: str, pool_size: int = 5):
"""
Initialize connection pool
Args:
db_path: Database path
pool_size: Maximum connections
"""
self.db_path = db_path
self.pool_size = pool_size
self.pool: List[sqlite3.Connection] = []
self.used: List[sqlite3.Connection] = []
def get_connection(self) -> sqlite3.Connection:
"""Get connection from pool"""
if self.pool:
conn = self.pool.pop()
self.used.append(conn)
return conn
if len(self.used) < self.pool_size:
conn = sqlite3.connect(self.db_path)
self.used.append(conn)
return conn
raise Exception("Connection pool exhausted")
def return_connection(self, conn: sqlite3.Connection) -> None:
"""Return connection to pool"""
if conn in self.used:
self.used.remove(conn)
self.pool.append(conn)
def close_all(self) -> None:
"""Close all connections"""
for conn in self.pool + self.used:
conn.close()
self.pool.clear()
self.used.clear()
# 8. Backup and Restore
def backup_database(source: str, backup: str) -> bool:
"""
Create database backup
Args:
source: Source database path
backup: Backup database path
Returns:
True if successful
"""
try:
source_conn = sqlite3.connect(source)
backup_conn = sqlite3.connect(backup)
source_conn.backup(backup_conn)
backup_conn.close()
source_conn.close()
return True
except Exception as e:
print(f"Backup error: {e}")
return False
def restore_database(backup: str, target: str) -> bool:
"""
Restore database from backup
Args:
backup: Backup database path
target: Target database path
Returns:
True if successful
"""
return backup_database(backup, target)
# Usage Examples
def demonstrate_sqlite_connect():
print("=== Web Python SQLite Connection Examples ===\n")
# 1. Basic connection
print("--- 1. Basic Connection ---")
conn = connect_to_database()
print(f"Connected: {is_connection_open(conn)}")
print(f"Connection info: {get_connection_info(conn)}")
close_connection(conn)
# 2. Context manager
print("\n--- 2. Context Manager ---")
with database_connection() as conn:
print(f"Connected in context: {is_connection_open(conn)}")
print(f"Closed after context: {not is_connection_open(conn)}")
# 3. Configure connection
print("\n--- 3. Configure Connection ---")
with database_connection() as conn:
configure_connection(conn, row_factory=sqlite3.Row)
print(f"Configured connection: {get_connection_info(conn)}")
# 4. Table operations
print("\n--- 4. Table Operations ---")
with database_connection() as conn:
# Create table
create_table(conn, 'users', '''
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE
''')
print(f"Table exists: {table_exists(conn, 'users')}")
print(f"Table schema: {get_table_schema(conn, 'users')}")
print(f"All tables: {list_tables(conn)}")
# 5. Row factory
print("\n--- 5. Row Factory ---")
with database_connection() as conn:
create_table(conn, 'test', 'id INTEGER, value TEXT')
conn.execute('INSERT INTO test VALUES (1, "test")')
conn.commit()
# Default (tuple)
cursor = conn.execute('SELECT * FROM test')
print(f"Default row factory: {cursor.fetchone()}")
# Dict row factory
set_dict_row_factory(conn)
cursor = conn.execute('SELECT * FROM test')
row = cursor.fetchone()
print(f"Dict row factory: {dict(row)}")
# 6. Connection pool
print("\n--- 6. Connection Pool ---")
pool = ConnectionPool(':memory:', pool_size=2)
conn1 = pool.get_connection()
conn2 = pool.get_connection()
print(f"Pool size: {len(pool.used)}")
pool.return_connection(conn1)
print(f"After return: {len(pool.used)}")
pool.close_all()
print("\n=== All SQLite Connection Examples Completed ===")
# Export functions
export { connect_to_database, close_connection, is_connection_open }
export { database_connection, with_connection }
export { configure_connection, get_connection_info }
export { create_table, drop_table, table_exists, get_table_schema, list_tables }
export { create_database_file, delete_database_file, database_file_exists, get_database_size }
export { set_dict_row_factory, set_tuple_row_factory }
export { ConnectionPool }
export { backup_database, restore_database }
export { demonstrate_sqlite_connect }
💻 Exécuter une Requête SQL python
🟡 intermediate
⭐⭐⭐
Exécuter des requêtes SELECT, INSERT, UPDATE, DELETE avec liaison de paramètres et traitement des résultats
⏱️ 30 min
🏷️ python, web, database, sql
Prerequisites:
Intermediate Python, SQL knowledge, sqlite3 module
# Web Python SQL Query Examples
# Executing SQL queries with proper parameter binding and result handling
# 1. Basic Query Execution
import sqlite3
from typing import List, Tuple, Dict, Any, Optional
def execute_query(conn: sqlite3.Connection, query: str, params: Tuple = ()) -> bool:
"""
Execute a query without returning results
Args:
conn: Connection object
query: SQL query
params: Query parameters
Returns:
True if successful
"""
try:
cursor = conn.cursor()
cursor.execute(query, params)
conn.commit()
return True
except Exception as e:
print(f"Query error: {e}")
conn.rollback()
return False
def fetch_one(conn: sqlite3.Connection, query: str, params: Tuple = ()) -> Optional[Tuple]:
"""
Fetch one row
Args:
conn: Connection object
query: SQL query
params: Query parameters
Returns:
Row tuple or None
"""
try:
cursor = conn.cursor()
cursor.execute(query, params)
return cursor.fetchone()
except Exception as e:
print(f"Fetch error: {e}")
return None
def fetch_all(conn: sqlite3.Connection, query: str, params: Tuple = ()) -> List[Tuple]:
"""
Fetch all rows
Args:
conn: Connection object
query: SQL query
params: Query parameters
Returns:
List of row tuples
"""
try:
cursor = conn.cursor()
cursor.execute(query, params)
return cursor.fetchall()
except Exception as e:
print(f"Fetch error: {e}")
return []
def fetch_many(conn: sqlite3.Connection, query: str, size: int, params: Tuple = ()) -> List[Tuple]:
"""
Fetch multiple rows
Args:
conn: Connection object
query: SQL query
size: Number of rows
params: Query parameters
Returns:
List of row tuples
"""
try:
cursor = conn.cursor()
cursor.execute(query, params)
return cursor.fetchmany(size)
except Exception as e:
print(f"Fetch error: {e}")
return []
# 2. INSERT Operations
def insert_single(conn: sqlite3.Connection, table: str, data: Dict[str, Any]) -> bool:
"""
Insert single row
Args:
conn: Connection object
table: Table name
data: Column-value pairs
Returns:
True if successful
"""
try:
columns = ', '.join(data.keys())
placeholders = ', '.join(['?'] * len(data))
query = f'INSERT INTO {table} ({columns}) VALUES ({placeholders})'
return execute_query(conn, query, tuple(data.values()))
except Exception as e:
print(f"Insert error: {e}")
return False
def insert_many(conn: sqlite3.Connection, table: str, columns: List[str], rows: List[Tuple]) -> int:
"""
Insert multiple rows
Args:
conn: Connection object
table: Table name
columns: Column names
rows: List of value tuples
Returns:
Number of rows inserted
"""
try:
cursor = conn.cursor()
placeholders = ', '.join(['?'] * len(columns))
query = f'INSERT INTO {table} ({", ".join(columns)}) VALUES ({placeholders})'
cursor.executemany(query, rows)
conn.commit()
return cursor.rowcount
except Exception as e:
print(f"Batch insert error: {e}")
conn.rollback()
return 0
def insert_or_replace(conn: sqlite3.Connection, table: str, data: Dict[str, Any]) -> bool:
"""
Insert or replace on conflict
Args:
conn: Connection object
table: Table name
data: Column-value pairs
Returns:
True if successful
"""
try:
columns = ', '.join(data.keys())
placeholders = ', '.join(['?'] * len(data))
query = f'INSERT OR REPLACE INTO {table} ({columns}) VALUES ({placeholders})'
return execute_query(conn, query, tuple(data.values()))
except Exception as e:
print(f"Insert or replace error: {e}")
return False
def insert_or_ignore(conn: sqlite3.Connection, table: str, data: Dict[str, Any]) -> bool:
"""
Insert or ignore on conflict
Args:
conn: Connection object
table: Table name
data: Column-value pairs
Returns:
True if successful
"""
try:
columns = ', '.join(data.keys())
placeholders = ', '.join(['?'] * len(data))
query = f'INSERT OR IGNORE INTO {table} ({columns}) VALUES ({placeholders})'
return execute_query(conn, query, tuple(data.values()))
except Exception as e:
print(f"Insert or ignore error: {e}")
return False
def get_last_insert_id(conn: sqlite3.Connection) -> int:
"""
Get last insert row ID
Args:
conn: Connection object
Returns:
Last insert ID
"""
cursor = conn.cursor()
cursor.execute('SELECT last_insert_rowid()')
return cursor.fetchone()[0]
# 3. SELECT Operations
def select_all(conn: sqlite3.Connection, table: str, where: str = '', params: Tuple = ()) -> List[Tuple]:
"""
Select all rows
Args:
conn: Connection object
table: Table name
where: WHERE clause
params: Query parameters
Returns:
List of rows
"""
query = f'SELECT * FROM {table}'
if where:
query += f' WHERE {where}'
return fetch_all(conn, query, params)
def select_by_id(conn: sqlite3.Connection, table: str, row_id: int) -> Optional[Tuple]:
"""
Select row by ID
Args:
conn: Connection object
table: Table name
row_id: Row ID
Returns:
Row tuple or None
"""
return fetch_one(conn, f'SELECT * FROM {table} WHERE id = ?', (row_id,))
def select_columns(conn: sqlite3.Connection, table: str, columns: List[str], where: str = '', params: Tuple = ()) -> List[Tuple]:
"""
Select specific columns
Args:
conn: Connection object
table: Table name
columns: Column names
where: WHERE clause
params: Query parameters
Returns:
List of rows
"""
query = f'SELECT {", ".join(columns)} FROM {table}'
if where:
query += f' WHERE {where}'
return fetch_all(conn, query, params)
def select_with_join(conn: sqlite3.Connection, table1: str, table2: str, on: str, columns: str = '*') -> List[Tuple]:
"""
Select with JOIN
Args:
conn: Connection object
table1: First table
table2: Second table
on: JOIN condition
columns: Columns to select
Returns:
List of rows
"""
query = f'SELECT {columns} FROM {table1} JOIN {table2} ON {on}'
return fetch_all(conn, query)
def select_distinct(conn: sqlite3.Connection, table: str, column: str) -> List[Any]:
"""
Select distinct values
Args:
conn: Connection object
table: Table name
column: Column name
Returns:
List of distinct values
"""
query = f'SELECT DISTINCT {column} FROM {table}'
rows = fetch_all(conn, query)
return [row[0] for row in rows]
# 4. UPDATE Operations
def update_by_id(conn: sqlite3.Connection, table: str, row_id: int, data: Dict[str, Any]) -> bool:
"""
Update row by ID
Args:
conn: Connection object
table: Table name
row_id: Row ID
data: Column-value pairs
Returns:
True if successful
"""
try:
set_clause = ', '.join([f'{k} = ?' for k in data.keys()])
query = f'UPDATE {table} SET {set_clause} WHERE id = ?'
params = tuple(data.values()) + (row_id,)
return execute_query(conn, query, params)
except Exception as e:
print(f"Update error: {e}")
return False
def update_where(conn: sqlite3.Connection, table: str, data: Dict[str, Any], where: str, params: Tuple = ()) -> int:
"""
Update rows matching condition
Args:
conn: Connection object
table: Table name
data: Column-value pairs
where: WHERE clause
params: Additional parameters
Returns:
Number of rows updated
"""
try:
set_clause = ', '.join([f'{k} = ?' for k in data.keys()])
query = f'UPDATE {table} SET {set_clause} WHERE {where}'
cursor = conn.cursor()
cursor.execute(query, tuple(data.values()) + params)
conn.commit()
return cursor.rowcount
except Exception as e:
print(f"Update where error: {e}")
conn.rollback()
return 0
def increment_column(conn: sqlite3.Connection, table: str, column: str, where: str = '', params: Tuple = ()) -> bool:
"""
Increment column value
Args:
conn: Connection object
table: Table name
column: Column to increment
where: WHERE clause
params: Query parameters
Returns:
True if successful
"""
query = f'UPDATE {table} SET {column} = {column} + 1'
if where:
query += f' WHERE {where}'
return execute_query(conn, query, params)
# 5. DELETE Operations
def delete_by_id(conn: sqlite3.Connection, table: str, row_id: int) -> bool:
"""
Delete row by ID
Args:
conn: Connection object
table: Table name
row_id: Row ID
Returns:
True if successful
"""
return execute_query(conn, f'DELETE FROM {table} WHERE id = ?', (row_id,))
def delete_where(conn: sqlite3.Connection, table: str, where: str, params: Tuple = ()) -> int:
"""
Delete rows matching condition
Args:
conn: Connection object
table: Table name
where: WHERE clause
params: Query parameters
Returns:
Number of rows deleted
"""
try:
query = f'DELETE FROM {table} WHERE {where}'
cursor = conn.cursor()
cursor.execute(query, params)
conn.commit()
return cursor.rowcount
except Exception as e:
print(f"Delete where error: {e}")
conn.rollback()
return 0
def delete_all(conn: sqlite3.Connection, table: str) -> bool:
"""
Delete all rows from table
Args:
conn: Connection object
table: Table name
Returns:
True if successful
"""
return execute_query(conn, f'DELETE FROM {table}')
# 6. Aggregation Queries
def count_rows(conn: sqlite3.Connection, table: str, where: str = '', params: Tuple = ()) -> int:
"""
Count rows
Args:
conn: Connection object
table: Table name
where: WHERE clause
params: Query parameters
Returns:
Row count
"""
query = f'SELECT COUNT(*) FROM {table}'
if where:
query += f' WHERE {where}'
result = fetch_one(conn, query, params)
return result[0] if result else 0
def sum_column(conn: sqlite3.Connection, table: str, column: str, where: str = '') -> float:
"""
Sum column values
Args:
conn: Connection object
table: Table name
column: Column name
where: WHERE clause
Returns:
Sum value
"""
query = f'SELECT SUM({column}) FROM {table}'
if where:
query += f' WHERE {where}'
result = fetch_one(conn, query)
return result[0] if result and result[0] else 0
def avg_column(conn: sqlite3.Connection, table: str, column: str, where: str = '') -> float:
"""
Average column values
Args:
conn: Connection object
table: Table name
column: Column name
where: WHERE clause
Returns:
Average value
"""
query = f'SELECT AVG({column}) FROM {table}'
if where:
query += f' WHERE {where}'
result = fetch_one(conn, query)
return result[0] if result and result[0] else 0
def min_max_column(conn: sqlite3.Connection, table: str, column: str, where: str = '') -> Tuple:
"""
Get min and max of column
Args:
conn: Connection object
table: Table name
column: Column name
where: WHERE clause
Returns:
(min, max) tuple
"""
query = f'SELECT MIN({column}), MAX({column}) FROM {table}'
if where:
query += f' WHERE {where}'
result = fetch_one(conn, query)
return result if result else (None, None)
def group_by(conn: sqlite3.Connection, table: str, group_column: str, agg_column: str, operation: str = 'COUNT') -> List[Tuple]:
"""
Group by column
Args:
conn: Connection object
table: Table name
group_column: Column to group by
agg_column: Column to aggregate
operation: Aggregation operation
Returns:
List of (group, aggregated_value) tuples
"""
query = f'SELECT {group_column}, {operation}({agg_column}) FROM {table} GROUP BY {group_column}'
return fetch_all(conn, query)
# 7. Advanced Queries
def select_with_limit(conn: sqlite3.Connection, table: str, limit: int, offset: int = 0) -> List[Tuple]:
"""
Select with LIMIT and OFFSET
Args:
conn: Connection object
table: Table name
limit: Max rows
offset: Row offset
Returns:
List of rows
"""
query = f'SELECT * FROM {table} LIMIT {limit} OFFSET {offset}'
return fetch_all(conn, query)
def select_with_order(conn: sqlite3.Connection, table: str, order_column: str, ascending: bool = True, limit: int = 0) -> List[Tuple]:
"""
Select with ORDER BY
Args:
conn: Connection object
table: Table name
order_column: Column to order by
ascending: Sort direction
limit: Max rows
Returns:
List of rows
"""
direction = 'ASC' if ascending else 'DESC'
query = f'SELECT * FROM {table} ORDER BY {order_column} {direction}'
if limit > 0:
query += f' LIMIT {limit}'
return fetch_all(conn, query)
def search_like(conn: sqlite3.Connection, table: str, column: str, pattern: str) -> List[Tuple]:
"""
Search with LIKE
Args:
conn: Connection object
table: Table name
column: Column to search
pattern: Search pattern
Returns:
List of matching rows
"""
query = f'SELECT * FROM {table} WHERE {column} LIKE ?'
return fetch_all(conn, query, (pattern,))
def select_between(conn: sqlite3.Connection, table: str, column: str, start: Any, end: Any) -> List[Tuple]:
"""
Select with BETWEEN
Args:
conn: Connection object
table: Table name
column: Column name
start: Start value
end: End value
Returns:
List of rows
"""
query = f'SELECT * FROM {table} WHERE {column} BETWEEN ? AND ?'
return fetch_all(conn, query, (start, end))
# 8. Query Result Processing
def rows_to_dicts(rows: List[Tuple], columns: List[str]) -> List[Dict[str, Any]]:
"""
Convert rows to list of dictionaries
Args:
rows: List of row tuples
columns: Column names
Returns:
List of dictionaries
"""
return [dict(zip(columns, row)) for row in rows]
def get_row_count(conn: sqlite3.Connection, table: str) -> int:
"""Get total row count"""
return count_rows(conn, table)
def table_exists_with_data(conn: sqlite3.Connection, table: str) -> bool:
"""Check if table has data"""
return count_rows(conn, table) > 0
# Usage Examples
def demonstrate_sqlite_query():
print("=== Web Python SQL Query Examples ===\n")
# Setup database
conn = sqlite3.connect(':memory:')
conn.execute('''
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT,
age INTEGER,
salary REAL
)
''')
conn.commit()
# 1. INSERT operations
print("--- 1. INSERT Operations ---")
insert_single(conn, 'users', {'name': 'Alice', 'email': '[email protected]', 'age': 30, 'salary': 50000})
insert_single(conn, 'users', {'name': 'Bob', 'email': '[email protected]', 'age': 25, 'salary': 45000})
insert_many(conn, 'users', ['name', 'email', 'age', 'salary'], [
('Charlie', '[email protected]', 35, 60000),
('David', '[email protected]', 28, 55000),
('Eve', '[email protected]', 32, 58000)
])
print(f"Rows inserted: {count_rows(conn, 'users')}")
print(f"Last insert ID: {get_last_insert_id(conn)}")
# 2. SELECT operations
print("\n--- 2. SELECT Operations ---")
print(f"All users: {fetch_all(conn, 'SELECT * FROM users')}")
print(f"User by ID 1: {fetch_one(conn, 'SELECT * FROM users WHERE id = ?', (1,))}")
print(f"Select columns: {select_columns(conn, 'users', ['name', 'email'])}")
# 3. UPDATE operations
print("\n--- 3. UPDATE Operations ---")
update_by_id(conn, 'users', 1, {'salary': 55000})
increment_column(conn, 'users', 'age', 'name = ?', ('Alice',))
print(f"After update: {fetch_one(conn, 'SELECT * FROM users WHERE id = 1')}")
# 4. DELETE operations
print("\n--- 4. DELETE Operations ---")
delete_by_id(conn, 'users', 5)
print(f"After delete ID 5: {count_rows(conn, 'users')}")
# 5. Aggregation
print("\n--- 5. Aggregation Queries ---")
print(f"Count: {count_rows(conn, 'users')}")
print(f"Average salary: {avg_column(conn, 'users', 'salary'):.2f}")
print(f"Total salary: {sum_column(conn, 'users', 'salary'):.2f}")
print(f"Min/max age: {min_max_column(conn, 'users', 'age')}")
print(f"Group by age range: {group_by(conn, 'users', 'age', 'salary', 'AVG')}")
# 6. Advanced queries
print("\n--- 6. Advanced Queries ---")
print(f"Limit 2: {select_with_limit(conn, 'users', 2)}")
print(f"Order by salary: {select_with_order(conn, 'users', 'salary', ascending=False)}")
print(f"Like search: {search_like(conn, 'users', 'name', '%a%')}")
# 7. BETWEEN
print("\n--- 7. BETWEEN Query ---")
print(f"Age between 25 and 32: {select_between(conn, 'users', 'age', 25, 32)}")
conn.close()
print("\n=== All SQL Query Examples Completed ===")
# Export functions
export { execute_query, fetch_one, fetch_all, fetch_many }
export { insert_single, insert_many, insert_or_replace, insert_or_ignore, get_last_insert_id }
export { select_all, select_by_id, select_columns, select_with_join, select_distinct }
export { update_by_id, update_where, increment_column }
export { delete_by_id, delete_where, delete_all }
export { count_rows, sum_column, avg_column, min_max_column, group_by }
export { select_with_limit, select_with_order, search_like, select_between }
export { rows_to_dicts, get_row_count, table_exists_with_data }
export { demonstrate_sqlite_query }
💻 Traitement des Transactions python
🟡 intermediate
⭐⭐⭐
Gérer les transactions de base de données avec des opérations de commit, rollback et savepoint
⏱️ 35 min
🏷️ python, web, database, transaction
Prerequisites:
Intermediate Python, Database concepts, ACID properties
# Web Python Transaction Examples
# Managing database transactions with ACID properties
# 1. Basic Transaction Control
import sqlite3
from typing import Optional, List, Callable, Any
from contextlib import contextmanager
def begin_transaction(conn: sqlite3.Connection) -> None:
"""
Begin explicit transaction
Args:
conn: Connection object
"""
conn.execute('BEGIN TRANSACTION')
def commit_transaction(conn: sqlite3.Connection) -> bool:
"""
Commit transaction
Args:
conn: Connection object
Returns:
True if successful
"""
try:
conn.commit()
return True
except Exception as e:
print(f"Commit error: {e}")
return False
def rollback_transaction(conn: sqlite3.Connection) -> bool:
"""
Rollback transaction
Args:
conn: Connection object
Returns:
True if successful
"""
try:
conn.rollback()
return True
except Exception as e:
print(f"Rollback error: {e}")
return False
def get_transaction_status(conn: sqlite3.Connection) -> str:
"""
Get transaction status
Args:
conn: Connection object
Returns:
Transaction status string
"""
# Check if in transaction
cursor = conn.cursor()
cursor.execute('BEGIN')
try:
cursor.execute('COMMIT')
return 'no transaction'
except:
return 'in transaction'
# 2. Context Manager Transactions
@contextmanager
def transaction(conn: sqlite3.Connection):
"""
Context manager for transactions
Args:
conn: Connection object
Yields:
Connection object
Example:
with transaction(conn):
# operations
pass # auto commit on success, rollback on error
"""
try:
begin_transaction(conn)
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
def execute_in_transaction(conn: sqlite3.Connection, operations: List[Callable]) -> bool:
"""
Execute multiple operations in transaction
Args:
conn: Connection object
operations: List of callable operations
Returns:
True if all successful
"""
try:
with transaction(conn):
for operation in operations:
operation(conn)
return True
except Exception as e:
print(f"Transaction failed: {e}")
return False
# 3. Savepoint Operations
def create_savepoint(conn: sqlite3.Connection, name: str) -> None:
"""
Create savepoint
Args:
conn: Connection object
name: Savepoint name
"""
conn.execute(f'SAVEPOINT {name}')
def release_savepoint(conn: sqlite3.Connection, name: str) -> bool:
"""
Release savepoint
Args:
conn: Connection object
name: Savepoint name
Returns:
True if successful
"""
try:
conn.execute(f'RELEASE SAVEPOINT {name}')
return True
except Exception as e:
print(f"Release savepoint error: {e}")
return False
def rollback_to_savepoint(conn: sqlite3.Connection, name: str) -> bool:
"""
Rollback to savepoint
Args:
conn: Connection object
name: Savepoint name
Returns:
True if successful
"""
try:
conn.execute(f'ROLLBACK TO SAVEPOINT {name}')
return True
except Exception as e:
print(f"Rollback to savepoint error: {e}")
return False
@contextmanager
def savepoint(conn: sqlite3.Connection, name: str):
"""
Context manager for savepoint
Args:
conn: Connection object
name: Savepoint name
Yields:
Connection object
"""
try:
create_savepoint(conn, name)
yield conn
release_savepoint(conn, name)
except Exception as e:
rollback_to_savepoint(conn, name)
raise e
# 4. Isolation Levels
def set_isolation_level(conn: sqlite3.Connection, level: str) -> None:
"""
Set isolation level
Args:
conn: Connection object
level: 'DEFERRED', 'IMMEDIATE', 'EXCLUSIVE', or None (autocommit)
"""
conn.isolation_level = level
def get_isolation_level(conn: sqlite3.Connection) -> str:
"""
Get current isolation level
Args:
conn: Connection object
Returns:
Isolation level string
"""
return str(conn.isolation_level)
def deferred_transaction(conn: sqlite3.Connection) -> None:
"""Begin deferred transaction (default)"""
conn.execute('BEGIN DEFERRED')
def immediate_transaction(conn: sqlite3.Connection) -> None:
"""Begin immediate transaction (reserves reserved locks)"""
conn.execute('BEGIN IMMEDIATE')
def exclusive_transaction(conn: sqlite3.Connection) -> None:
"""Begin exclusive transaction (reserves all locks)"""
conn.execute('BEGIN EXCLUSIVE')
# 5. Transaction Patterns
def transfer_funds(conn: sqlite3.Connection, from_account: int, to_account: int, amount: float) -> bool:
"""
Transfer funds between accounts (atomic operation)
Args:
conn: Connection object
from_account: Source account ID
to_account: Destination account ID
amount: Transfer amount
Returns:
True if successful
"""
try:
with transaction(conn):
# Deduct from source
conn.execute(
'UPDATE accounts SET balance = balance - ? WHERE id = ?',
(amount, from_account)
)
# Add to destination
conn.execute(
'UPDATE accounts SET balance = balance + ? WHERE id = ?',
(amount, to_account)
)
# Verify sufficient funds
cursor = conn.execute(
'SELECT balance FROM accounts WHERE id = ?',
(from_account,)
)
balance = cursor.fetchone()[0]
if balance < 0:
raise Exception("Insufficient funds")
return True
except Exception as e:
print(f"Transfer failed: {e}")
return False
def batch_insert(conn: sqlite3.Connection, table: str, rows: List[tuple], batch_size: int = 1000) -> int:
"""
Batch insert with transaction
Args:
conn: Connection object
table: Table name
rows: List of row tuples
batch_size: Rows per transaction
Returns:
Total rows inserted
"""
total_inserted = 0
for i in range(0, len(rows), batch_size):
batch = rows[i:i + batch_size]
try:
with transaction(conn):
cursor = conn.cursor()
cursor.executemany(f'INSERT INTO {table} VALUES ({", ".join(["?"]*len(batch[0]))})', batch)
total_inserted += cursor.rowcount
except Exception as e:
print(f"Batch insert failed at {i}: {e}")
break
return total_inserted
def upsert_record(conn: sqlite3.Connection, table: str, data: dict, key_column: str) -> bool:
"""
Insert or update record
Args:
conn: Connection object
table: Table name
data: Record data
key_column: Column to check for existence
Returns:
True if successful
"""
try:
with transaction(conn):
# Check if exists
cursor = conn.execute(f'SELECT id FROM {table} WHERE {key_column} = ?', (data[key_column],))
if cursor.fetchone():
# Update
set_clause = ', '.join([f'{k} = ?' for k in data.keys()])
conn.execute(f'UPDATE {table} SET {set_clause} WHERE {key_column} = ?', tuple(data.values()) + (data[key_column],))
else:
# Insert
columns = ', '.join(data.keys())
placeholders = ', '.join(['?'] * len(data))
conn.execute(f'INSERT INTO {table} ({columns}) VALUES ({placeholders})', tuple(data.values()))
return True
except Exception as e:
print(f"Upsert failed: {e}")
return False
# 6. Nested Transactions
def nested_transaction_example(conn: sqlite3.Connection) -> bool:
"""
Example of nested transactions with savepoints
Args:
conn: Connection object
Returns:
True if successful
"""
try:
with transaction(conn):
# Outer transaction operations
conn.execute('INSERT INTO users (name) VALUES (?)', ('Alice',))
# Nested savepoint
with savepoint(conn, 'nested'):
conn.execute('INSERT INTO orders (user_id) VALUES (?)', (1,))
# Can rollback to this savepoint if needed
return True
except Exception as e:
print(f"Nested transaction failed: {e}")
return False
# 7. Transaction Logging
class TransactionLogger:
"""Log all transaction operations"""
def __init__(self, conn: sqlite3.Connection):
self.conn = conn
self.log: List[str] = []
def execute(self, query: str, params: tuple = ()):
"""Execute with logging"""
self.log.append(f"EXECUTE: {query} with {params}")
return self.conn.execute(query, params)
def commit(self):
"""Commit with logging"""
self.log.append("COMMIT")
return self.conn.commit()
def rollback(self):
"""Rollback with logging"""
self.log.append("ROLLBACK")
return self.conn.rollback()
def get_log(self) -> List[str]:
"""Get operation log"""
return self.log
# 8. Atomic Operations
def atomic_operation(conn: sqlite3.Connection, operation: Callable, *args, **kwargs) -> Any:
"""
Execute operation atomically
Args:
conn: Connection object
operation: Callable to execute
*args: Operation arguments
**kwargs: Operation keyword arguments
Returns:
Operation result or None on failure
"""
try:
with transaction(conn):
return operation(conn, *args, **kwargs)
except Exception as e:
print(f"Atomic operation failed: {e}")
return None
def compare_and_update(conn: sqlite3.Connection, table: str, id: int, column: str, old_value: Any, new_value: Any) -> bool:
"""
Atomic compare-and-update
Args:
conn: Connection object
table: Table name
id: Row ID
column: Column name
old_value: Expected current value
new_value: New value
Returns:
True if updated (value matched)
"""
try:
with transaction(conn):
cursor = conn.execute(f'SELECT {column} FROM {table} WHERE id = ?', (id,))
current = cursor.fetchone()[0]
if current == old_value:
conn.execute(f'UPDATE {table} SET {column} = ? WHERE id = ?', (new_value, id))
return True
return False
except Exception as e:
print(f"Compare and update failed: {e}")
return False
# 9. Transaction State Management
def in_transaction(conn: sqlite3.Connection) -> bool:
"""
Check if connection is in transaction
Args:
conn: Connection object
Returns:
True if in transaction
"""
return conn.in_transaction
def get_changes_count(conn: sqlite3.Connection) -> int:
"""
Get number of changes in transaction
Args:
conn: Connection object
Returns:
Number of changes
"""
return conn.total_changes
def reset_transaction(conn: sqlite3.Connection) -> None:
"""
Reset transaction state
Args:
conn: Connection object
"""
conn.rollback()
# Usage Examples
def demonstrate_transaction():
print("=== Web Python Transaction Examples ===\n")
# Setup database
conn = sqlite3.connect(':memory:')
conn.execute('''
CREATE TABLE accounts (
id INTEGER PRIMARY KEY,
name TEXT,
balance REAL
)
''')
conn.execute('''
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT
)
''')
conn.execute('''
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
user_id INTEGER
)
''')
conn.commit()
# Insert initial data
conn.execute('INSERT INTO accounts VALUES (1, "Alice", 1000)')
conn.execute('INSERT INTO accounts VALUES (2, "Bob", 500)')
conn.commit()
# 1. Basic transaction
print("--- 1. Basic Transaction ---")
try:
with transaction(conn):
conn.execute('UPDATE accounts SET balance = balance + 100 WHERE id = 1')
print("In transaction: updated, not committed yet")
# Auto commit on success
except Exception as e:
print(f"Transaction failed: {e}")
print(f"After transaction: {fetch_all(conn, 'SELECT * FROM accounts')}")
# 2. Rollback example
print("\n--- 2. Rollback Example ---")
try:
with transaction(conn):
conn.execute('UPDATE accounts SET balance = balance - 1000 WHERE id = 1')
raise Exception("Intentional error")
except:
pass
print(f"After rollback: {fetch_all(conn, 'SELECT * FROM accounts')}")
# 3. Savepoints
print("\n--- 3. Savepoints ---")
try:
with transaction(conn):
conn.execute('UPDATE accounts SET balance = balance + 50 WHERE id = 1')
with savepoint(conn, 'sp1'):
conn.execute('UPDATE accounts SET balance = balance + 50 WHERE id = 2')
print("After nested update")
# Outer transaction continues
except Exception as e:
print(f"Error: {e}")
print(f"After savepoint: {fetch_all(conn, 'SELECT * FROM accounts')}")
# 4. Transfer funds
print("\n--- 4. Transfer Funds (Atomic) ---")
print(f"Before transfer: {fetch_all(conn, 'SELECT * FROM accounts')}")
success = transfer_funds(conn, 1, 2, 200)
print(f"Transfer success: {success}")
print(f"After transfer: {fetch_all(conn, 'SELECT * FROM accounts')}")
# 5. Batch insert
print("\n--- 5. Batch Insert ---")
batch_data = [
('User1',),
('User2',),
('User3',),
]
inserted = batch_insert(conn, 'users', batch_data, batch_size=2)
print(f"Batch inserted {inserted} rows")
print(f"Users: {fetch_all(conn, 'SELECT * FROM users')}")
# 6. Upsert
print("\n--- 6. Upsert ---")
upsert_record(conn, 'users', {'id': 4, 'name': 'User4'}, 'id')
upsert_record(conn, 'users', {'id': 1, 'name': 'UpdatedUser1'}, 'id')
print(f"After upsert: {fetch_all(conn, 'SELECT * FROM users')}")
# 7. Compare and update
print("\n--- 7. Compare and Update ---")
result = compare_and_update(conn, 'accounts', 1, 'balance', 1150, 1200)
print(f"Compare and update success: {result}")
print(f"Accounts: {fetch_all(conn, 'SELECT * FROM accounts')}")
# 8. Transaction logging
print("\n--- 8. Transaction Logging ---")
logger = TransactionLogger(conn)
try:
with transaction(conn):
logger.execute('UPDATE accounts SET balance = balance + 10 WHERE id = 1')
logger.execute('UPDATE accounts SET balance = balance - 10 WHERE id = 2')
except:
pass
print(f"Transaction log: {logger.get_log()}")
conn.close()
print("\n=== All Transaction Examples Completed ===")
def fetch_all(conn, query, params=()):
"""Helper function"""
cursor = conn.cursor()
cursor.execute(query, params)
return cursor.fetchall()
# Export functions
export { begin_transaction, commit_transaction, rollback_transaction, get_transaction_status }
export { transaction, execute_in_transaction }
export { create_savepoint, release_savepoint, rollback_to_savepoint, savepoint }
export { set_isolation_level, get_isolation_level, deferred_transaction, immediate_transaction, exclusive_transaction }
export { transfer_funds, batch_insert, upsert_record, nested_transaction_example }
export { TransactionLogger }
export { atomic_operation, compare_and_update }
export { in_transaction, get_changes_count, reset_transaction }
export { demonstrate_transaction }