Web Python 数据库示例

Web Python 数据库示例,包括SQLite连接、SQL查询和事务处理

💻 SQLite连接 python

🟢 simple ⭐⭐

连接SQLite数据库,创建表,并使用适当的错误处理管理数据库连接

⏱️ 20 min 🏷️ python, web, database, sqlite
Prerequisites: Basic Python, SQL basics
# Web Python SQLite Connection Examples
# Connecting to SQLite database and managing database connections

# 1. Basic Connection
import sqlite3
from typing import Optional, List, Tuple, Any
from contextlib import contextmanager
import os

def connect_to_database(db_path: str = ':memory:') -> sqlite3.Connection:
    """
    Open a SQLite connection.

    Args:
        db_path: Path of the database file; ':memory:' opens an
            in-memory database

    Returns:
        The newly opened connection
    """
    return sqlite3.connect(db_path)

def close_connection(connection: sqlite3.Connection) -> None:
    """
    Close a database connection if one was supplied.

    Args:
        connection: Connection to close; a falsy value (e.g. None)
            is ignored
    """
    if not connection:
        return
    connection.close()

def is_connection_open(connection: sqlite3.Connection) -> bool:
    """
    Check if connection is open

    The check runs a trivial ``SELECT 1``; a closed connection makes
    sqlite3 raise, which is reported as ``False``.

    Args:
        connection: Connection to check (None counts as not open)

    Returns:
        True if the probe query succeeds, False otherwise
    """
    if connection is None:
        return False
    try:
        connection.execute('SELECT 1')
    except sqlite3.Error:
        # Only database errors mean "not usable"; KeyboardInterrupt and
        # SystemExit must not be swallowed the way a bare except would.
        return False
    return True

# 2. Connection with Context Manager
@contextmanager
def database_connection(db_path: str = ':memory:'):
    """
    Open a connection for the duration of a ``with`` block.

    The connection is closed when the block exits, whether it
    finishes normally or raises.

    Args:
        db_path: Database file path

    Yields:
        Connection object
    """
    handle = sqlite3.connect(db_path)
    try:
        yield handle
    finally:
        handle.close()

def with_connection(func):
    """
    Decorator that runs ``func`` with a managed database connection.

    The wrapper reads an optional ``db_path`` keyword argument
    (default ':memory:'), opens a connection through
    ``database_connection`` and passes it to ``func`` as the ``conn``
    keyword argument. ``db_path`` is left in the keyword arguments, so
    ``func`` also receives it when the caller supplied it.

    Args:
        func: Function to wrap; must accept a ``conn`` keyword argument

    Returns:
        Wrapped function with the name and docstring of ``func``
    """
    import functools

    # functools.wraps keeps __name__/__doc__ of the decorated function,
    # which a plain closure would replace with those of ``wrapper``.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        db_path = kwargs.get('db_path', ':memory:')
        with database_connection(db_path) as conn:
            kwargs['conn'] = conn
            return func(*args, **kwargs)
    return wrapper

# 3. Connection Configuration
def configure_connection(conn: sqlite3.Connection, **settings) -> None:
    """
    Configure connection settings

    Args:
        conn: Connection object
        **settings: Configuration settings. Recognised keys:
            foreign_keys: enable foreign key enforcement (default True)
            row_factory: callable assigned to ``conn.row_factory``
            timeout: busy timeout in seconds
            isolation_level: value assigned to ``conn.isolation_level``
    """
    # Enable foreign keys
    if settings.get('foreign_keys', True):
        conn.execute('PRAGMA foreign_keys = ON')

    # Set row factory
    if settings.get('row_factory'):
        conn.row_factory = settings['row_factory']

    # Set timeout
    if 'timeout' in settings:
        # sqlite3.Connection exposes no writable ``timeout`` attribute, so
        # the busy timeout of an open connection is set through the
        # busy_timeout pragma, which takes milliseconds. The value is
        # coerced to int before formatting, so nothing else reaches the SQL.
        timeout_ms = int(float(settings['timeout']) * 1000)
        conn.execute(f'PRAGMA busy_timeout = {timeout_ms}')

    # Set isolation level
    if 'isolation_level' in settings:
        conn.isolation_level = settings['isolation_level']

def get_connection_info(conn: sqlite3.Connection) -> dict:
    """
    Collect basic facts about a connection.

    Args:
        conn: Connection object

    Returns:
        Dictionary with the rows of ``PRAGMA database_list``, the
        SQLite library version, the connection's isolation level and
        its total change counter
    """
    attached = conn.execute('PRAGMA database_list').fetchall()
    engine_version = conn.execute('SELECT sqlite_version()').fetchone()[0]

    info = {}
    info['databases'] = attached
    info['sqlite_version'] = engine_version
    info['isolation_level'] = conn.isolation_level
    info['total_changes'] = conn.total_changes
    return info

# 4. Table Operations
def create_table(conn: sqlite3.Connection, table_name: str, schema: str) -> bool:
    """
    Create a table unless it already exists.

    Args:
        conn: Connection object
        table_name: Table name (inserted into the SQL text as-is)
        schema: Column definitions placed between the parentheses
            of CREATE TABLE (inserted into the SQL text as-is)

    Returns:
        True if the statement ran and was committed, False if any
        error occurred (the error is printed)
    """
    try:
        ddl = f'CREATE TABLE IF NOT EXISTS {table_name} ({schema})'
        conn.cursor().execute(ddl)
        conn.commit()
    except Exception as exc:
        print(f"Error creating table: {exc}")
        return False
    return True

def drop_table(conn: sqlite3.Connection, table_name: str) -> bool:
    """
    Drop a table if it exists.

    Args:
        conn: Connection object
        table_name: Table name (inserted into the SQL text as-is)

    Returns:
        True if the statement ran and was committed, False if any
        error occurred (the error is printed)
    """
    try:
        statement = f'DROP TABLE IF EXISTS {table_name}'
        conn.cursor().execute(statement)
        conn.commit()
    except Exception as exc:
        print(f"Error dropping table: {exc}")
        return False
    return True

def table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """
    Report whether a table with the given name is listed in
    sqlite_master.

    Args:
        conn: Connection object
        table_name: Table name (bound as a query parameter)

    Returns:
        True if a matching table row was found
    """
    match = conn.execute("""
        SELECT name FROM sqlite_master
        WHERE type='table' AND name=?
    """, (table_name,)).fetchone()
    if match is None:
        return False
    return True

def get_table_schema(conn: sqlite3.Connection, table_name: str) -> List[Tuple]:
    """
    Fetch the column description of a table.

    Args:
        conn: Connection object
        table_name: Table name (inserted into the PRAGMA text as-is)

    Returns:
        Rows produced by ``PRAGMA table_info`` for the table
    """
    pragma = f'PRAGMA table_info({table_name})'
    return conn.execute(pragma).fetchall()

def list_tables(conn: sqlite3.Connection) -> List[str]:
    """
    Return the names of all tables recorded in sqlite_master.

    Args:
        conn: Connection object

    Returns:
        List of table names
    """
    names = []
    for record in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall():
        names.append(record[0])
    return names

# 5. Database File Operations
def create_database_file(file_path: str) -> bool:
    """
    Create a database file by opening and immediately closing a
    connection to it.

    Args:
        file_path: Path to database file

    Returns:
        True if the connection could be opened and closed, False if
        any error occurred (the error is printed)
    """
    try:
        sqlite3.connect(file_path).close()
    except Exception as exc:
        print(f"Error creating database: {exc}")
        return False
    return True

def delete_database_file(file_path: str) -> bool:
    """
    Remove a database file from disk.

    Args:
        file_path: Path to database file

    Returns:
        True if the file existed and was removed; False if it did not
        exist or removal failed (a failure is printed)
    """
    try:
        if not os.path.exists(file_path):
            return False
        os.remove(file_path)
        return True
    except Exception as exc:
        print(f"Error deleting database: {exc}")
        return False

