Bibliothèque Web3.py Python

Exemples Web3.py Python incluant l'interaction blockchain, contrats intelligents, gestion de portefeuilles, transactions et développement d'applications décentralisées

💻 Web3.py Bases - Guide de Démarrage python

🟢 simple ⭐⭐

Fondements complets de Web3.py incluant la configuration des fournisseurs, gestion des comptes, transactions et opérations blockchain de base

⏱️ 25 min 🏷️ web3py, python, ethereum, web3, basics
Prerequisites: Python 3.6+, pip, Basic understanding of Ethereum, API keys for providers
# Web3.py Basics - Complete Getting Started Guide
# Install: pip install web3

from web3 import Web3
from web3.middleware import geth_poa_middleware
import json
import time

# ===== PROVIDER EXAMPLES =====
print("=== PROVIDER EXAMPLES ===")

# 1. Connect to local Geth node
def connect_to_geth():
    """Connect to local Geth node"""
    w3 = Web3(Web3.HTTPProvider('http://127.0.0.1:8545'))
    if w3.is_connected():
        print("✅ Connected to local Geth node")
        return w3
    else:
        print("❌ Failed to connect to Geth")
        return None

# 2. Connect to Infura
def connect_to_infura():
    """Connect to Infura endpoint"""
    infura_url = "https://mainnet.infura.io/v3/YOUR_PROJECT_ID"
    w3 = Web3(Web3.HTTPProvider(infura_url))

    if w3.is_connected():
        print("✅ Connected to Infura")
        return w3
    else:
        print("❌ Failed to connect to Infura")
        return None

# 3. Connect to WebSocket provider
def connect_to_websocket():
    """Connect via WebSocket for real-time updates"""
    w3 = Web3(Web3.WebsocketProvider('wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID'))

    if w3.is_connected():
        print("✅ Connected to WebSocket provider")
        return w3
    else:
        print("❌ Failed to connect to WebSocket")
        return None

# ===== WALLET AND ACCOUNT MANAGEMENT =====
print("\n=== WALLET AND ACCOUNT MANAGEMENT ===")

def account_operations():
    """Demonstrate account creation and management"""

    # Create new account
    account = Web3().eth.account.create()
    print("🔑 New Account Created:")
    print(f"   Address: {account.address}")
    print(f"   Private Key: {account.key.hex()}")

    # Create account from private key
    private_key = "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
    account_from_key = Web3().eth.account.from_key(private_key)
    print("\n🔐 Account from Private Key:")
    print(f"   Address: {account_from_key.address}")

    # Generate mnemonic
    mnemonic, seed_phrase = Web3().eth.account.create_with_mnemonic()
    print("\n📝 Account with Mnemonic:")
    print(f"   Address: {mnemonic.address}")
    print(f"   Mnemonic: {seed_phrase}")

    return account, account_from_key

def wallet_operations(w3):
    """Demonstrate wallet operations"""

    if not w3:
        print("❌ No Web3 connection available")
        return

    # Get account balance
    address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"  # Vitalik's address
    balance = w3.eth.get_balance(address)

    print("\n💰 Balance Information:")
    print(f"   Address: {address}")
    print(f"   Balance (Wei): {balance}")
    print(f"   Balance (ETH): {w3.from_wei(balance, 'ether')}")

    # Get transaction count
    nonce = w3.eth.get_transaction_count(address)
    print(f"   Transaction Count: {nonce}")

    # Get account code (for contracts)
    code = w3.eth.get_code(address)
    print(f"   Code Length: {len(code)} bytes")

    return balance, nonce

# ===== TRANSACTION EXAMPLES =====
print("\n=== TRANSACTION EXAMPLES =====")

def basic_transaction(w3):
    """Send basic ETH transaction"""

    if not w3:
        print("❌ No Web3 connection available")
        return

    # Account setup (for demo - use test account)
    private_key = "YOUR_PRIVATE_KEY_HERE"
    sender_address = w3.eth.account.from_key(private_key).address
    recipient_address = "0x70997970C51812dc3A010C7d01b50e0d17dc79C8"

    try:
        # Get nonce
        nonce = w3.eth.get_transaction_count(sender_address)

        # Transaction details
        tx = {
            'nonce': nonce,
            'to': recipient_address,
            'value': w3.to_wei(0.01, 'ether'),  # 0.01 ETH
            'gas': 21000,  # Standard ETH transfer
            'gasPrice': w3.eth.gas_price,
            'chainId': w3.eth.chain_id
        }

        print("💸 Transaction Details:")
        print(f"   From: {sender_address}")
        print(f"   To: {recipient_address}")
        print(f"   Value: {w3.from_wei(tx['value'], 'ether')} ETH")
        print(f"   Gas: {tx['gas']}")
        print(f"   Gas Price: {w3.from_wei(tx['gasPrice'], 'gwei')} gwei")

        # Sign transaction
        signed_tx = w3.eth.account.sign_transaction(tx, private_key)

        # Send transaction (commented out for safety)
        print("\n⏳ Ready to send (transaction commented out for safety):")
        print(f"   Transaction Hash: {signed_tx.hash.hex()}")

        # Uncomment to actually send:
        # tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
        # receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
        # print(f"   Transaction confirmed in block: {receipt.blockNumber}")

        return tx, signed_tx

    except Exception as e:
        print(f"❌ Transaction error: {str(e)}")
        return None, None

def estimate_gas_example(w3):
    """Estimate gas for transactions"""

    if not w3:
        print("❌ No Web3 connection available")
        return

    try:
        # Estimate gas for ETH transfer
        gas_estimate = w3.eth.estimate_gas({
            'to': "0x70997970C51812dc3A010C7d01b50e0d17dc79C8",
            'value': w3.to_wei(1, 'ether'),
        })

        print(f"⛽ Gas estimate for 1 ETH transfer: {gas_estimate}")

        # Get current gas price
        gas_price = w3.eth.gas_price
        print(f"   Current gas price: {w3.from_wei(gas_price, 'gwei')} gwei")

        # Calculate total cost
        gas_cost = gas_estimate * gas_price
        print(f"   Total gas cost: {w3.from_wei(gas_cost, 'ether')} ETH")

        return gas_estimate, gas_price

    except Exception as e:
        print(f"❌ Gas estimation error: {str(e)}")
        return None, None

# ===== SMART CONTRACT INTERACTION =====
print("\n=== SMART CONTRACT INTERACTION ===")