def database_file_exists(file_path: str) -> bool:
    """
    Check that a path exists and carries the '.db' suffix.

    Args:
        file_path: Path to check

    Returns:
        True only if the path exists and ends with '.db'
    """
    if not os.path.exists(file_path):
        return False
    return file_path.endswith('.db')

def get_database_size(file_path: str) -> int:
    """
    Report the size of a database file.

    Args:
        file_path: Path to database

    Returns:
        Size in bytes, or 0 when the path does not exist
    """
    return os.path.getsize(file_path) if os.path.exists(file_path) else 0

# 6. Row Factory Configuration
def set_dict_row_factory(conn: sqlite3.Connection) -> None:
    """
    Make the connection produce ``sqlite3.Row`` objects, whose
    columns can be read by name and converted with ``dict(row)``.

    Args:
        conn: Connection object
    """
    setattr(conn, 'row_factory', sqlite3.Row)

def set_tuple_row_factory(conn: sqlite3.Connection) -> None:
    """
    Clear the row factory so rows come back as plain tuples
    (the sqlite3 default).

    Args:
        conn: Connection object
    """
    setattr(conn, 'row_factory', None)

# 7. Connection Pooling (Simple Implementation)
class ConnectionPool:
    """
    Simple connection pool

    Keeps two lists: ``pool`` holds idle connections ready for reuse and
    ``used`` holds connections currently handed out. At most
    ``pool_size`` connections are handed out at once. No locking is
    performed.
    """

    def __init__(self, db_path: str, pool_size: int = 5):
        """
        Initialize connection pool

        Args:
            db_path: Database path
            pool_size: Maximum connections
        """
        self.db_path = db_path
        self.pool_size = pool_size
        self.pool: List[sqlite3.Connection] = []  # idle connections
        self.used: List[sqlite3.Connection] = []  # handed-out connections

    def get_connection(self) -> sqlite3.Connection:
        """
        Get connection from pool

        Reuses an idle connection when one is available, otherwise opens
        a new one as long as fewer than ``pool_size`` are handed out.

        Returns:
            A connection that is now tracked in ``used``

        Raises:
            Exception: When ``pool_size`` connections are already in use
        """
        if self.pool:
            conn = self.pool.pop()
        elif len(self.used) < self.pool_size:
            conn = sqlite3.connect(self.db_path)
        else:
            raise Exception("Connection pool exhausted")

        self.used.append(conn)
        return conn

    def return_connection(self, conn: sqlite3.Connection) -> None:
        """
        Return connection to pool

        Connections that were not handed out by this pool are ignored.
        Uncommitted work is rolled back so the next borrower does not
        inherit an open transaction; a connection that can no longer be
        used (e.g. already closed) is dropped instead of being recycled.

        Args:
            conn: Connection previously obtained from ``get_connection``
        """
        if conn not in self.used:
            return

        self.used.remove(conn)
        try:
            if conn.in_transaction:
                conn.rollback()
        except sqlite3.Error:
            # Closed or broken connection: do not put it back in rotation.
            return
        self.pool.append(conn)

    def close_all(self) -> None:
        """Close all connections, idle and handed out, and forget them"""
        for conn in self.pool + self.used:
            conn.close()
        self.pool.clear()
        self.used.clear()

# 8. Backup and Restore
def backup_database(source: str, backup: str) -> bool:
    """
    Create database backup

    Copies the contents of the source database into the backup database
    using ``sqlite3.Connection.backup``.

    Args:
        source: Source database path
        backup: Backup database path

    Returns:
        True if successful, False if any error occurred (the error is
        printed)
    """
    source_conn = None
    backup_conn = None
    try:
        source_conn = sqlite3.connect(source)
        backup_conn = sqlite3.connect(backup)

        source_conn.backup(backup_conn)
        return True
    except Exception as e:
        print(f"Backup error: {e}")
        return False
    finally:
        # Close whatever was opened, even when connect() or backup() failed,
        # so a failed backup does not leak open connections.
        for handle in (backup_conn, source_conn):
            if handle is not None:
                handle.close()

def restore_database(backup: str, target: str) -> bool:
    """
    Restore a database by copying a backup over the target.

    This is ``backup_database`` with the roles swapped: the backup
    file is the source and the target file is the destination.

    Args:
        backup: Backup database path
        target: Target database path

    Returns:
        True if successful
    """
    restored = backup_database(source=backup, backup=target)
    return restored

# Usage Examples
def demonstrate_sqlite_connect():
    """
    Run every connection example in turn and print the results.

    Exercises the helpers defined in this module against in-memory
    databases: basic connect/close, the context manager, connection
    configuration, table operations, row factories and the pool.
    """
    print("=== Web Python SQLite Connection Examples ===\n")

    # 1. Basic connection
    print("--- 1. Basic Connection ---")
    conn = connect_to_database()
    print(f"Connected: {is_connection_open(conn)}")
    print(f"Connection info: {get_connection_info(conn)}")
    close_connection(conn)

    # 2. Context manager
    print("\n--- 2. Context Manager ---")
    with database_connection() as conn:
        print(f"Connected in context: {is_connection_open(conn)}")
    print(f"Closed after context: {not is_connection_open(conn)}")

    # 3. Configure connection
    print("\n--- 3. Configure Connection ---")
    with database_connection() as conn:
        configure_connection(conn, row_factory=sqlite3.Row)
        print(f"Configured connection: {get_connection_info(conn)}")

    # 4. Table operations
    print("\n--- 4. Table Operations ---")
    with database_connection() as conn:
        # Create table
        create_table(conn, 'users', '''
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE
        ''')

        print(f"Table exists: {table_exists(conn, 'users')}")
        print(f"Table schema: {get_table_schema(conn, 'users')}")
        print(f"All tables: {list_tables(conn)}")

    # 5. Row factory
    print("\n--- 5. Row Factory ---")
    with database_connection() as conn:
        create_table(conn, 'test', 'id INTEGER, value TEXT')
        # Bind the values instead of writing "test" in double quotes: SQLite
        # reads a double-quoted token as an identifier first and only falls
        # back to a string literal on builds that keep that legacy behavior.
        conn.execute('INSERT INTO test VALUES (?, ?)', (1, 'test'))
        conn.commit()

        # Default (tuple)
        cursor = conn.execute('SELECT * FROM test')
        print(f"Default row factory: {cursor.fetchone()}")

        # Dict row factory
        set_dict_row_factory(conn)
        cursor = conn.execute('SELECT * FROM test')
        row = cursor.fetchone()
        print(f"Dict row factory: {dict(row)}")

    # 6. Connection pool
    print("\n--- 6. Connection Pool ---")
    pool = ConnectionPool(':memory:', pool_size=2)
    conn1 = pool.get_connection()
    conn2 = pool.get_connection()
    print(f"Pool size: {len(pool.used)}")
    pool.return_connection(conn1)
    print(f"After return: {len(pool.used)}")
    pool.close_all()

    print("\n=== All SQLite Connection Examples Completed ===")

# Export functions
# Python has no ``export`` statement (``export { ... }`` is JavaScript syntax
# and a SyntaxError here); the public names are declared through ``__all__``.
# Any ``__all__`` defined earlier in the same file is extended, not replaced.
__all__ = [
    *globals().get('__all__', ()),
    'connect_to_database', 'close_connection', 'is_connection_open',
    'database_connection', 'with_connection',
    'configure_connection', 'get_connection_info',
    'create_table', 'drop_table', 'table_exists', 'get_table_schema', 'list_tables',
    'create_database_file', 'delete_database_file', 'database_file_exists', 'get_database_size',
    'set_dict_row_factory', 'set_tuple_row_factory',
    'ConnectionPool',
    'backup_database', 'restore_database',
    'demonstrate_sqlite_connect',
]

💻 执行SQL查询 python

🟡 intermediate ⭐⭐⭐

执行SELECT、INSERT、UPDATE、DELETE查询,支持参数绑定和结果处理

⏱️ 30 min 🏷️ python, web, database, sql
Prerequisites: Intermediate Python, SQL knowledge, sqlite3 module
# Web Python SQL Query Examples
# Executing SQL queries with proper parameter binding and result handling

# 1. Basic Query Execution
import sqlite3
from typing import List, Tuple, Dict, Any, Optional

def execute_query(conn: sqlite3.Connection, query: str, params: Tuple = ()) -> bool:
    """
    Run a statement and commit it, discarding any result rows.

    Args:
        conn: Connection object
        query: SQL query
        params: Values bound to the query's placeholders

    Returns:
        True if the statement ran and was committed; False if an
        error occurred, in which case the error is printed and the
        connection is rolled back
    """
    try:
        conn.cursor().execute(query, params)
        conn.commit()
    except Exception as exc:
        print(f"Query error: {exc}")
        conn.rollback()
        return False
    return True

def fetch_one(conn: sqlite3.Connection, query: str, params: Tuple = ()) -> Optional[Tuple]:
    """
    Run a query and return its first row.

    Args:
        conn: Connection object
        query: SQL query
        params: Values bound to the query's placeholders

    Returns:
        The first row, or None when there is no row or the query
        failed (a failure is printed)
    """
    try:
        return conn.cursor().execute(query, params).fetchone()
    except Exception as exc:
        print(f"Fetch error: {exc}")
        return None

def fetch_all(conn: sqlite3.Connection, query: str, params: Tuple = ()) -> List[Tuple]:
    """
    Run a query and return every row.

    Args:
        conn: Connection object
        query: SQL query
        params: Values bound to the query's placeholders

    Returns:
        List of rows; empty when nothing matched or the query failed
        (a failure is printed)
    """
    try:
        return conn.cursor().execute(query, params).fetchall()
    except Exception as exc:
        print(f"Fetch error: {exc}")
        return []

def fetch_many(conn: sqlite3.Connection, query: str, size: int, params: Tuple = ()) -> List[Tuple]:
    """
    Run a query and return at most ``size`` rows.

    Args:
        conn: Connection object
        query: SQL query
        size: Maximum number of rows to fetch
        params: Values bound to the query's placeholders

    Returns:
        List of rows; empty when nothing matched or the query failed
        (a failure is printed)
    """
    try:
        return conn.cursor().execute(query, params).fetchmany(size)
    except Exception as exc:
        print(f"Fetch error: {exc}")
        return []

# 2. INSERT Operations
def insert_single(conn: sqlite3.Connection, table: str, data: Dict[str, Any]) -> bool:
    """
    Insert one row built from a column/value mapping.

    Column names and the table name are placed in the SQL text
    as-is; the values are bound as parameters.

    Args:
        conn: Connection object
        table: Table name
        data: Column-value pairs

    Returns:
        True if successful
    """
    try:
        column_list = ', '.join(data.keys())
        marks = ', '.join('?' * len(data))
        statement = f'INSERT INTO {table} ({column_list}) VALUES ({marks})'
        values = tuple(data.values())
        return execute_query(conn, statement, values)
    except Exception as exc:
        print(f"Insert error: {exc}")
        return False

def insert_many(conn: sqlite3.Connection, table: str, columns: List[str], rows: List[Tuple]) -> int:
    """
    Insert several rows with a single ``executemany`` call.

    Args:
        conn: Connection object
        table: Table name
        columns: Column names, in the order the row values are given
        rows: List of value tuples

    Returns:
        The cursor's row count after the insert, or 0 if an error
        occurred (the error is printed and the connection rolled back)
    """
    try:
        cur = conn.cursor()
        marks = ', '.join('?' * len(columns))
        column_list = ", ".join(columns)
        statement = f'INSERT INTO {table} ({column_list}) VALUES ({marks})'
        cur.executemany(statement, rows)
        conn.commit()
        return cur.rowcount
    except Exception as exc:
        print(f"Batch insert error: {exc}")
        conn.rollback()
        return 0

def insert_or_replace(conn: sqlite3.Connection, table: str, data: Dict[str, Any]) -> bool:
    """
    Insert one row using ``INSERT OR REPLACE``.

    Column names and the table name are placed in the SQL text
    as-is; the values are bound as parameters.

    Args:
        conn: Connection object
        table: Table name
        data: Column-value pairs

    Returns:
        True if successful
    """
    try:
        column_list = ', '.join(data.keys())
        marks = ', '.join('?' * len(data))
        statement = f'INSERT OR REPLACE INTO {table} ({column_list}) VALUES ({marks})'
        values = tuple(data.values())
        return execute_query(conn, statement, values)
    except Exception as exc:
        print(f"Insert or replace error: {exc}")
        return False

def insert_or_ignore(conn: sqlite3.Connection, table: str, data: Dict[str, Any]) -> bool:
    """
    Insert one row using ``INSERT OR IGNORE``.

    Column names and the table name are placed in the SQL text
    as-is; the values are bound as parameters.

    Args:
        conn: Connection object
        table: Table name
        data: Column-value pairs

    Returns:
        True if successful
    """
    try:
        column_list = ', '.join(data.keys())
        marks = ', '.join('?' * len(data))
        statement = f'INSERT OR IGNORE INTO {table} ({column_list}) VALUES ({marks})'
        values = tuple(data.values())
        return execute_query(conn, statement, values)
    except Exception as exc:
        print(f"Insert or ignore error: {exc}")
        return False

def get_last_insert_id(conn: sqlite3.Connection) -> int:
    """
    Query ``last_insert_rowid()`` on the connection.

    Args:
        conn: Connection object

    Returns:
        Last insert ID
    """
    row = conn.cursor().execute('SELECT last_insert_rowid()').fetchone()
    return row[0]

# 3. SELECT Operations
def select_all(conn: sqlite3.Connection, table: str, where: str = '', params: Tuple = ()) -> List[Tuple]:
    """
    Select every column of the rows in a table, optionally filtered.

    Args:
        conn: Connection object
        table: Table name
        where: Condition placed after WHERE; omitted when empty
        params: Values bound to placeholders in ``where``

    Returns:
        List of rows
    """
    filter_part = f' WHERE {where}' if where else ''
    return fetch_all(conn, f'SELECT * FROM {table}' + filter_part, params)

def select_by_id(conn: sqlite3.Connection, table: str, row_id: int) -> Optional[Tuple]:
    """
    Select the row whose ``id`` column equals ``row_id``.

    Args:
        conn: Connection object
        table: Table name
        row_id: Row ID (bound as a parameter)

    Returns:
        Row tuple or None
    """
    statement = f'SELECT * FROM {table} WHERE id = ?'
    bound = (row_id,)
    return fetch_one(conn, statement, bound)

def select_columns(conn: sqlite3.Connection, table: str, columns: List[str], where: str = '', params: Tuple = ()) -> List[Tuple]:
    """
    Select the named columns of a table, optionally filtered.

    Args:
        conn: Connection object
        table: Table name
        columns: Column names
        where: Condition placed after WHERE; omitted when empty
        params: Values bound to placeholders in ``where``

    Returns:
        List of rows
    """
    projection = ", ".join(columns)
    filter_part = f' WHERE {where}' if where else ''
    return fetch_all(conn, f'SELECT {projection} FROM {table}' + filter_part, params)

def select_with_join(conn: sqlite3.Connection, table1: str, table2: str, on: str, columns: str = '*') -> List[Tuple]:
    """
    Select from two tables combined with JOIN ... ON.

    Args:
        conn: Connection object
        table1: First table
        table2: Second table
        on: JOIN condition
        columns: Column list placed after SELECT

    Returns:
        List of rows
    """
    statement = f'SELECT {columns} FROM {table1} JOIN {table2} ON {on}'
    joined_rows = fetch_all(conn, statement)
    return joined_rows

def select_distinct(conn: sqlite3.Connection, table: str, column: str) -> List[Any]:
    """
    Collect the distinct values of one column.

    Args:
        conn: Connection object
        table: Table name
        column: Column name

    Returns:
        List of distinct values
    """
    values = []
    for record in fetch_all(conn, f'SELECT DISTINCT {column} FROM {table}'):
        values.append(record[0])
    return values