# ERC20 Token ABI (simplified)
erc20_abi = [
    {
        "constant": True,
        "inputs": [],
        "name": "name",
        "outputs": [{"name": "", "type": "string"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": True,
        "inputs": [],
        "name": "symbol",
        "outputs": [{"name": "", "type": "string"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": True,
        "inputs": [],
        "name": "decimals",
        "outputs": [{"name": "", "type": "uint8"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": True,
        "inputs": [],
        "name": "totalSupply",
        "outputs": [{"name": "", "type": "uint256"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": True,
        "inputs": [{"name": "_owner", "type": "address"}],
        "name": "balanceOf",
        "outputs": [{"name": "balance", "type": "uint256"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": False,
        "inputs": [
            {"name": "_to", "type": "address"},
            {"name": "_value", "type": "uint256"}
        ],
        "name": "transfer",
        "outputs": [{"name": "", "type": "bool"}],
        "payable": False,
        "stateMutability": "nonpayable",
        "type": "function"
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "from", "type": "address"},
            {"indexed": True, "name": "to", "type": "address"},
            {"indexed": False, "name": "value", "type": "uint256"}
        ],
        "name": "Transfer",
        "type": "event"
    }
]

def contract_interaction(w3):
    """Demonstrate smart contract interaction"""

    if not w3:
        print("❌ No Web3 connection available")
        return

    # WETH contract address on mainnet
    weth_address = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"

    try:
        # Create contract instance
        contract = w3.eth.contract(address=weth_address, abi=erc20_abi)

        print("📄 Contract Interaction:")
        print(f"   Contract Address: {contract.address}")

        # Read contract data (view functions)
        name = contract.functions.name().call()
        symbol = contract.functions.symbol().call()
        decimals = contract.functions.decimals().call()
        total_supply = contract.functions.totalSupply().call()

        print("\n📊 Contract Data:")
        print(f"   Name: {name}")
        print(f"   Symbol: {symbol}")
        print(f"   Decimals: {decimals}")
        print(f"   Total Supply: {w3.from_wei(total_supply, 'ether')} {symbol}")

        # Get balance of specific address
        address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
        balance = contract.functions.balanceOf(address).call()

        print("\n💼 Balance Information:")
        print(f"   Address: {address}")
        print(f"   Balance: {w3.from_wei(balance, 'ether')} {symbol}")

        return contract

    except Exception as e:
        print(f"❌ Contract interaction error: {str(e)}")
        return None

def write_to_contract(w3, contract):
    """Demonstrate writing to contract"""

    if not w3 or not contract:
        print("❌ No Web3 connection or contract available")
        return

    # Account setup
    private_key = "YOUR_PRIVATE_KEY_HERE"
    account = w3.eth.account.from_key(private_key)

    try:
        # Get nonce
        nonce = w3.eth.get_transaction_count(account.address)

        # Build transaction
        transaction = contract.functions.transfer(
            "0x70997970C51812dc3A010C7d01b50e0d17dc79C8",
            w3.to_wei(0.1, 'ether')  # 0.1 WETH
        ).build_transaction({
            'gas': 200000,
            'gasPrice': w3.eth.gas_price,
            'nonce': nonce,
            'chainId': w3.eth.chain_id
        })

        # Sign transaction
        signed_txn = w3.eth.account.sign_transaction(transaction, private_key)

        print("\n✍️  Contract Write Transaction:")
        print(f"   From: {account.address}")
        print(f"   Function: transfer")
        print(f"   Value: 0.1 WETH")
        print(f"   Gas Limit: {transaction['gas']}")
        print(f"   Transaction Hash: {signed_txn.hash.hex()}")

        # Send transaction (commented out for safety)
        print("\n⏳ Ready to send (transaction commented out for safety)")

        # Uncomment to actually send:
        # tx_hash = w3.eth.send_raw_transaction(signed_txn.rawTransaction)
        # receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
        # print(f"   Transaction confirmed in block: {receipt.blockNumber}")

        return signed_txn

    except Exception as e:
        print(f"❌ Contract write error: {str(e)}")
        return None

# ===== BLOCKCHAIN DATA OPERATIONS =====
print("\n=== BLOCKCHAIN DATA OPERATIONS =====")

def blockchain_info(w3):
    """Get blockchain information"""

    if not w3:
        print("❌ No Web3 connection available")
        return

    try:
        # Get network information
        chain_id = w3.eth.chain_id
        block_number = w3.eth.block_number
        gas_price = w3.eth.gas_price

        print("🌐 Blockchain Information:")
        print(f"   Chain ID: {chain_id}")
        print(f"   Latest Block: {block_number}")
        print(f"   Gas Price: {w3.from_wei(gas_price, 'gwei')} gwei")

        # Get latest block details
        block = w3.eth.get_block('latest')

        print("\n📦 Latest Block Details:")
        print(f"   Block Number: {block.number}")
        print(f"   Block Hash: {block.hash.hex()}")
        print(f"   Parent Hash: {block.parentHash.hex()}")
        print(f"   Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(block.timestamp))}")
        print(f"   Gas Limit: {block.gasLimit}")
        print(f"   Gas Used: {block.gasUsed}")
        print(f"   Transaction Count: {len(block.transactions)}")

        return block

    except Exception as e:
        print(f"❌ Blockchain info error: {str(e)}")
        return None

def transaction_details(w3, tx_hash):
    """Get transaction details"""

    if not w3:
        print("❌ No Web3 connection available")
        return

    try:
        # Get transaction
        tx = w3.eth.get_transaction(tx_hash)

        print(f"📄 Transaction Details: {tx_hash}")
        print(f"   From: {tx['from']}")
        print(f"   To: {tx.get('to', 'Contract Creation')}")
        print(f"   Value: {w3.from_wei(tx['value'], 'ether')} ETH")
        print(f"   Gas: {tx['gas']}")
        print(f"   Gas Price: {w3.from_wei(tx['gasPrice'], 'gwei')} gwei")
        print(f"   Nonce: {tx['nonce']}")
        print(f"   Block Number: {tx['blockNumber']}")

        # Get transaction receipt
        receipt = w3.eth.get_transaction_receipt(tx_hash)

        print("\n🧾 Transaction Receipt:")
        print(f"   Status: {'Success' if receipt['status'] == 1 else 'Failed'}")
        print(f"   Gas Used: {receipt['gasUsed']}")
        print(f"   Cumulative Gas Used: {receipt['cumulativeGasUsed']}")
        print(f"   Logs Count: {len(receipt['logs'])}")

        return tx, receipt

    except Exception as e:
        print(f"❌ Transaction details error: {str(e)}")
        return None, None

# ===== EVENT LISTENING =====
print("\n=== EVENT LISTENING =====")

def setup_event_listener(w3, contract):
    """Setup event listener for smart contract"""

    if not w3 or not contract:
        print("❌ No Web3 connection or contract available")
        return

    print("🎧 Setting up event listener...")

    # Create filter for Transfer events
    transfer_filter = contract.events.Transfer.create_filter(
        from_block='latest',
        argument_filters={'to': "0x70997970C51812dc3A010C7d01b50e0d17dc79C8"}
    )

    print("📡 Listening for Transfer events to specific address...")
    print("   (This would run in a separate thread in production)")

    # Example of processing events
    def process_events():
        """Process new events"""
        try:
            events = w3.eth.get_logs(transfer_filter.filter_params)
            print(f"   Found {len(events)} matching events")

            for event in events:
                # Decode log
                decoded_event = contract.events.Transfer().process_log(event)
                print(f"   🎯 Transfer: {decoded_event['args']}")

        except Exception as e:
            print(f"   Event processing error: {str(e)}")

    return transfer_filter, process_events

# ===== UTILITY FUNCTIONS =====
print("\n=== UTILITY FUNCTIONS =====")

def utility_examples():
    """Demonstrate Web3 utility functions"""

    print("🔧 Web3 Utilities:")

    # Unit conversions
    wei_amount = 1000000000000000000  # 1 ETH in wei
    eth_amount = Web3.from_wei(wei_amount, 'ether')

    print(f"\n🔄 Unit Conversions:")
    print(f"   1 ETH = {wei_amount} Wei")
    print(f"   {wei_amount} Wei = {eth_amount} ETH")

    # Address operations
    address = "0x70997970C51812dc3A010C7d01b50e0d17dc79C8"
    checksum_address = Web3.to_checksum_address(address)

    print(f"\n📍 Address Operations:")
    print(f"   Original: {address}")
    print(f"   Checksum: {checksum_address}")
    print(f"   Is Address: {Web3.is_address(address)}")

    # Hashing
    message = "Hello, Ethereum!"
    message_hash = Web3.keccak(text=message)

    print(f"\n🔐 Hashing:")
    print(f"   Message: {message}")
    print(f"   Keccak256 Hash: {message_hash.hex()}")

    # Encoding/Decoding
    text = "Hello, Web3!"
    hex_data = Web3.to_hex(text.encode('utf-8'))
    decoded_text = Web3.to_text(hex_data)

    print(f"\n📝 Encoding/Decoding:")
    print(f"   Text: {text}")
    print(f"   Hex: {hex_data}")
    print(f"   Decoded: {decoded_text}")

    return {
        'wei_amount': wei_amount,
        'eth_amount': eth_amount,
        'checksum_address': checksum_address,
        'message_hash': message_hash,
        'hex_data': hex_data
    }

# ===== MAIN EXECUTION =====
def main():
    """Main execution function"""
    print("🚀 WEB3.PY COMPLETE GUIDE")
    print("=" * 50)

    # Initialize Web3 connection
    w3 = connect_to_infura()

    if not w3:
        print("❌ Could not establish Web3 connection")
        return

    try:
        # Execute all examples
        account, account_from_key = account_operations()
        print("\n" + "-" * 50)

        balance, nonce = wallet_operations(w3)
        print("\n" + "-" * 50)

        tx, signed_tx = basic_transaction(w3)
        print("\n" + "-" * 50)

        gas_estimate, gas_price = estimate_gas_example(w3)
        print("\n" + "-" * 50)

        contract = contract_interaction(w3)
        print("\n" + "-" * 50)

        write_to_contract(w3, contract)
        print("\n" + "-" * 50)

        block = blockchain_info(w3)
        print("\n" + "-" * 50)

        # Transaction details example (using a real transaction hash)
        # transaction_details(w3, "0x...")

        setup_event_listener(w3, contract)
        print("\n" + "-" * 50)

        utility_examples()
        print("\n" + "-" * 50)

        print("\n✅ ALL EXAMPLES COMPLETED SUCCESSFULLY!")

        print("\n📚 Next Steps:")
        print("1. Replace 'YOUR_PROJECT_ID' with actual Infura project ID")
        print("2. Replace 'YOUR_PRIVATE_KEY_HERE' with your private key")
        print("3. Uncomment transaction examples to test with real ETH")
        print("4. Explore more complex contract interactions")
        print("5. Learn about async Web3.py for better performance")

    except Exception as e:
        print(f"\n❌ FATAL ERROR: {str(e)}")

if __name__ == "__main__":
    main()

# Export functions for individual testing
__all__ = [
    'connect_to_geth',
    'connect_to_infura',
    'connect_to_websocket',
    'account_operations',
    'wallet_operations',
    'basic_transaction',
    'estimate_gas_example',
    'contract_interaction',
    'write_to_contract',
    'blockchain_info',
    'transaction_details',
    'setup_event_listener',
    'utility_examples',
]

💻 Opérations de Contrats Asynchrones et Avancées Web3.py python

🟡 intermediate ⭐⭐⭐⭐

Patterns avancés Web3.py incluant les opérations asynchrones, les requêtes batch, le filtrage d'événements et l'interaction complexe avec les contrats

⏱️ 35 min 🏷️ web3py, async, contracts, advanced
Prerequisites: Web3.py basics, Python async/await, Understanding of DeFi protocols
# Web3.py Async and Advanced Contract Operations
# Install: pip install web3 aiohttp

import asyncio
import json
import time
from web3 import Web3
from web3.middleware import geth_poa_middleware
from web3.contract import Contract
from typing import List, Dict, Any, Optional

# ===== ASYNC WEB3 SETUP =====
print("=== ASYNC WEB3 SETUP ===")

class AsyncWeb3Manager:
    """Async Web3 manager for high-performance operations"""

    def __init__(self, provider_url: str):
        self.provider_url = provider_url
        self.w3 = None
        self._loop = None

    async def connect(self) -> bool:
        """Establish async connection"""
        try:
            # For async operations, we can use standard HTTP provider
            # Web3.py doesn't have native async support yet, but we can work around it
            self.w3 = Web3(Web3.HTTPProvider(self.provider_url))

            if self.w3.is_connected():
                print("✅ Async Web3 connection established")
                return True
            else:
                print("❌ Failed to establish connection")
                return False
        except Exception as e:
            print(f"❌ Connection error: {str(e)}")
            return False

    async def get_balance_batch(self, addresses: List[str]) -> Dict[str, int]:
        """Get balances for multiple addresses concurrently"""
        if not self.w3:
            return {}

        print(f"🔄 Getting balances for {len(addresses)} addresses...")

        # Create tasks for concurrent execution
        tasks = []
        for address in addresses:
            task = asyncio.create_task(self._get_balance_async(address))
            tasks.append(task)

        # Wait for all tasks to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        balances = {}
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"   ❌ Error getting balance for {addresses[i]}: {str(result)}")
            else:
                balances[addresses[i]] = result

        return balances

    async def _get_balance_async(self, address: str) -> int:
        """Async wrapper for get_balance"""
        # In a real async implementation, this would use async HTTP calls
        # For now, we simulate async behavior
        await asyncio.sleep(0.1)  # Simulate network delay
        return self.w3.eth.get_balance(address)

    async def get_transaction_receipt_batch(self, tx_hashes: List[str]) -> Dict[str, Any]:
        """Get transaction receipts for multiple transactions concurrently"""
        if not self.w3:
            return {}

        print(f"🔄 Getting receipts for {len(tx_hashes)} transactions...")

        tasks = []
        for tx_hash in tx_hashes:
            task = asyncio.create_task(self._get_receipt_async(tx_hash))
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)

        receipts = {}
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"   ❌ Error getting receipt for {tx_hashes[i]}: {str(result)}")
            else:
                receipts[tx_hashes[i]] = result

        return receipts

    async def _get_receipt_async(self, tx_hash: str) -> Any:
        """Async wrapper for get_transaction_receipt"""
        await asyncio.sleep(0.2)  # Simulate network delay
        return self.w3.eth.get_transaction_receipt(tx_hash)

# ===== BATCH OPERATIONS =====
print("\n=== BATCH OPERATIONS =====")

class BatchOperationManager:
    """Manager for batch blockchain operations"""

    def __init__(self, w3: Web3):
        self.w3 = w3
        self.batch_size = 100

    def create_batch_call(self, calls: List[Dict[str, Any]]) -> Any:
        """Create a batch call using eth_call for multiple operations"""

        print(f"📦 Creating batch call with {len(calls)} operations")

        # Prepare batch data
        batch_data = []
        for call in calls:
            batch_data.append({
                'jsonrpc': '2.0',
                'method': 'eth_call',
                'params': [call, 'latest'],
                'id': len(batch_data)
            })

        return batch_data

    def execute_batch_call(self, batch_data: List[Dict[str, Any]]) -> List[Any]:
        """Execute batch call (simplified - in production use proper JSON-RPC batch)"""

        results = []
        for call_data in batch_data:
            try:
                result = self.w3.eth.call(call_data['params'][0])
                results.append(result)
            except Exception as e:
                print(f"   ❌ Batch call error: {str(e)}")
                results.append(None)

        return results

    def multi_call_balances(self, addresses: List[str]) -> Dict[str, int]:
        """Get multiple balances using multicall pattern"""

        print(f"💼 Getting balances for {len(addresses)} addresses using multicall...")

        # Create balance calls
        balance_calls = []
        for address in addresses:
            # ETH balance call using different block positions
            call_data = {
                'to': address,
                'data': '0x'  # Empty data for balance
            }
            balance_calls.append(call_data)

        # Execute batch (simplified example)
        balances = {}
        for i, address in enumerate(addresses):
            try:
                balance = self.w3.eth.get_balance(address)
                balances[address] = balance
            except Exception as e:
                print(f"   ❌ Error getting balance for {address}: {str(e)}")
                balances[address] = 0

        return balances

# ===== ADVANCED CONTRACT INTERACTION =====
print("\n=== ADVANCED CONTRACT INTERACTION =====")

# DeFi Protocol ABIs (simplified examples)
uniswap_v2_router_abi = [
    {
        "inputs": [
            {"name": "amountOutMin", "type": "uint256"},
            {"name": "path", "type": "address[]"},
            {"name": "to", "type": "address"},
            {"name": "deadline", "type": "uint256"}
        ],
        "name": "swapExactETHForTokens",
        "outputs": [{"name": "amounts", "type": "uint256[]"}],
        "stateMutability": "payable",
        "type": "function"
    },
    {
        "inputs": [
            {"name": "amountIn", "type": "uint256"},
            {"name": "amountOutMin", "type": "uint256"},
            {"name": "path", "type": "address[]"},
            {"name": "to", "type": "address"},
            {"name": "deadline", "type": "uint256"}
        ],
        "name": "swapExactTokensForTokens",
        "outputs": [{"name": "amounts", "type": "uint256[]"}],
        "stateMutability": "nonpayable",
        "type": "function"
    }
]

erc20_abi = [
    {
        "constant": True,
        "inputs": [{"name": "_owner", "type": "address"}],
        "name": "balanceOf",
        "outputs": [{"name": "balance", "type": "uint256"}],
        "type": "function"
    },
    {
        "constant": False,
        "inputs": [
            {"name": "_spender", "type": "address"},
            {"name": "_value", "type": "uint256"}
        ],
        "name": "approve",
        "outputs": [{"name": "", "type": "bool"}],
        "type": "function"
    },
    {
        "constant": False,
        "inputs": [
            {"name": "_to", "type": "address"},
            {"name": "_value", "type": "uint256"}
        ],
        "name": "transfer",
        "outputs": [{"name": "", "type": "bool"}],
        "type": "function"
    }
]

class DeFiProtocol:
    """DeFi protocol interaction class"""

    def __init__(self, w3: Web3):
        self.w3 = w3
        self.protocols = {}

    def add_protocol(self, name: str, address: str, abi: List[Dict]):
        """Add a DeFi protocol"""
        contract = self.w3.eth.contract(address=address, abi=abi)
        self.protocols[name] = {
            'address': address,
            'contract': contract,
            'abi': abi
        }
        print(f"✅ Added protocol: {name} at {address}")

    def get_token_price(self, token_address: str, base_token_address: str = None) -> Dict[str, Any]:
        """Get token price from multiple sources"""

        print(f"💰 Getting price for token {token_address}")

        prices = {}

        # Method 1: Direct contract call (if price oracle available)
        if base_token_address:
            try:
                # This would be implemented with actual price oracle
                # For demo purposes, return mock data
                prices['oracle'] = {
                    'price': 1.0,
                    'source': 'oracle',
                    'timestamp': int(time.time())
                }
                print(f"   📊 Oracle price: $1.00")
            except Exception as e:
                print(f"   ❌ Oracle price error: {str(e)}")

        # Method 2: DEX price (simplified)
        try:
            # This would calculate price from DEX reserves
            prices['dex'] = {
                'price': 0.98,
                'source': 'dex',
                'timestamp': int(time.time())
            }
            print(f"   🔄 DEX price: $0.98")
        except Exception as e:
            print(f"   ❌ DEX price error: {str(e)}")

        return prices

    def calculate_swap_amounts(self, token_in: str, token_out: str, amount_in: int) -> Dict[str, Any]:
        """Calculate swap amounts and expected output"""

        print(f"🔄 Calculating swap: {self.w3.from_wei(amount_in, 'ether')} {token_in} → {token_out}")

        # Simplified swap calculation (in reality, use AMM formula)
        reserves_in = 1000000  # Mock reserves
        reserves_out = 2000000

        # Constant product formula: x * y = k
        # amount_out = reserves_out * amount_in / (reserves_in + amount_in) * 0.997 (0.3% fee)
        fee = 0.003
        amount_out = reserves_out * amount_in / (reserves_in + amount_in) * (1 - fee)

        # Price impact
        price_before = reserves_out / reserves_in
        price_after = (reserves_out - amount_out) / (reserves_in + amount_in)
        price_impact = abs(price_before - price_after) / price_before * 100

        result = {
            'amount_in': amount_in,
            'amount_out': int(amount_out),
            'price_before': price_before,
            'price_after': price_after,
            'price_impact': price_impact,
            'fee': int(amount_in * fee),
            'minimum_out': int(amount_out * 0.995)  # 0.5% slippage protection
        }

        print(f"   Expected out: {self.w3.from_wei(result['amount_out'], 'ether')} tokens")
        print(f"   Price impact: {result['price_impact']:.2f}%")
        print(f"   Fee: {self.w3.from_wei(result['fee'], 'ether')} tokens")

        return result

    def approve_token(self, token_address: str, spender_address: str, amount: int, private_key: str):
        """Approve token spending"""

        print(f"✅ Approving {self.w3.from_wei(amount, 'ether')} tokens for {spender_address}")

        try:
            # Create token contract
            token_contract = self.w3.eth.contract(address=token_address, abi=erc20_abi)

            # Get account
            account = self.w3.eth.account.from_key(private_key)

            # Get nonce
            nonce = self.w3.eth.get_transaction_count(account.address)

            # Build transaction
            transaction = token_contract.functions.approve(
                spender_address,
                amount
            ).build_transaction({
                'gas': 50000,
                'gasPrice': self.w3.eth.gas_price,
                'nonce': nonce,
                'chainId': self.w3.eth.chain_id
            })

            # Sign transaction
            signed_tx = self.w3.eth.account.sign_transaction(transaction, private_key)

            print(f"   Transaction hash: {signed_tx.hash.hex()}")
            print("   ⏳ Ready to send (transaction commented out for safety)")

            # Uncomment to send:
            # tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
            # receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
            # print(f"   Approved in block: {receipt.blockNumber}")

            return signed_tx

        except Exception as e:
            print(f"❌ Approval error: {str(e)}")
            return None

# ===== EVENT PROCESSING =====
print("\n=== EVENT PROCESSING =====")

class EventProcessor:
    """Advanced event processing and analysis"""

    def __init__(self, w3: Web3):
        self.w3 = w3
        self.event_cache = {}
        self.filters = {}

    def create_comprehensive_filter(self, contract_address: str, event_abi: List[Dict],
                                   from_block: int = None, to_block: str = 'latest',
                                   address_filters: List[str] = None) -> Any:
        """Create comprehensive event filter"""

        contract = self.w3.eth.contract(address=contract_address, abi=event_abi)

        # Create filter for all events
        all_events_filter = contract.events.create_filter(
            from_block=from_block,
            to_block=to_block
        )

        print(f"📡 Created comprehensive filter for events from block {from_block}")

        return {
            'contract': contract,
            'filter': all_events_filter,
            'address': contract_address
        }

    def analyze_events(self, filter_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze events and generate insights"""

        print("📊 Analyzing blockchain events...")

        contract = filter_data['contract']
        filter_params = filter_data['filter'].filter_params

        try:
            # Get all logs
            logs = self.w3.eth.get_logs(filter_params)

            analysis = {
                'total_events': len(logs),
                'unique_addresses': set(),
                'event_types': {},
                'gas_used_total': 0,
                'time_range': {},
                'top_addresses': {},
                'hourly_distribution': {}
            }

            # Process each log
            for log in logs:
                analysis['unique_addresses'].add(log.address)

                # Get block for timestamp
                block = self.w3.eth.get_block(log.blockNumber)

                # Decode log (simplified)
                try:
                    # This would decode using the actual ABI
                    event_type = log.topics[0][:10] if log.topics else 'unknown'
                    analysis['event_types'][event_type] = analysis['event_types'].get(event_type, 0) + 1
                except:
                    pass

                # Time analysis
                hour = time.strftime('%H', time.localtime(block.timestamp))
                analysis['hourly_distribution'][hour] = analysis['hourly_distribution'].get(hour, 0) + 1

            # Convert sets to lists for JSON serialization
            analysis['unique_addresses'] = list(analysis['unique_addresses'])

            print(f"   Found {analysis['total_events']} events")
            print(f"   Unique addresses: {len(analysis['unique_addresses'])}")
            print(f"   Event types: {len(analysis['event_types'])}")

            return analysis

        except Exception as e:
            print(f"❌ Event analysis error: {str(e)}")
            return {}

    def track_real_time_events(self, contract_address: str, event_abi: List[Dict],
                               callback=None):
        """Track real-time events (conceptual example)"""

        print("🎯 Setting up real-time event tracking...")

        contract = self.w3.eth.contract(address=contract_address, abi=event_abi)

        # Create filter for new blocks
        new_block_filter = self.w3.eth.filter('latest')

        print("   📡 Monitoring for new blocks...")
        print("   (In production, use WebSocket or polling with proper error handling)")

        # Event tracking logic
        def process_new_block(block_hash):
            try:
                block = self.w3.eth.get_block(block_hash)

                # Get transactions in this block
                for tx_hash in block.transactions:
                    tx = self.w3.eth.get_transaction(tx_hash)

                    # Check if transaction interacts with our contract
                    if tx.to and tx.to.lower() == contract_address.lower():
                        print(f"   🎯 Contract interaction: {tx_hash.hex()}")

                        # Get receipt for events
                        receipt = self.w3.eth.get_transaction_receipt(tx_hash)

                        for log in receipt.logs:
                            if log.address == contract_address:
                                print(f"      📝 Event in transaction: {log.topics[0][:10] if log.topics else 'unknown'}")

                                # Call custom callback if provided
                                if callback:
                                    callback(log, tx)

            except Exception as e:
                print(f"   ❌ Error processing block {block_hash}: {str(e)}")

        # Return processing function
        return process_new_block

    def export_events_to_csv(self, events: List[Dict], filename: str):
        """Export events to CSV file"""

        import csv

        print(f"💾 Exporting {len(events)} events to {filename}")

        if not events:
            print("   No events to export")
            return

        # Define CSV headers
        headers = ['block_number', 'transaction_hash', 'address', 'topics', 'data', 'log_index']

        try:
            with open(filename, 'w', newline='') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=headers)
                writer.writeheader()

                for event in events:
                    row = {
                        'block_number': event.get('blockNumber', ''),
                        'transaction_hash': event.get('transactionHash', ''),
                        'address': event.get('address', ''),
                        'topics': str(event.get('topics', [])),
                        'data': event.get('data', ''),
                        'log_index': event.get('logIndex', '')
                    }
                    writer.writerow(row)

            print(f"   ✅ Events exported to {filename}")

        except Exception as e:
            print(f"   ❌ Export error: {str(e)}")

# ===== MAIN EXECUTION =====
async def main():
    """Main async execution function"""
    print("🚀 ADVANCED WEB3.PY EXAMPLES")
    print("=" * 50)

    # Initialize Web3 connection
    w3 = Web3(Web3.HTTPProvider('https://mainnet.infura.io/v3/YOUR_PROJECT_ID'))

    if not w3.is_connected():
        print("❌ Could not establish Web3 connection")
        return

    try:
        # Execute async examples
        print("\n" + "-" * 50)

        async_manager = AsyncWeb3Manager('https://mainnet.infura.io/v3/YOUR_PROJECT_ID')
        await async_manager.connect()

        # Test batch operations
        batch_manager = BatchOperationManager(w3)
        test_addresses = [
            "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",  # Vitalik
            "0x742d35Cc6634C0532925a3b8D4C9db96C4b4Db45",  # Binance
            "0x47ac0Fb4F2D84898e4D9E7b4Dab3C24592667209",  # Kraken
        ]

        balances = await async_manager.get_balance_batch(test_addresses)
        print(f"📊 Batch balances retrieved: {len(balances)}")

        # DeFi protocol examples
        print("\n" + "-" * 50)
        defi = DeFiProtocol(w3)

        # Add Uniswap V2 Router
        uniswap_router = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
        defi.add_protocol("uniswap_v2_router", uniswap_router, uniswap_v2_router_abi)

        # Calculate swap amounts
        swap_calculation = defi.calculate_swap_amounts(
            "0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8",  # Mock token address
            "0xB0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E9",  # Mock token address
            w3.to_wei(1, 'ether')
        )

        # Event processing examples
        print("\n" + "-" * 50)
        event_processor = EventProcessor(w3)

        # Create event filter for WETH transfers
        weth_address = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"

        filter_data = event_processor.create_comprehensive_filter(
            weth_address,
            erc20_abi,
            from_block=w3.eth.block_number - 100,  # Last 100 blocks
        )

        # Analyze events
        event_analysis = event_processor.analyze_events(filter_data)
        print(f"📈 Event analysis: {len(event_analysis)} metrics")

        # Export sample events
        sample_events = [
            {
                'blockNumber': 12345,
                'transactionHash': '0xabc123...',
                'address': weth_address,
                'topics': ['0x...'],
                'data': '0x...',
                'logIndex': 0
            }
        ]

        event_processor.export_events_to_csv(sample_events, 'events_export.csv')

        print("\n✅ ALL ADVANCED EXAMPLES COMPLETED!")

        print("\n📚 Advanced Features Demonstrated:")
        print("✅ Async Web3 operations")
        print("✅ Batch request processing")
        print("✅ DeFi protocol integration")
        print("✅ Event processing and analysis")
        print("✅ Real-time event tracking")
        print("✅ Data export functionality")

        print("\n🔧 Production Recommendations:")
        print("1. Use proper error handling and retries")
        print("2. Implement rate limiting for API calls")
        print("3. Use WebSocket providers for real-time data")
        print("4. Cache frequently accessed data")
        print("5. Monitor gas prices and optimize transactions")

    except Exception as e:
        print(f"\n❌ FATAL ERROR: {str(e)}")

# Additional utility functions
def create_contract_instance(w3, address: str, abi_file: str) -> Optional[Contract]:
    """Create contract instance from ABI file"""
    try:
        with open(abi_file, 'r') as f:
            abi = json.load(f)

        contract = w3.eth.contract(address=address, abi=abi)
        print(f"✅ Contract instance created for {address}")
        return contract
    except Exception as e:
        print(f"❌ Error creating contract instance: {str(e)}")
        return None

def monitor_gas_prices(w3, duration: int = 60):
    """Monitor gas prices over time"""
    print(f"⛽ Monitoring gas prices for {duration} seconds...")

    start_time = time.time()
    gas_prices = []

    while time.time() - start_time < duration:
        try:
            gas_price = w3.eth.gas_price
            gas_prices.append({
                'timestamp': int(time.time()),
                'gas_price': gas_price,
                'gas_price_gwei': w3.from_wei(gas_price, 'gwei')
            })

            print(f"   Current gas price: {w3.from_wei(gas_price, 'gwei')} gwei")
            time.sleep(5)

        except Exception as e:
            print(f"   ❌ Error getting gas price: {str(e)}")
            break

    if gas_prices:
        avg_gas = sum(gp['gas_price'] for gp in gas_prices) / len(gas_prices)
        print(f"\n📊 Gas Price Summary:")
        print(f"   Samples: {len(gas_prices)}")
        print(f"   Average: {w3.from_wei(avg_gas, 'gwei')} gwei")
        print(f"   Min: {min(gp['gas_price_gwei'] for gp in gas_prices):.2f} gwei")
        print(f"   Max: {max(gp['gas_price_gwei'] for gp in gas_prices):.2f} gwei")

    return gas_prices

if __name__ == "__main__":
    asyncio.run(main())

# Export for individual testing
__all__ = [
    'AsyncWeb3Manager',
    'BatchOperationManager',
    'DeFiProtocol',
    'EventProcessor',
    'create_contract_instance',
    'monitor_gas_prices',
]

💻 Intégration DeFi Web3.py python

🔴 complex ⭐⭐⭐⭐⭐

Exemples complets d'intégration DeFi incluant Uniswap, Compound, Aave, le yield farming et les stratégies de trading automatisées

⏱️ 45 min 🏷️ web3py, defi, yield, trading, advanced
Prerequisites: Web3.py advanced, Understanding of DeFi protocols, Python Decimal precision, Financial concepts
# Web3.py DeFi Integration
# Complete DeFi ecosystem integration with popular protocols

import asyncio
import json
import time
import math
from decimal import Decimal, getcontext
from typing import Dict, List, Optional, Tuple
from web3 import Web3
from web3.middleware import geth_poa_middleware

# Set high precision for Decimal calculations
getcontext().prec = 18

# ===== DEFI PROTOCOL ABIs =====
print("=== DEFI PROTOCOL ABIs ===")

# Uniswap V2 Router ABI (simplified)
uniswap_v2_router_abi = [
    {
        "inputs": [
            {"name": "amountOutMin", "type": "uint256"},
            {"name": "path", "type": "address[]"},
            {"name": "to", "type": "address"},
            {"name": "deadline", "type": "uint256"}
        ],
        "name": "swapExactETHForTokens",
        "outputs": [{"name": "amounts", "type": "uint256[]"}],
        "stateMutability": "payable",
        "type": "function"
    },
    {
        "inputs": [
            {"name": "amountIn", "type": "uint256"},
            {"name": "amountOutMin", "type": "uint256"},
            {"name": "path", "type": "address[]"},
            {"name": "to", "type": "address"},
            {"name": "deadline", "type": "uint256"}
        ],
        "name": "swapExactTokensForTokens",
        "outputs": [{"name": "amounts", "type": "uint256[]"}],
        "stateMutability": "nonpayable",
        "type": "function"
    },
    {
        "inputs": [
            {"name": "tokenA", "type": "address"},
            {"name": "tokenB", "type": "address"},
            {"name": "amountADesired", "type": "uint256"},
            {"name": "amountBDesired", "type": "uint256"},
            {"name": "amountAMin", "type": "uint256"},
            {"name": "amountBMin", "type": "uint256"},
            {"name": "to", "type": "address"},
            {"name": "deadline", "type": "uint256"}
        ],
        "name": "addLiquidity",
        "outputs": [
            {"name": "amountA", "type": "uint256"},
            {"name": "amountB", "type": "uint256"},
            {"name": "liquidity", "type": "uint256"}
        ],
        "stateMutability": "nonpayable",
        "type": "function"
    }
]

# Uniswap V2 Pair ABI (simplified)
uniswap_v2_pair_abi = [
    {
        "constant": True,
        "inputs": [],
        "name": "getReserves",
        "outputs": [
            {"name": "reserve0", "type": "uint112"},
            {"name": "reserve1", "type": "uint112"},
            {"name": "blockTimestampLast", "type": "uint32"}
        ],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": True,
        "inputs": [],
        "name": "token0",
        "outputs": [{"name": "", "type": "address"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": True,
        "inputs": [],
        "name": "token1",
        "outputs": [{"name": "", "type": "address"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    }
]

# ERC20 Token ABI
erc20_abi = [
    {
        "constant": True,
        "inputs": [{"name": "_owner", "type": "address"}],
        "name": "balanceOf",
        "outputs": [{"name": "balance", "type": "uint256"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": False,
        "inputs": [
            {"name": "_spender", "type": "address"},
            {"name": "_value", "type": "uint256"}
        ],
        "name": "approve",
        "outputs": [{"name": "", "type": "bool"}],
        "payable": False,
        "stateMutability": "nonpayable",
        "type": "function"
    },
    {
        "constant": False,
        "inputs": [
            {"name": "_to", "type": "address"},
            {"name": "_value", "type": "uint256"}
        ],
        "name": "transfer",
        "outputs": [{"name": "", "type": "bool"}],
        "payable": False,
        "stateMutability": "nonpayable",
        "type": "function"
    }
]

# Compound cToken ABI (simplified)
compound_ctoken_abi = [
    {
        "constant": False,
        "inputs": [{"name": "mint", "type": "uint256"}],
        "name": "mint",
        "outputs": [{"name": "", "type": "uint256"}],
        "payable": False,
        "stateMutability": "nonpayable",
        "type": "function"
    },
    {
        "constant": False,
        "inputs": [{"name": "redeem", "type": "uint256"}],
        "name": "redeem",
        "outputs": [{"name": "", "type": "uint256"}],
        "payable": False,
        "stateMutability": "nonpayable",
        "type": "function"
    },
    {
        "constant": True,
        "inputs": [],
        "name": "exchangeRateCurrent",
        "outputs": [{"name": "", "type": "uint256"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": True,
        "inputs": [],
        "name": "supplyRatePerBlock",
        "outputs": [{"name": "", "type": "uint256"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function"
    }
]

# ===== YIELD FARMING MANAGER =====
class YieldFarmingManager:
    """Comprehensive yield farming management"""

    def __init__(self, w3: Web3, private_key: str):
        self.w3 = w3
        self.account = w3.eth.account.from_key(private_key)
        self.protocols = {}
        self.positions = {}
        self.apr_cache = {}

    def add_protocol(self, name: str, config: Dict[str, Any]):
        """Add a DeFi protocol configuration"""
        self.protocols[name] = config
        print(f"✅ Added protocol: {name}")

    def get_token_price(self, token_address: str, base_token: str = None) -> Decimal:
        """Get token price from Uniswap"""

        print(f"💰 Getting price for token {token_address}")

        if token_address.lower() == "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2".lower():  # WETH
            return Decimal('1.0')

        # Create WETH/token pair
        pair_address = self.get_pair_address(token_address, "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")

        if pair_address:
            try:
                pair_contract = self.w3.eth.contract(
                    address=pair_address,
                    abi=uniswap_v2_pair_abi
                )

                reserves = pair_contract.functions.getReserves().call()
                token0 = pair_contract.functions.token0().call()

                # Calculate price based on reserves
                if token0.lower() == token_address.lower():
                    price = Decimal(reserves[1]) / Decimal(reserves[0])
                else:
                    price = Decimal(reserves[0]) / Decimal(reserves[1])

                print(f"   📊 Uniswap price: ${price:.6f}")
                return price

            except Exception as e:
                print(f"   ❌ Uniswap price error: {str(e)}")

        # Fallback to other price sources
        print(f"   ⚠️  Using fallback price source")
        return Decimal('1.0')

    def get_pair_address(self, token_a: str, token_b: str) -> Optional[str]:
        """Get Uniswap pair address (simplified)"""

        # This would use the Uniswap V2 Factory to get pair address
        # For demo purposes, return mock addresses
        pairs = {
            "0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8-0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0xBb2b8038a1640196FbE3e38816F3e67Cba72D940",
            "0x6B175474E89094C44Da98b954EedeAC495271d0F-0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0xA478c2975Ab1Ea89e8196811F51A7B7Ade33eB11"
        }

        key = f"{token_a}-{token_b}"
        reverse_key = f"{token_b}-{token_a}"

        return pairs.get(key) or pairs.get(reverse_key)

    def calculate_uniswap_swap(self, amount_in: int, token_in: str, token_out: str) -> Dict[str, Decimal]:
        """Calculate Uniswap swap amounts"""

        print(f"🔄 Calculating Uniswap swap: {self.w3.from_wei(amount_in, 'ether')} {token_in} → {token_out}")

        pair_address = self.get_pair_address(token_in, token_out)
        if not pair_address:
            raise ValueError("No liquidity pair found")

        try:
            pair_contract = self.w3.eth.contract(
                address=pair_address,
                abi=uniswap_v2_pair_abi
            )

            reserves = pair_contract.functions.getReserves().call()
            token0 = pair_contract.functions.token0().call()

            # Determine which reserve corresponds to which token
            if token0.lower() == token_in.lower():
                reserve_in, reserve_out = reserves[0], reserves[1]
            else:
                reserve_in, reserve_out = reserves[1], reserves[0]

            # Uniswap constant product formula: x * y = k
            # Apply 0.3% fee
            fee = Decimal('0.003')
            amount_in_with_fee = Decimal(amount_in)
            numerator = amount_in_with_fee * Decimal(reserve_out)
            denominator = Decimal(reserve_in) + amount_in_with_fee
            amount_out = numerator / denominator

            # Calculate price impact
            price_before = Decimal(reserve_out) / Decimal(reserve_in)
            price_after = (Decimal(reserve_out) - amount_out) / (Decimal(reserve_in) + amount_in)
            price_impact = abs(price_before - price_after) / price_before * Decimal('100')

            result = {
                'amount_in': Decimal(amount_in),
                'amount_out': amount_out,
                'price_before': price_before,
                'price_after': price_after,
                'price_impact': price_impact,
                'fee': amount_in_with_fee * fee,
                'minimum_out': amount_out * Decimal('0.995')  # 0.5% slippage
            }

            print(f"   Expected out: {amount_out:.6f} tokens")
            print(f"   Price impact: {price_impact:.2f}%")
            print(f"   Fee: {result['fee']:.6f} tokens")
            print(f"   Minimum out (0.5% slippage): {result['minimum_out']:.6f} tokens")

            return result

        except Exception as e:
            print(f"❌ Swap calculation error: {str(e)}")
            raise

    def execute_uniswap_swap(self, amount_in: int, token_in: str, token_out: str,
                              slippage_tolerance: float = 0.5) -> Dict[str, Any]:
        """Execute Uniswap swap"""

        print(f"🚀 Executing Uniswap swap")

        # Calculate expected output
        swap_calc = self.calculate_uniswap_swap(amount_in, token_in, token_out)

        # Get Uniswap Router contract
        router_address = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
        router_contract = self.w3.eth.contract(
            address=router_address,
            abi=uniswap_v2_router_abi
        )

        try:
            # Build swap path
            path = [token_in, token_out]

            # Calculate minimum output with slippage tolerance
            min_out = int(swap_calc['amount_out'] * (Decimal('1') - Decimal(slippage_tolerance / 100)))

            # Get deadline (20 minutes from now)
            deadline = int(time.time()) + 1200

            # Get nonce and build transaction
            nonce = self.w3.eth.get_transaction_count(self.account.address)

            if token_in.lower() == "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2".lower():  # WETH
                # ETH to token swap
                transaction = router_contract.functions.swapExactETHForTokens(
                    min_out,
                    path,
                    self.account.address,
                    deadline
                ).build_transaction({
                    'value': amount_in,
                    'gas': 200000,
                    'gasPrice': self.w3.eth.gas_price,
                    'nonce': nonce,
                    'chainId': self.w3.eth.chain_id
                })
            else:
                # Token to token swap
                # First need to approve token spending
                self.approve_token(token_in, router_address, amount_in)

                transaction = router_contract.functions.swapExactTokensForTokens(
                    amount_in,
                    min_out,
                    path,
                    self.account.address,
                    deadline
                ).build_transaction({
                    'gas': 200000,
                    'gasPrice': self.w3.eth.gas_price,
                    'nonce': nonce + 1,
                    'chainId': self.w3.eth.chain_id
                })

            # Sign transaction
            signed_tx = self.w3.eth.account.sign_transaction(transaction, self.account.key)

            print(f"   Transaction hash: {signed_tx.hash.hex()}")
            print("   ⏳ Ready to send (transaction commented out for safety)")

            # Uncomment to send:
            # tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
            # receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
            # print(f"   ✅ Swap completed in block: {receipt.blockNumber}")

            return {
                'transaction': signed_tx,
                'calculation': swap_calc,
                'expected_min_out': min_out
            }

        except Exception as e:
            print(f"❌ Swap execution error: {str(e)}")
            raise

    def approve_token(self, token_address: str, spender_address: str, amount: int):
        """Approve token spending"""

        print(f"✅ Approving token {token_address} for spender {spender_address}")

        try:
            token_contract = self.w3.eth.contract(
                address=token_address,
                abi=erc20_abi
            )

            nonce = self.w3.eth.get_transaction_count(self.account.address)

            transaction = token_contract.functions.approve(
                spender_address,
                amount
            ).build_transaction({
                'gas': 50000,
                'gasPrice': self.w3.eth.gas_price,
                'nonce': nonce,
                'chainId': self.w3.eth.chain_id
            })

            signed_tx = self.w3.eth.account.sign_transaction(transaction, self.account.key)

            print(f"   Approval transaction hash: {signed_tx.hash.hex()}")
            print("   ⏳ Ready to send (transaction commented out for safety)")

            return signed_tx

        except Exception as e:
            print(f"❌ Approval error: {str(e)}")
            raise

# ===== LENDING PROTOCOL MANAGER =====
class LendingProtocolManager:
    """Manage lending protocols like Compound and Aave"""

    def __init__(self, w3: Web3, private_key: str):
        self.w3 = w3
        self.account = w3.eth.account.from_key(private_key)
        self.protocols = {}

    def add_protocol(self, name: str, config: Dict[str, Any]):
        """Add a lending protocol"""
        self.protocols[name] = config
        print(f"✅ Added lending protocol: {name}")

    def calculate_compound_apy(self, token_address: str) -> Dict[str, Decimal]:
        """Calculate Compound APY for a token"""

        print(f"📈 Calculating Compound APY for token {token_address}")

        # This would use actual Compound protocol data
        # For demo purposes, use mock data
        supply_rate = Decimal('0.05') / Decimal(365) / Decimal(86400)  # 5% annual

        # Blocks per day (Ethereum ~ 6,500 blocks/day)
        blocks_per_day = Decimal(6500)
        eth_blocks_per_year = blocks_per_day * Decimal(365)

        # Calculate APY
        apy = (Decimal('1') + supply_rate * eth_blocks_per_year) - Decimal('1')

        result = {
            'supply_rate': supply_rate,
            'apy': apy * Decimal('100'),
            'blocks_per_day': blocks_per_day,
            'eth_blocks_per_year': eth_blocks_per_year
        }

        print(f"   Supply Rate: {supply_rate:.8f} per block")
        print(f"   APY: {apy * Decimal('100'):.2f}%")

        return result

    def supply_to_compound(self, token_address: str, amount: int) -> Dict[str, Any]:
        """Supply tokens to Compound"""

        print(f"💰 Supplying {self.w3.from_wei(amount, 'ether')} tokens to Compound")

        # This would interact with the actual Compound cToken
        ctoken_address = self.get_ctoken_address(token_address)

        if not ctoken_address:
            raise ValueError("No cToken found for this token")

        try:
            ctoken_contract = self.w3.eth.contract(
                address=ctoken_address,
                abi=compound_ctoken_abi
            )

            # Approve Compound to spend tokens
            compound_comptroller = "0x3d9819210a31b4961b30ef54be2aed79b9c9cd3b7"
            self.approve_token(token_address, ctoken_address, amount)

            nonce = self.w3.eth.get_transaction_count(self.account.address)

            transaction = ctoken_contract.functions.mint(amount).build_transaction({
                'gas': 200000,
                'gasPrice': self.w3.eth.gas_price,
                'nonce': nonce,
                'chainId': self.w3.eth.chain_id
            })

            signed_tx = self.w3.eth.account.sign_transaction(transaction, self.account.key)

            print(f"   Mint transaction hash: {signed_tx.hash.hex()}")
            print("   ⏳ Ready to send (transaction commented out for safety)")

            return {
                'transaction': signed_tx,
                'token_address': token_address,
                'ctoken_address': ctoken_address,
                'amount': amount
            }

        except Exception as e:
            print(f"❌ Compound supply error: {str(e)}")
            raise

    def get_ctoken_address(self, token_address: str) -> Optional[str]:
        """Get corresponding cToken address"""

        # Mock mapping - in reality, this would query Compound
        ctokens = {
            "0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8": "0x39AA39c021dfbaE8faC545936693aC917d5E75663",  # USDC
            "0x6B175474E89094C44Da98b954EedeAC495271d0F": "0x4Ddc2D193948926D02F9B1fE9e1daaE1C1d329329",  # DAI
            "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x4Ddc2D193948926D02F9B1fE9e1daaE1C1d329329",  # WETH (same as DAI in this mock)
        }

        return ctokens.get(token_address.lower())

# ===== AUTOMATED TRADING STRATEGY =====
class AutomatedTradingStrategy:
    """Automated trading strategy with risk management"""

    def __init__(self, w3: Web3, private_key: str):
        self.w3 = w3
        self.account = w3.eth.account.from_key(private_key)
        self.farming_manager = YieldFarmingManager(w3, private_key)
        self.lending_manager = LendingProtocolManager(w3, private_key)
        self.trading_history = []

    def calculate_yield_farming_apy(self, pair_address: str, staking_apr: float = None) -> Dict[str, Decimal]:
        """Calculate total APY for yield farming (trading fees + staking)"""

        print(f"📊 Calculating yield farming APY for {pair_address}")

        # Mock data for daily trading volume and fees
        daily_volume = Decimal('1000000')  # $1M daily volume
        trading_fee_rate = Decimal('0.003')  # 0.3%
        daily_fees = daily_volume * trading_fee_rate

        # Calculate pool reserves (mock)
        total_liquidity = Decimal('5000000')  # $5M total liquidity
        our_share = Decimal('0.01')  # 1% of pool
        our_liquidity = total_liquidity * our_share

        # Calculate our share of daily fees
        our_daily_fees = daily_fees * our_share
        our_daily_earnings = our_daily_fees / our_liquidity

        # Annualize trading fee APY
        trading_fee_apy = our_daily_earnings * Decimal('365')

        # Add staking rewards if provided
        total_apy = trading_fee_apy
        if staking_apr:
            staking_apy = Decimal(staking_apr) / Decimal('100')
            total_apy += staking_apy

        result = {
            'daily_volume': daily_volume,
            'daily_fees': daily_fees,
            'our_daily_fees': our_daily_fees,
            'trading_fee_apy': trading_fee_apy * Decimal('100'),
            'staking_apy': staking_apr or 0,
            'total_apy': total_apy * Decimal('100'),
            'our_liquidity': our_liquidity
        }

        print(f"   Daily Volume: ${daily_volume:,.0f}")
        print(f"   Trading Fee APY: {trading_fee_apy * Decimal('100'):.2f}%")
        print(f"   Staking APR: {staking_apr or 0}%")
        print(f"   Total APY: {total_apy * Decimal('100'):.2f}%")

        return result

    def implement_flash_loan_arbitrage(self, token_a: str, token_b: str,
                                         amount: int) -> Dict[str, Any]:
        """Implement flash loan arbitrage strategy"""

        print(f"⚡ Implementing flash loan arbitrage: {token_a} ↔ {token_b}")

        # Calculate price differences between exchanges
        price_uniswap = self.farming_manager.get_token_price(token_a, token_b)
        price_sushiswap = self.get_sushiswap_price(token_a, token_b)  # Mock function

        price_diff = abs(price_uniswap - price_sushiswap)
        price_diff_pct = (price_diff / min(price_uniswap, price_sushiswap)) * 100

        if price_diff_pct < 1:  # Less than 1% difference
            print(f"   ⚠️  Price difference ({price_diff_pct:.2f}%) too small for arbitrage")
            return {'profitable': False, 'reason': 'Price difference too small'}

        # Calculate potential profit
        # This is simplified - actual implementation would account for:
        # - Flash loan fees (usually 0.09%)
        # - Gas costs
        # - Slippage
        # - DEX fees

        flash_loan_fee = Decimal('0.0009')  # 0.09%
        estimated_profit = amount * Decimal(price_diff_pct / 100) * Decimal('0.99')
        flash_loan_cost = amount * flash_loan_fee

        net_profit = estimated_profit - flash_loan_cost

        if net_profit <= 0:
            print(f"   ❌ Net profit negative after fees: ${self.w3.from_wei(int(net_profit), 'ether')}")
            return {'profitable': False, 'reason': 'No profit after fees'}

        result = {
            'profitable': True,
            'price_diff_pct': price_diff_pct,
            'estimated_profit': estimated_profit,
            'flash_loan_cost': flash_loan_cost,
            'net_profit': net_profit,
            'amount': amount
        }

        print(f"   💰 Potential profit: ${self.w3.from_wei(int(net_profit), 'ether')}")
        print(f"   Flash loan cost: ${self.w3.from_wei(int(flash_loan_cost), 'ether')}")

        return result

    def get_sushiswap_price(self, token_a: str, token_b: str) -> float:
        """Get price from Sushiswap (mock)"""
        # This would use actual Sushiswap data
        return 1.02  # Mock price with slight difference

    def rebalance_portfolio(self, target_allocations: Dict[str, float]) -> Dict[str, Any]:
        """Rebalance portfolio to target allocations"""

        print("🔄 Rebalancing portfolio to target allocations")

        current_value = self.calculate_portfolio_value()
        total_value = sum(current_value.values())

        rebalance_actions = []

        for token, target_pct in target_allocations.items():
            target_value = total_value * Decimal(str(target_pct))
            current_token_value = current_value.get(token, Decimal('0'))

            diff = target_value - current_token_value
            diff_pct = (diff / total_value) * 100

            if abs(diff_pct) > 1:  # Rebalance if difference > 1%
                action = {
                    'token': token,
                    'current_value': current_token_value,
                    'target_value': target_value,
                    'difference': diff,
                    'difference_pct': diff_pct,
                    'action': 'buy' if diff > 0 else 'sell'
                }
                rebalance_actions.append(action)

                print(f"   {token}: {action['action']} ${abs(diff_pct):.2f}% ({'buy' if diff > 0 else 'sell'})")

        return {
            'total_value': total_value,
            'current_allocations': {k: v/total_value*100 for k, v in current_value.items()},
            'target_allocations': target_allocations,
            'rebalance_actions': rebalance_actions,
            'actions_required': len(rebalance_actions) > 0
        }

    def calculate_portfolio_value(self) -> Dict[str, Decimal]:
        """Calculate current portfolio value"""

        # Mock portfolio data - in reality, this would query blockchain
        return {
            'ETH': Decimal('10.5'),
            'USDC': Decimal('5000'),
            'DAI': Decimal('3000'),
            'WBTC': Decimal('0.5')
        }

    def run_strategy(self, strategy_config: Dict[str, Any]):
        """Run automated trading strategy"""

        print("🤖 Running automated trading strategy")

        strategy_type = strategy_config.get('type')

        if strategy_type == 'yield_farming':
            # Implement yield farming strategy
            pairs = strategy_config.get('pairs', [])

            for pair in pairs:
                apy_data = self.calculate_yield_farming_apy(pair)

                if apy_data['total_apy'] > Decimal('20'):  # Target > 20% APY
                    print(f"   🎯 High APY opportunity: {pair} - {apy_data['total_apy']:.2f}%")

        elif strategy_type == 'arbitrage':
            # Implement arbitrage strategy
            pairs = strategy_config.get('pairs', [])

            for token_a, token_b in pairs:
                result = self.implement_flash_loan_arbitrage(token_a, token_b,
                                                                    strategy_config.get('amount', 1000000))

                if result['profitable']:
                    print(f"   💰 Profitable arbitrage found: {token_a}/{token_b}")

        elif strategy_type == 'portfolio_rebalance':
            # Implement portfolio rebalancing
            target_allocs = strategy_config.get('target_allocations', {})
            rebalance_result = self.rebalance_portfolio(target_allocs)

            if rebalance_result['actions_required']:
                print(f"   🔄 Rebalancing required: {len(rebalance_result['rebalance_actions'])} actions")

        print("   ✅ Strategy execution completed")

# ===== MAIN EXECUTION =====
async def main():
    """Main async execution function"""
    print("🚀 WEB3.PY DEFI INTEGRATION")
    print("=" * 50)

    # Initialize Web3 connection
    w3 = Web3(Web3.HTTPProvider('https://mainnet.infura.io/v3/YOUR_PROJECT_ID'))

    if not w3.is_connected():
        print("❌ Could not establish Web3 connection")
        return

    # Initialize with test private key (DO NOT use real private key in production)
    private_key = "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"

    try:
        # Initialize managers
        farming_manager = YieldFarmingManager(w3, private_key)
        lending_manager = LendingProtocolManager(w3, private_key)
        trading_strategy = AutomatedTradingStrategy(w3, private_key)

        # Add protocol configurations
        farming_manager.add_protocol("uniswap_v2", {
            'router': '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D',
            'factory': '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f'
        })

        # Yield farming example
        print("\n" + "-" * 50)
        farming_manager.calculate_uniswap_swap(
            w3.to_wei(1, 'ether'),
            "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",  # WETH
            "0x6B175474E89094C44Da98b954EedeAC495271d0F"   # DAI
        )

        # Lending example
        print("\n" + "-" * 50)
        lending_manager.calculate_compound_apy("0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8")  # USDC

        # Yield farming APY calculation
        print("\n" + "-" * 50)
        trading_strategy.calculate_yield_farming_apy(
            "0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8-0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
            staking_apr=5.0
        )

        # Arbitrage example
        print("\n" + "-" * 50)
        trading_strategy.implement_flash_loan_arbitrage(
            "0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8",  # USDC
            "0x6B175474E89094C44Da98b954EedeAC495271d0F",  # DAI
            w3.to_wei(100000, 'ether')  # $100K
        )

        # Portfolio rebalancing
        print("\n" + "-" * 50)
        target_allocations = {
            'ETH': 0.40,      # 40%
            'USDC': 0.30,     # 30%
            'DAI': 0.20,      # 20%
            'WBTC': 0.10      # 10%
        }
        trading_strategy.rebalance_portfolio(target_allocations)

        # Run automated strategy
        print("\n" + "-" * 50)
        strategy_config = {
            'type': 'yield_farming',
            'pairs': [
                "0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8-0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
            ],
            'min_apy': 15.0
        }
        trading_strategy.run_strategy(strategy_config)

        print("\n✅ ALL DEFI EXAMPLES COMPLETED!")

        print("\n📚 DeFi Features Demonstrated:")
        print("✅ Uniswap DEX integration")
        print("✅ Compound lending protocol")
        print("✅ Yield farming APY calculations")
        print("✅ Flash loan arbitrage strategies")
        print("✅ Portfolio management and rebalancing")
        print("✅ Automated trading strategies")
        print("✅ Risk management and slippage protection")

        print("\n⚠️  IMPORTANT SECURITY NOTES:")
        print("1. Never use real private keys in code examples")
        print("2. Always use proper error handling and retries")
        print("3. Implement proper slippage protection")
        print("4. Monitor gas prices and network congestion")
        print("5. Use reputable DeFi protocols and audits")
        print("6. Test on testnets before mainnet deployment")
        print("7. Consider using oracles for price data")
        print("8. Implement proper access controls")

    except Exception as e:
        print(f"\n❌ FATAL ERROR: {str(e)}")

# Additional utility functions
def calculate_impermanent_loss(price_initial: float, price_final: float) -> float:
    """Calculate impermanent loss"""

    if price_final == 0:
        return 1.0  # 100% loss

    price_ratio = price_final / price_initial
    sqrt_ratio = math.sqrt(price_ratio)

    # Impermanent loss formula: 2 * sqrt(ratio) / (1 + ratio) - 1
    impermanent_loss = (2 * sqrt_ratio / (1 + price_ratio) - 1) * 100

    return impermanent_loss

def monitor_defi_protocols(w3, duration: int = 300):
    """Monitor DeFi protocols for opportunities"""

    print(f"🔍 Monitoring DeFi protocols for {duration} seconds...")

    opportunities = []

    end_time = time.time() + duration

    while time.time() < end_time:
        try:
            # This would monitor real DeFi protocols
            # For demo purposes, simulate finding opportunities

            if time.time() % 60 < 1:  # Every minute
                opportunity = {
                    'timestamp': int(time.time()),
                    'protocol': 'Uniswap V2',
                    'pair': 'ETH/USDC',
                    'apr': random.uniform(10, 50),
                    'liquidity': random.uniform(100000, 1000000),
                    'volume_24h': random.uniform(500000, 5000000)
                }

                opportunities.append(opportunity)

                if opportunity['apr'] > 30:
                    print(f"   🎯 High APY opportunity: {opportunity['pair']} - {opportunity['apr']:.2f}%")

            time.sleep(10)

        except Exception as e:
            print(f"   ❌ Monitoring error: {str(e)}")
            break

    print(f"\n📊 Found {len(opportunities)} opportunities")

    return opportunities

if __name__ == "__main__":
    asyncio.run(main())

# Export for individual testing
__all__ = [
    'YieldFarmingManager',
    'LendingProtocolManager',
    'AutomatedTradingStrategy',
    'calculate_impermanent_loss',
    'monitor_defi_protocols',
]