# 4. UPDATE Operations
def update_by_id(conn: sqlite3.Connection, table: str, row_id: int, data: Dict[str, Any]) -> bool:
    """
    Update the row whose ``id`` column equals ``row_id``.

    Args:
        conn: Connection object
        table: Table name
        row_id: Row ID (bound as the last parameter)
        data: Column-value pairs to assign

    Returns:
        True if successful
    """
    try:
        assignments = ', '.join(f'{col} = ?' for col in data.keys())
        statement = f'UPDATE {table} SET {assignments} WHERE id = ?'
        bound = (*data.values(), row_id)
        return execute_query(conn, statement, bound)
    except Exception as exc:
        print(f"Update error: {exc}")
        return False

def update_where(conn: sqlite3.Connection, table: str, data: Dict[str, Any], where: str, params: Tuple = ()) -> int:
    """
    Update every row that satisfies a condition.

    The new values are bound first, followed by ``params`` for the
    placeholders inside ``where``.

    Args:
        conn: Connection object
        table: Table name
        data: Column-value pairs to assign
        where: Condition placed after WHERE
        params: Values bound to placeholders in ``where``

    Returns:
        The cursor's row count after the update, or 0 if an error
        occurred (the error is printed and the connection rolled back)
    """
    try:
        assignments = ', '.join(f'{col} = ?' for col in data.keys())
        statement = f'UPDATE {table} SET {assignments} WHERE {where}'
        cur = conn.cursor()
        bound = tuple(data.values()) + params
        cur.execute(statement, bound)
        conn.commit()
        return cur.rowcount
    except Exception as exc:
        print(f"Update where error: {exc}")
        conn.rollback()
        return 0

def increment_column(conn: sqlite3.Connection, table: str, column: str, where: str = '', params: Tuple = ()) -> bool:
    """
    Add 1 to a column, for all rows or only those matching a condition.

    Args:
        conn: Connection object
        table: Table name
        column: Column to increment
        where: Condition placed after WHERE; omitted when empty
        params: Values bound to placeholders in ``where``

    Returns:
        True if successful
    """
    filter_part = f' WHERE {where}' if where else ''
    statement = f'UPDATE {table} SET {column} = {column} + 1' + filter_part
    return execute_query(conn, statement, params)

# 5. DELETE Operations
def delete_by_id(conn: sqlite3.Connection, table: str, row_id: int) -> bool:
    """
    Delete the row whose ``id`` column equals ``row_id``.

    Args:
        conn: Connection object
        table: Table name
        row_id: Row ID (bound as a parameter)

    Returns:
        True if successful
    """
    statement = f'DELETE FROM {table} WHERE id = ?'
    bound = (row_id,)
    return execute_query(conn, statement, bound)

def delete_where(conn: sqlite3.Connection, table: str, where: str, params: Tuple = ()) -> int:
    """
    Delete every row that satisfies a condition.

    Args:
        conn: Connection object
        table: Table name
        where: Condition placed after WHERE
        params: Values bound to placeholders in ``where``

    Returns:
        The cursor's row count after the delete, or 0 if an error
        occurred (the error is printed and the connection rolled back)
    """
    try:
        statement = f'DELETE FROM {table} WHERE {where}'
        cur = conn.cursor()
        cur.execute(statement, params)
        conn.commit()
    except Exception as exc:
        print(f"Delete where error: {exc}")
        conn.rollback()
        return 0
    else:
        return cur.rowcount

def delete_all(conn: sqlite3.Connection, table: str) -> bool:
    """
    Remove every row from a table (the table itself is kept).

    Args:
        conn: Connection object
        table: Table name

    Returns:
        True if successful
    """
    statement = f'DELETE FROM {table}'
    return execute_query(conn, statement)

# 6. Aggregation Queries
def count_rows(conn: sqlite3.Connection, table: str, where: str = '', params: Tuple = ()) -> int:
    """
    Count the rows of a table, optionally filtered.

    Args:
        conn: Connection object
        table: Table name
        where: Condition placed after WHERE; omitted when empty
        params: Values bound to placeholders in ``where``

    Returns:
        Row count, or 0 when the query produced no row
    """
    filter_part = f' WHERE {where}' if where else ''
    result = fetch_one(conn, f'SELECT COUNT(*) FROM {table}' + filter_part, params)
    if not result:
        return 0
    return result[0]

def sum_column(conn: sqlite3.Connection, table: str, column: str, where: str = '', params: Tuple = ()) -> float:
    """
    Sum column values

    Args:
        conn: Connection object
        table: Table name
        column: Column name
        where: WHERE clause; omitted when empty
        params: Values bound to placeholders in ``where`` (same
            convention as ``count_rows``), so filter values need not be
            written into the SQL text

    Returns:
        Sum value, or 0 when the query produced no row or the sum is
        NULL or zero
    """
    query = f'SELECT SUM({column}) FROM {table}'
    if where:
        query += f' WHERE {where}'
    result = fetch_one(conn, query, params)
    # SUM over no rows yields NULL (None); report that as 0.
    return result[0] if result and result[0] else 0

def avg_column(conn: sqlite3.Connection, table: str, column: str, where: str = '', params: Tuple = ()) -> float:
    """
    Average column values

    Args:
        conn: Connection object
        table: Table name
        column: Column name
        where: WHERE clause; omitted when empty
        params: Values bound to placeholders in ``where`` (same
            convention as ``count_rows``), so filter values need not be
            written into the SQL text

    Returns:
        Average value, or 0 when the query produced no row or the
        average is NULL or zero
    """
    query = f'SELECT AVG({column}) FROM {table}'
    if where:
        query += f' WHERE {where}'
    result = fetch_one(conn, query, params)
    # AVG over no rows yields NULL (None); report that as 0.
    return result[0] if result and result[0] else 0

def min_max_column(conn: sqlite3.Connection, table: str, column: str, where: str = '', params: Tuple = ()) -> Tuple:
    """
    Get min and max of column

    Args:
        conn: Connection object
        table: Table name
        column: Column name
        where: WHERE clause; omitted when empty
        params: Values bound to placeholders in ``where`` (same
            convention as ``count_rows``), so filter values need not be
            written into the SQL text

    Returns:
        (min, max) tuple; (None, None) when the query produced no row
    """
    query = f'SELECT MIN({column}), MAX({column}) FROM {table}'
    if where:
        query += f' WHERE {where}'
    result = fetch_one(conn, query, params)
    return result if result else (None, None)

def group_by(conn: sqlite3.Connection, table: str, group_column: str, agg_column: str, operation: str = 'COUNT') -> List[Tuple]:
    """
    Aggregate one column per distinct value of another.

    Args:
        conn: Connection object
        table: Table name
        group_column: Column to group by
        agg_column: Column to aggregate
        operation: SQL aggregate function name applied to ``agg_column``

    Returns:
        List of (group, aggregated_value) tuples
    """
    aggregate = f'{operation}({agg_column})'
    statement = f'SELECT {group_column}, {aggregate} FROM {table} GROUP BY {group_column}'
    return fetch_all(conn, statement)

# 7. Advanced Queries
def select_with_limit(conn: sqlite3.Connection, table: str, limit: int, offset: int = 0) -> List[Tuple]:
    """
    Select with LIMIT and OFFSET

    Args:
        conn: Connection object
        table: Table name
        limit: Max rows
        offset: Row offset

    Returns:
        List of rows
    """
    # LIMIT/OFFSET are values, so they are bound as parameters rather than
    # formatted into the SQL text; only the table name must be interpolated.
    query = f'SELECT * FROM {table} LIMIT ? OFFSET ?'
    return fetch_all(conn, query, (limit, offset))

def select_with_order(conn: sqlite3.Connection, table: str, order_column: str, ascending: bool = True, limit: int = 0) -> List[Tuple]:
    """
    Select all rows sorted by one column.

    Args:
        conn: Connection object
        table: Table name
        order_column: Column to order by
        ascending: True for ASC, False for DESC
        limit: Max rows; a LIMIT clause is added only when > 0

    Returns:
        List of rows
    """
    direction = 'DESC' if not ascending else 'ASC'
    pieces = [f'SELECT * FROM {table} ORDER BY {order_column} {direction}']
    if limit > 0:
        pieces.append(f' LIMIT {limit}')
    return fetch_all(conn, ''.join(pieces))

def search_like(conn: sqlite3.Connection, table: str, column: str, pattern: str) -> List[Tuple]:
    """
    Find rows whose column matches a LIKE pattern.

    Args:
        conn: Connection object
        table: Table name
        column: Column to search
        pattern: LIKE pattern (bound as a parameter)

    Returns:
        List of matching rows
    """
    statement = f'SELECT * FROM {table} WHERE {column} LIKE ?'
    bound = (pattern,)
    return fetch_all(conn, statement, bound)

def select_between(conn: sqlite3.Connection, table: str, column: str, start: Any, end: Any) -> List[Tuple]:
    """
    Find rows whose column lies in a BETWEEN range.

    Args:
        conn: Connection object
        table: Table name
        column: Column name
        start: Lower bound (bound as a parameter)
        end: Upper bound (bound as a parameter)

    Returns:
        List of rows
    """
    statement = f'SELECT * FROM {table} WHERE {column} BETWEEN ? AND ?'
    bounds = (start, end)
    return fetch_all(conn, statement, bounds)

# 8. Query Result Processing
def rows_to_dicts(rows: List[Tuple], columns: List[str]) -> List[Dict[str, Any]]:
    """
    Pair each row's values with column names.

    Values and names are matched by position with ``zip``, so the
    shorter of the two decides how many keys a dictionary gets.

    Args:
        rows: List of row tuples
        columns: Column names

    Returns:
        List of dictionaries, one per row
    """
    records = []
    for row in rows:
        records.append(dict(zip(columns, row)))
    return records

def get_row_count(conn: sqlite3.Connection, table: str) -> int:
    """
    Count all rows of a table (unfiltered ``count_rows``).

    Args:
        conn: Connection object
        table: Table name

    Returns:
        Row count
    """
    total = count_rows(conn, table)
    return total

def table_exists_with_data(conn: sqlite3.Connection, table: str) -> bool:
    """
    Report whether ``count_rows`` finds at least one row in a table.

    Args:
        conn: Connection object
        table: Table name

    Returns:
        True if the row count is greater than zero
    """
    total = count_rows(conn, table)
    return total > 0

# Usage Examples
def demonstrate_sqlite_query():
    """
    Run every query example in turn and print the results.

    Builds a ``users`` table in an in-memory database and exercises the
    insert, select, update, delete, aggregation and advanced query
    helpers of this module. The connection is closed even if one of the
    steps raises.
    """
    print("=== Web Python SQL Query Examples ===\n")

    # Setup database
    conn = sqlite3.connect(':memory:')
    try:
        conn.execute('''
            CREATE TABLE users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT,
                age INTEGER,
                salary REAL
            )
        ''')
        conn.commit()

        # 1. INSERT operations
        # The addresses use the reserved example.com domain; the scraped
        # listing had them replaced by an "[email protected]" placeholder.
        print("--- 1. INSERT Operations ---")
        insert_single(conn, 'users', {'name': 'Alice', 'email': 'alice@example.com', 'age': 30, 'salary': 50000})
        insert_single(conn, 'users', {'name': 'Bob', 'email': 'bob@example.com', 'age': 25, 'salary': 45000})
        insert_many(conn, 'users', ['name', 'email', 'age', 'salary'], [
            ('Charlie', 'charlie@example.com', 35, 60000),
            ('David', 'david@example.com', 28, 55000),
            ('Eve', 'eve@example.com', 32, 58000)
        ])
        print(f"Rows inserted: {count_rows(conn, 'users')}")
        print(f"Last insert ID: {get_last_insert_id(conn)}")

        # 2. SELECT operations
        print("\n--- 2. SELECT Operations ---")
        print(f"All users: {fetch_all(conn, 'SELECT * FROM users')}")
        print(f"User by ID 1: {fetch_one(conn, 'SELECT * FROM users WHERE id = ?', (1,))}")
        print(f"Select columns: {select_columns(conn, 'users', ['name', 'email'])}")

        # 3. UPDATE operations
        print("\n--- 3. UPDATE Operations ---")
        update_by_id(conn, 'users', 1, {'salary': 55000})
        increment_column(conn, 'users', 'age', 'name = ?', ('Alice',))
        print(f"After update: {fetch_one(conn, 'SELECT * FROM users WHERE id = 1')}")

        # 4. DELETE operations
        print("\n--- 4. DELETE Operations ---")
        delete_by_id(conn, 'users', 5)
        print(f"After delete ID 5: {count_rows(conn, 'users')}")

        # 5. Aggregation
        print("\n--- 5. Aggregation Queries ---")
        print(f"Count: {count_rows(conn, 'users')}")
        print(f"Average salary: {avg_column(conn, 'users', 'salary'):.2f}")
        print(f"Total salary: {sum_column(conn, 'users', 'salary'):.2f}")
        print(f"Min/max age: {min_max_column(conn, 'users', 'age')}")
        print(f"Group by age range: {group_by(conn, 'users', 'age', 'salary', 'AVG')}")

        # 6. Advanced queries
        print("\n--- 6. Advanced Queries ---")
        print(f"Limit 2: {select_with_limit(conn, 'users', 2)}")
        print(f"Order by salary: {select_with_order(conn, 'users', 'salary', ascending=False)}")
        print(f"Like search: {search_like(conn, 'users', 'name', '%a%')}")

        # 7. BETWEEN
        print("\n--- 7. BETWEEN Query ---")
        print(f"Age between 25 and 32: {select_between(conn, 'users', 'age', 25, 32)}")
    finally:
        conn.close()
    print("\n=== All SQL Query Examples Completed ===")

# Export functions
# Python has no ``export`` statement (``export { ... }`` is JavaScript syntax
# and a SyntaxError here); the public names are declared through ``__all__``.
# Any ``__all__`` defined earlier in the same file is extended, not replaced.
__all__ = [
    *globals().get('__all__', ()),
    'execute_query', 'fetch_one', 'fetch_all', 'fetch_many',
    'insert_single', 'insert_many', 'insert_or_replace', 'insert_or_ignore', 'get_last_insert_id',
    'select_all', 'select_by_id', 'select_columns', 'select_with_join', 'select_distinct',
    'update_by_id', 'update_where', 'increment_column',
    'delete_by_id', 'delete_where', 'delete_all',
    'count_rows', 'sum_column', 'avg_column', 'min_max_column', 'group_by',
    'select_with_limit', 'select_with_order', 'search_like', 'select_between',
    'rows_to_dicts', 'get_row_count', 'table_exists_with_data',
    'demonstrate_sqlite_query',
]

💻 事务处理 python

🟡 intermediate ⭐⭐⭐

使用提交、回滚和保存点操作处理数据库事务

⏱️ 35 min 🏷️ python, web, database, transaction
Prerequisites: Intermediate Python, Database concepts, ACID properties
# Web Python Transaction Examples
# Managing database transactions with ACID properties

# 1. Basic Transaction Control
import sqlite3
from typing import Optional, List, Callable, Any
from contextlib import contextmanager

def begin_transaction(conn: sqlite3.Connection) -> None:
    """
    Start a transaction by issuing ``BEGIN TRANSACTION``.

    Args:
        conn: Connection object
    """
    conn.cursor().execute('BEGIN TRANSACTION')

def commit_transaction(conn: sqlite3.Connection) -> bool:
    """
    Commit the connection's pending work.

    Args:
        conn: Connection object

    Returns:
        True if the commit succeeded, False if it raised (the error
        is printed)
    """
    try:
        conn.commit()
    except Exception as exc:
        print(f"Commit error: {exc}")
        return False
    return True

def rollback_transaction(conn: sqlite3.Connection) -> bool:
    """
    Roll back the connection's pending work.

    Args:
        conn: Connection object

    Returns:
        True if the rollback succeeded, False if it raised (the
        error is printed)
    """
    try:
        conn.rollback()
    except Exception as exc:
        print(f"Rollback error: {exc}")
        return False
    return True

def get_transaction_status(conn: sqlite3.Connection) -> str:
    """
    Get transaction status

    Reads the connection's ``in_transaction`` flag instead of probing
    with ``BEGIN``/``COMMIT``. The probe raised whenever a transaction
    was already open (so ``'in transaction'`` could never be returned)
    and modified the connection's transaction state as a side effect.

    Args:
        conn: Connection object

    Returns:
        ``'in transaction'`` if a transaction is open on the connection,
        otherwise ``'no transaction'``
    """
    return 'in transaction' if conn.in_transaction else 'no transaction'

# 2. Context Manager Transactions
@contextmanager
def transaction(conn: sqlite3.Connection):
    """
    Context manager for transactions

    Begins an explicit transaction, commits it when the ``with`` body
    finishes normally, and rolls it back (then re-raises) when the body
    or the commit raises.

    Args:
        conn: Connection object

    Yields:
        Connection object

    Raises:
        sqlite3.OperationalError: If a transaction is already open on the
            connection. The already-open transaction is left untouched.

    Example:
        with transaction(conn):
            # operations
            pass  # auto commit on success, rollback on error
    """
    # BEGIN stays outside the try block on purpose: if it fails because the
    # caller already has a transaction open, rolling back here would discard
    # the caller's pending work.
    conn.execute('BEGIN TRANSACTION')
    try:
        yield conn
        conn.commit()
    except BaseException:
        # BaseException so that KeyboardInterrupt and friends cannot leave a
        # half-finished transaction open on the connection.
        conn.rollback()
        raise  # bare raise keeps the original traceback intact

def execute_in_transaction(conn: sqlite3.Connection, operations: List[Callable]) -> bool:
    """
    Run a sequence of callables inside a single transaction.

    Each callable is invoked with the connection as its only argument,
    in list order. If any of them (or the surrounding transaction)
    raises, the error is printed and ``False`` is returned.

    Args:
        conn: Connection object
        operations: Callables taking the connection as sole argument

    Returns:
        True if every operation ran and the transaction completed
    """
    try:
        with transaction(conn):
            for step in operations:
                step(conn)
    except Exception as error:
        print(f"Transaction failed: {error}")
        return False
    else:
        return True

# 3. Savepoint Operations
def create_savepoint(conn: sqlite3.Connection, name: str) -> None:
    """
    Create savepoint

    The name is emitted as a double-quoted SQL identifier (embedded
    double quotes are doubled), so names containing spaces or
    punctuation work and cannot alter the statement. Plain names such as
    ``sp1`` refer to the same savepoint whether quoted or not.

    Args:
        conn: Connection object
        name: Savepoint name
    """
    quoted_name = '"' + name.replace('"', '""') + '"'
    conn.execute(f'SAVEPOINT {quoted_name}')

def release_savepoint(conn: sqlite3.Connection, name: str) -> bool:
    """
    Release savepoint

    The name is emitted as a double-quoted SQL identifier (embedded
    double quotes are doubled), so names containing spaces or
    punctuation work and cannot alter the statement. Errors are printed
    rather than raised.

    Args:
        conn: Connection object
        name: Savepoint name

    Returns:
        True if successful, False if the statement raised (for example
        when no savepoint with that name exists)
    """
    quoted_name = '"' + name.replace('"', '""') + '"'
    try:
        conn.execute(f'RELEASE SAVEPOINT {quoted_name}')
        return True
    except Exception as e:
        print(f"Release savepoint error: {e}")
        return False

def rollback_to_savepoint(conn: sqlite3.Connection, name: str) -> bool:
    """
    Rollback to savepoint

    Undoes the changes made since the savepoint was created. Only
    ``ROLLBACK TO`` is issued; no ``RELEASE`` follows, so releasing the
    savepoint afterwards is left to the caller. The name is emitted as a
    double-quoted SQL identifier (embedded double quotes are doubled).
    Errors are printed rather than raised.

    Args:
        conn: Connection object
        name: Savepoint name

    Returns:
        True if successful, False if the statement raised (for example
        when no savepoint with that name exists)
    """
    quoted_name = '"' + name.replace('"', '""') + '"'
    try:
        conn.execute(f'ROLLBACK TO SAVEPOINT {quoted_name}')
        return True
    except Exception as e:
        print(f"Rollback to savepoint error: {e}")
        return False

@contextmanager
def savepoint(conn: sqlite3.Connection, name: str):
    """
    Context manager for savepoint

    Creates the savepoint on entry. On normal exit the savepoint is
    released. If the body raises, changes made since the savepoint are
    rolled back and the savepoint is then released so it does not stay
    behind on the connection; the original exception is re-raised.

    The name is emitted as a double-quoted SQL identifier (embedded
    double quotes are doubled). Errors from the cleanup statements are
    printed rather than raised, so they never mask the body's exception.

    Args:
        conn: Connection object
        name: Savepoint name

    Yields:
        Connection object
    """
    quoted_name = '"' + name.replace('"', '""') + '"'
    # Created outside the try block: if SAVEPOINT itself fails there is
    # nothing to roll back to.
    conn.execute(f'SAVEPOINT {quoted_name}')
    try:
        yield conn
    except BaseException:
        try:
            conn.execute(f'ROLLBACK TO SAVEPOINT {quoted_name}')
            # ROLLBACK TO alone keeps the savepoint alive; release it so an
            # aborted block does not leave it (or a transaction it started)
            # open.
            conn.execute(f'RELEASE SAVEPOINT {quoted_name}')
        except sqlite3.Error as cleanup_error:
            print(f"Rollback to savepoint error: {cleanup_error}")
        raise
    else:
        try:
            conn.execute(f'RELEASE SAVEPOINT {quoted_name}')
        except sqlite3.Error as release_error:
            print(f"Release savepoint error: {release_error}")

# 4. Isolation Levels
def set_isolation_level(conn: sqlite3.Connection, level: Optional[str]) -> None:
    """
    Set isolation level

    Assigns ``level`` to the connection's ``isolation_level`` attribute.
    The annotation is ``Optional[str]`` because ``None`` is a documented,
    valid choice.

    Args:
        conn: Connection object
        level: 'DEFERRED', 'IMMEDIATE', 'EXCLUSIVE', or None (autocommit)
    """
    conn.isolation_level = level

def get_isolation_level(conn: sqlite3.Connection) -> str:
    """
    Report the connection's isolation level as text.

    The attribute value is passed through ``str()``, so a level of
    ``None`` comes back as the string ``'None'``.

    Args:
        conn: Connection object

    Returns:
        String form of ``conn.isolation_level``
    """
    current_level = conn.isolation_level
    return str(current_level)

def deferred_transaction(conn: sqlite3.Connection) -> None:
    """
    Start a transaction with ``BEGIN DEFERRED``.

    Per the SQLite documentation, DEFERRED is the default mode and
    postpones lock acquisition until the database is first accessed.

    Args:
        conn: Connection object
    """
    statement = 'BEGIN DEFERRED'
    conn.execute(statement)

def immediate_transaction(conn: sqlite3.Connection) -> None:
    """
    Start a transaction with ``BEGIN IMMEDIATE``.

    Per the SQLite documentation, IMMEDIATE starts a write transaction
    right away instead of waiting for the first write statement.

    Args:
        conn: Connection object
    """
    statement = 'BEGIN IMMEDIATE'
    conn.execute(statement)

def exclusive_transaction(conn: sqlite3.Connection) -> None:
    """
    Start a transaction with ``BEGIN EXCLUSIVE``.

    Per the SQLite documentation, EXCLUSIVE starts a write transaction
    immediately and, outside WAL mode, also blocks other connections
    from reading while it is open.

    Args:
        conn: Connection object
    """
    statement = 'BEGIN EXCLUSIVE'
    conn.execute(statement)

# 5. Transaction Patterns
def transfer_funds(conn: sqlite3.Connection, from_account: int, to_account: int, amount: float) -> bool:
    """
    Transfer funds between accounts (atomic operation)

    Both balance updates run inside one transaction. The transfer is
    rejected (and rolled back) when the amount is not positive, when
    either account row does not exist, or when the source balance would
    go negative. Failures are printed, not raised.

    Args:
        conn: Connection object
        from_account: Source account ID
        to_account: Destination account ID
        amount: Transfer amount (must be greater than zero)

    Returns:
        True if successful
    """
    try:
        # A zero or negative amount would be a no-op or a reverse transfer
        # that bypasses the balance check on the other account.
        if amount <= 0:
            raise ValueError("Transfer amount must be positive")

        with transaction(conn):
            # Deduct from source
            debit = conn.execute(
                'UPDATE accounts SET balance = balance - ? WHERE id = ?',
                (amount, from_account)
            )
            if debit.rowcount != 1:
                raise LookupError(f"Source account {from_account} not found")

            # Add to destination; without the rowcount check a missing
            # destination would silently destroy the deducted money.
            credit = conn.execute(
                'UPDATE accounts SET balance = balance + ? WHERE id = ?',
                (amount, to_account)
            )
            if credit.rowcount != 1:
                raise LookupError(f"Destination account {to_account} not found")

            # Verify sufficient funds
            cursor = conn.execute(
                'SELECT balance FROM accounts WHERE id = ?',
                (from_account,)
            )
            balance = cursor.fetchone()[0]

            if balance < 0:
                raise ValueError("Insufficient funds")

        return True
    except Exception as e:
        print(f"Transfer failed: {e}")
        return False

def batch_insert(conn: sqlite3.Connection, table: str, rows: List[tuple], batch_size: int = 1000) -> int:
    """
    Batch insert with transaction

    Rows are inserted in slices of ``batch_size``, each slice in its own
    transaction. Processing stops at the first slice that fails; slices
    committed before it stay committed. A slice is only counted once its
    transaction has committed.

    ``table`` is interpolated into the SQL text as-is, so it must be a
    trusted identifier. Every row must supply a value for every column
    of the table, because the statement has no column list.

    Args:
        conn: Connection object
        table: Table name
        rows: List of row tuples
        batch_size: Rows per transaction

    Returns:
        Total rows inserted

    Raises:
        ValueError: If ``batch_size`` is less than 1
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    if not rows:
        return 0

    # The statement does not change between batches, so build it once.
    placeholders = ", ".join("?" * len(rows[0]))
    insert_sql = f'INSERT INTO {table} VALUES ({placeholders})'

    total_inserted = 0

    for i in range(0, len(rows), batch_size):
        batch = rows[i:i + batch_size]

        try:
            with transaction(conn):
                cursor = conn.cursor()
                cursor.executemany(insert_sql, batch)
                batch_rows = cursor.rowcount
        except Exception as e:
            print(f"Batch insert failed at {i}: {e}")
            break
        else:
            # Count only after the with-block has committed; a failed commit
            # must not inflate the total.
            total_inserted += batch_rows

    return total_inserted

def upsert_record(conn: sqlite3.Connection, table: str, data: dict, key_column: str) -> bool:
    """
    Insert or update record

    Looks for a row whose ``key_column`` equals ``data[key_column]``. If
    one exists, the remaining columns in ``data`` are updated; otherwise
    a new row is inserted from all of ``data``. Both steps run inside
    one transaction. Failures are printed, not raised.

    ``table`` and the keys of ``data`` are interpolated into the SQL
    text as-is, so they must be trusted identifiers; values are bound as
    parameters.

    Args:
        conn: Connection object
        table: Table name
        data: Record data (must contain ``key_column``)
        key_column: Column to check for existence

    Returns:
        True if successful
    """
    try:
        key_value = data[key_column]

        with transaction(conn):
            # SELECT 1 rather than a named column: the table is not required
            # to have a column called "id".
            cursor = conn.execute(
                f'SELECT 1 FROM {table} WHERE {key_column} = ? LIMIT 1',
                (key_value,)
            )

            if cursor.fetchone() is not None:
                # Update everything except the key, which by definition
                # already holds this value.
                updates = {k: v for k, v in data.items() if k != key_column}
                if updates:
                    set_clause = ', '.join(f'{k} = ?' for k in updates)
                    conn.execute(
                        f'UPDATE {table} SET {set_clause} WHERE {key_column} = ?',
                        (*updates.values(), key_value)
                    )
            else:
                # Insert
                columns = ', '.join(data)
                placeholders = ', '.join('?' * len(data))
                conn.execute(
                    f'INSERT INTO {table} ({columns}) VALUES ({placeholders})',
                    tuple(data.values())
                )

        return True
    except Exception as e:
        print(f"Upsert failed: {e}")
        return False

# 6. Nested Transactions
def nested_transaction_example(conn: sqlite3.Connection) -> bool:
    """
    Example of nested transactions with savepoints

    Inserts a user in the outer transaction and, inside a savepoint, an
    order that references that user. Expects ``users (name)`` and
    ``orders (user_id)`` tables to exist. Failures are printed, not
    raised.

    Args:
        conn: Connection object

    Returns:
        True if successful
    """
    try:
        with transaction(conn):
            # Outer transaction operations
            cursor = conn.execute('INSERT INTO users (name) VALUES (?)', ('Alice',))
            # Reference the row just inserted instead of assuming its id is 1,
            # which only holds for an empty users table.
            user_id = cursor.lastrowid

            # Nested savepoint
            with savepoint(conn, 'nested'):
                conn.execute('INSERT INTO orders (user_id) VALUES (?)', (user_id,))
                # Can rollback to this savepoint if needed

        # Reached only after the transaction has committed.
        return True
    except Exception as e:
        print(f"Nested transaction failed: {e}")
        return False

# 7. Transaction Logging
class TransactionLogger:
    """
    Thin wrapper around a connection that records what was asked of it.

    Every ``execute``, ``commit`` and ``rollback`` call appends one text
    entry to ``log`` before the call is forwarded to the connection, so
    an entry is present even when the forwarded call raises.
    """

    def __init__(self, conn: sqlite3.Connection):
        # Entries accumulate in call order.
        self.log: List[str] = []
        self.conn = conn

    def execute(self, query: str, params: tuple = ()):
        """Record the statement and its parameters, then run it."""
        entry = f"EXECUTE: {query} with {params}"
        self.log.append(entry)
        result = self.conn.execute(query, params)
        return result

    def commit(self):
        """Record a COMMIT entry, then commit on the connection."""
        self.log.append("COMMIT")
        outcome = self.conn.commit()
        return outcome

    def rollback(self):
        """Record a ROLLBACK entry, then roll back on the connection."""
        self.log.append("ROLLBACK")
        outcome = self.conn.rollback()
        return outcome

    def get_log(self) -> List[str]:
        """Return the list of recorded entries (the live list, not a copy)."""
        entries = self.log
        return entries

# 8. Atomic Operations
def atomic_operation(conn: sqlite3.Connection, operation: Callable, *args, **kwargs) -> Any:
    """
    Run a callable inside a transaction and hand back its result.

    The callable receives the connection first, followed by the extra
    positional and keyword arguments. If it (or the transaction) raises,
    the error is printed and ``None`` is returned, so a ``None`` result
    cannot be told apart from a failure.

    Args:
        conn: Connection object
        operation: Callable to execute
        *args: Operation arguments
        **kwargs: Operation keyword arguments

    Returns:
        Operation result or None on failure
    """
    try:
        with transaction(conn):
            outcome = operation(conn, *args, **kwargs)
    except Exception as error:
        print(f"Atomic operation failed: {error}")
        return None
    return outcome

def compare_and_update(conn: sqlite3.Connection, table: str, id: int, column: str, old_value: Any, new_value: Any) -> bool:
    """
    Atomic compare-and-update

    Reads ``column`` for the row with the given id and writes
    ``new_value`` only if the current value equals ``old_value`` (Python
    ``==`` comparison). Read and write share one transaction. Failures
    are printed, not raised.

    ``table`` and ``column`` are interpolated into the SQL text as-is,
    so they must be trusted identifiers.

    Args:
        conn: Connection object
        table: Table name
        id: Row ID
        column: Column name
        old_value: Expected current value
        new_value: New value

    Returns:
        True if updated (value matched); False if the row does not
        exist, the value differs, or an error occurred
    """
    try:
        with transaction(conn):
            cursor = conn.execute(f'SELECT {column} FROM {table} WHERE id = ?', (id,))
            row = cursor.fetchone()

            # A missing row is an ordinary "no match", not an error; handle it
            # explicitly instead of relying on a swallowed TypeError.
            if row is None or row[0] != old_value:
                return False

            conn.execute(f'UPDATE {table} SET {column} = ? WHERE id = ?', (new_value, id))
            return True
    except Exception as e:
        print(f"Compare and update failed: {e}")
        return False

# 9. Transaction State Management
def in_transaction(conn: sqlite3.Connection) -> bool:
    """
    Tell whether a transaction is currently open on the connection.

    Simply exposes the connection's ``in_transaction`` attribute.

    Args:
        conn: Connection object

    Returns:
        The value of ``conn.in_transaction``
    """
    is_open = conn.in_transaction
    return is_open

def get_changes_count(conn: sqlite3.Connection) -> int:
    """
    Report the connection's running change counter.

    Returns ``conn.total_changes``, which the sqlite3 module documents
    as the number of rows modified, inserted or deleted since the
    connection was opened. It is not reset per transaction.

    Args:
        conn: Connection object

    Returns:
        The value of ``conn.total_changes``
    """
    changed_rows = conn.total_changes
    return changed_rows

def reset_transaction(conn: sqlite3.Connection) -> None:
    """
    Discard whatever transaction is pending on the connection.

    Implemented as a plain ``conn.rollback()``; errors from the rollback
    propagate to the caller.

    Args:
        conn: Connection object
    """
    undo = conn.rollback
    undo()

# Usage Examples
def demonstrate_transaction():
    """
    Run every transaction example against a throwaway in-memory database.

    Creates ``accounts``, ``users`` and ``orders`` tables, seeds two
    accounts, then walks through commit, rollback, savepoints, a funds
    transfer, batch insert, upsert, compare-and-update and logging,
    printing the table contents after each step. The connection is
    closed even if a step raises.
    """
    print("=== Web Python Transaction Examples ===\n")

    # Setup database
    conn = sqlite3.connect(':memory:')
    try:
        conn.execute('''
            CREATE TABLE accounts (
                id INTEGER PRIMARY KEY,
                name TEXT,
                balance REAL
            )
        ''')
        conn.execute('''
            CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT
            )
        ''')
        conn.execute('''
            CREATE TABLE orders (
                id INTEGER PRIMARY KEY,
                user_id INTEGER
            )
        ''')
        conn.commit()

        # Insert initial data. Values are bound as parameters: double-quoted
        # text in SQL is an identifier, and SQLite only accepts it as a string
        # through a legacy fallback that some builds disable.
        conn.execute('INSERT INTO accounts VALUES (?, ?, ?)', (1, 'Alice', 1000))
        conn.execute('INSERT INTO accounts VALUES (?, ?, ?)', (2, 'Bob', 500))
        conn.commit()

        # 1. Basic transaction
        print("--- 1. Basic Transaction ---")
        try:
            with transaction(conn):
                conn.execute('UPDATE accounts SET balance = balance + 100 WHERE id = 1')
                print("In transaction: updated, not committed yet")
                # Auto commit on success
        except Exception as e:
            print(f"Transaction failed: {e}")
        print(f"After transaction: {fetch_all(conn, 'SELECT * FROM accounts')}")

        # 2. Rollback example
        print("\n--- 2. Rollback Example ---")
        try:
            with transaction(conn):
                conn.execute('UPDATE accounts SET balance = balance - 1000 WHERE id = 1')
                raise Exception("Intentional error")
        except Exception as e:
            # Expected: the error above exists to trigger the rollback.
            print(f"Rolled back after: {e}")
        print(f"After rollback: {fetch_all(conn, 'SELECT * FROM accounts')}")

        # 3. Savepoints
        print("\n--- 3. Savepoints ---")
        try:
            with transaction(conn):
                conn.execute('UPDATE accounts SET balance = balance + 50 WHERE id = 1')

                with savepoint(conn, 'sp1'):
                    conn.execute('UPDATE accounts SET balance = balance + 50 WHERE id = 2')
                    print("After nested update")

                # Outer transaction continues
        except Exception as e:
            print(f"Error: {e}")
        print(f"After savepoint: {fetch_all(conn, 'SELECT * FROM accounts')}")

        # 4. Transfer funds
        print("\n--- 4. Transfer Funds (Atomic) ---")
        print(f"Before transfer: {fetch_all(conn, 'SELECT * FROM accounts')}")
        success = transfer_funds(conn, 1, 2, 200)
        print(f"Transfer success: {success}")
        print(f"After transfer: {fetch_all(conn, 'SELECT * FROM accounts')}")

        # 5. Batch insert
        print("\n--- 5. Batch Insert ---")
        # batch_insert emits "INSERT INTO users VALUES (...)" with no column
        # list, so each row needs a value per table column. None for the
        # INTEGER PRIMARY KEY lets SQLite assign the id.
        batch_data = [
            (None, 'User1'),
            (None, 'User2'),
            (None, 'User3'),
        ]
        inserted = batch_insert(conn, 'users', batch_data, batch_size=2)
        print(f"Batch inserted {inserted} rows")
        print(f"Users: {fetch_all(conn, 'SELECT * FROM users')}")

        # 6. Upsert
        print("\n--- 6. Upsert ---")
        upsert_record(conn, 'users', {'id': 4, 'name': 'User4'}, 'id')
        upsert_record(conn, 'users', {'id': 1, 'name': 'UpdatedUser1'}, 'id')
        print(f"After upsert: {fetch_all(conn, 'SELECT * FROM users')}")

        # 7. Compare and update
        print("\n--- 7. Compare and Update ---")
        # Account 1 holds 950 at this point: 1000 + 100 + 50 - 200.
        result = compare_and_update(conn, 'accounts', 1, 'balance', 950, 1200)
        print(f"Compare and update success: {result}")
        print(f"Accounts: {fetch_all(conn, 'SELECT * FROM accounts')}")

        # 8. Transaction logging
        print("\n--- 8. Transaction Logging ---")
        logger = TransactionLogger(conn)
        try:
            with transaction(conn):
                logger.execute('UPDATE accounts SET balance = balance + 10 WHERE id = 1')
                logger.execute('UPDATE accounts SET balance = balance - 10 WHERE id = 2')
        except Exception as e:
            print(f"Transaction failed: {e}")
        print(f"Transaction log: {logger.get_log()}")
    finally:
        conn.close()

    print("\n=== All Transaction Examples Completed ===")

def fetch_all(conn, query, params=()):
    """
    Run a query and return every resulting row.

    Args:
        conn: Connection object
        query: SQL text to execute
        params: Values bound to the query's placeholders

    Returns:
        List of row tuples as produced by ``fetchall()``
    """
    rows = conn.execute(query, params).fetchall()
    return rows

# Public API of the transaction examples.
# Python has no ``export { ... }`` statement; ``__all__`` is the equivalent
# way to declare which names ``from module import *`` exposes.
__all__ = [
    'begin_transaction', 'commit_transaction', 'rollback_transaction', 'get_transaction_status',
    'transaction', 'execute_in_transaction',
    'create_savepoint', 'release_savepoint', 'rollback_to_savepoint', 'savepoint',
    'set_isolation_level', 'get_isolation_level', 'deferred_transaction', 'immediate_transaction', 'exclusive_transaction',
    'transfer_funds', 'batch_insert', 'upsert_record', 'nested_transaction_example',
    'TransactionLogger',
    'atomic_operation', 'compare_and_update',
    'in_transaction', 'get_changes_count', 'reset_transaction',
    'demonstrate_transaction',
]