🎯 empfohlene Sammlungen
Balanced sample collections from various categories for you to explore
Web3.py Python Bibliothek
Web3.py Python Beispiele einschließlich Blockchain-Interaktion, Smart Contracts, Wallet-Management, Transaktionen und dezentrale Anwendungsentwicklung
💻 Web3.py Basics - Getting Started python
🟢 simple
⭐⭐
Vollständige Web3.py Grundlagen inklusive Provider-Setup, Account-Management, Transaktionen und grundlegendende Blockchain-Operationen
⏱️ 25 min
🏷️ web3py, python, ethereum, web3, basics
Prerequisites:
Python 3.6+, pip, Basic understanding of Ethereum, API keys for providers
# Web3.py Basics - Complete Getting Started Guide
# Install: pip install web3
from web3 import Web3
from web3.middleware import geth_poa_middleware
import json
import time
# ===== PROVIDER EXAMPLES =====
print("=== PROVIDER EXAMPLES ===")
# 1. Connect to local Geth node
def connect_to_geth():
"""Connect to local Geth node"""
w3 = Web3(Web3.HTTPProvider('http://127.0.0.1:8545'))
if w3.is_connected():
print("✅ Connected to local Geth node")
return w3
else:
print("❌ Failed to connect to Geth")
return None
# 2. Connect to Infura
def connect_to_infura():
"""Connect to Infura endpoint"""
infura_url = "https://mainnet.infura.io/v3/YOUR_PROJECT_ID"
w3 = Web3(Web3.HTTPProvider(infura_url))
if w3.is_connected():
print("✅ Connected to Infura")
return w3
else:
print("❌ Failed to connect to Infura")
return None
# 3. Connect to WebSocket provider
def connect_to_websocket():
"""Connect via WebSocket for real-time updates"""
w3 = Web3(Web3.WebsocketProvider('wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID'))
if w3.is_connected():
print("✅ Connected to WebSocket provider")
return w3
else:
print("❌ Failed to connect to WebSocket")
return None
# ===== WALLET AND ACCOUNT MANAGEMENT =====
print("\n=== WALLET AND ACCOUNT MANAGEMENT ===")
def account_operations():
"""Demonstrate account creation and management"""
# Create new account
account = Web3().eth.account.create()
print("🔑 New Account Created:")
print(f" Address: {account.address}")
print(f" Private Key: {account.key.hex()}")
# Create account from private key
private_key = "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
account_from_key = Web3().eth.account.from_key(private_key)
print("\n🔐 Account from Private Key:")
print(f" Address: {account_from_key.address}")
# Generate mnemonic
mnemonic, seed_phrase = Web3().eth.account.create_with_mnemonic()
print("\n📝 Account with Mnemonic:")
print(f" Address: {mnemonic.address}")
print(f" Mnemonic: {seed_phrase}")
return account, account_from_key
def wallet_operations(w3):
"""Demonstrate wallet operations"""
if not w3:
print("❌ No Web3 connection available")
return
# Get account balance
address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045" # Vitalik's address
balance = w3.eth.get_balance(address)
print("\n💰 Balance Information:")
print(f" Address: {address}")
print(f" Balance (Wei): {balance}")
print(f" Balance (ETH): {w3.from_wei(balance, 'ether')}")
# Get transaction count
nonce = w3.eth.get_transaction_count(address)
print(f" Transaction Count: {nonce}")
# Get account code (for contracts)
code = w3.eth.get_code(address)
print(f" Code Length: {len(code)} bytes")
return balance, nonce
# ===== TRANSACTION EXAMPLES =====
print("\n=== TRANSACTION EXAMPLES =====")
def basic_transaction(w3):
"""Send basic ETH transaction"""
if not w3:
print("❌ No Web3 connection available")
return
# Account setup (for demo - use test account)
private_key = "YOUR_PRIVATE_KEY_HERE"
sender_address = w3.eth.account.from_key(private_key).address
recipient_address = "0x70997970C51812dc3A010C7d01b50e0d17dc79C8"
try:
# Get nonce
nonce = w3.eth.get_transaction_count(sender_address)
# Transaction details
tx = {
'nonce': nonce,
'to': recipient_address,
'value': w3.to_wei(0.01, 'ether'), # 0.01 ETH
'gas': 21000, # Standard ETH transfer
'gasPrice': w3.eth.gas_price,
'chainId': w3.eth.chain_id
}
print("💸 Transaction Details:")
print(f" From: {sender_address}")
print(f" To: {recipient_address}")
print(f" Value: {w3.from_wei(tx['value'], 'ether')} ETH")
print(f" Gas: {tx['gas']}")
print(f" Gas Price: {w3.from_wei(tx['gasPrice'], 'gwei')} gwei")
# Sign transaction
signed_tx = w3.eth.account.sign_transaction(tx, private_key)
# Send transaction (commented out for safety)
print("\n⏳ Ready to send (transaction commented out for safety):")
print(f" Transaction Hash: {signed_tx.hash.hex()}")
# Uncomment to actually send:
# tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
# receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
# print(f" Transaction confirmed in block: {receipt.blockNumber}")
return tx, signed_tx
except Exception as e:
print(f"❌ Transaction error: {str(e)}")
return None, None
def estimate_gas_example(w3):
"""Estimate gas for transactions"""
if not w3:
print("❌ No Web3 connection available")
return
try:
# Estimate gas for ETH transfer
gas_estimate = w3.eth.estimate_gas({
'to': "0x70997970C51812dc3A010C7d01b50e0d17dc79C8",
'value': w3.to_wei(1, 'ether'),
})
print(f"⛽ Gas estimate for 1 ETH transfer: {gas_estimate}")
# Get current gas price
gas_price = w3.eth.gas_price
print(f" Current gas price: {w3.from_wei(gas_price, 'gwei')} gwei")
# Calculate total cost
gas_cost = gas_estimate * gas_price
print(f" Total gas cost: {w3.from_wei(gas_cost, 'ether')} ETH")
return gas_estimate, gas_price
except Exception as e:
print(f"❌ Gas estimation error: {str(e)}")
return None, None
# ===== SMART CONTRACT INTERACTION =====
print("\n=== SMART CONTRACT INTERACTION ===")
# ERC20 Token ABI (simplified)
erc20_abi = [
{
"constant": True,
"inputs": [],
"name": "name",
"outputs": [{"name": "", "type": "string"}],
"payable": False,
"stateMutability": "view",
"type": "function"
},
{
"constant": True,
"inputs": [],
"name": "symbol",
"outputs": [{"name": "", "type": "string"}],
"payable": False,
"stateMutability": "view",
"type": "function"
},
{
"constant": True,
"inputs": [],
"name": "decimals",
"outputs": [{"name": "", "type": "uint8"}],
"payable": False,
"stateMutability": "view",
"type": "function"
},
{
"constant": True,
"inputs": [],
"name": "totalSupply",
"outputs": [{"name": "", "type": "uint256"}],
"payable": False,
"stateMutability": "view",
"type": "function"
},
{
"constant": True,
"inputs": [{"name": "_owner", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "balance", "type": "uint256"}],
"payable": False,
"stateMutability": "view",
"type": "function"
},
{
"constant": False,
"inputs": [
{"name": "_to", "type": "address"},
{"name": "_value", "type": "uint256"}
],
"name": "transfer",
"outputs": [{"name": "", "type": "bool"}],
"payable": False,
"stateMutability": "nonpayable",
"type": "function"
},
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "from", "type": "address"},
{"indexed": True, "name": "to", "type": "address"},
{"indexed": False, "name": "value", "type": "uint256"}
],
"name": "Transfer",
"type": "event"
}
]
def contract_interaction(w3):
"""Demonstrate smart contract interaction"""
if not w3:
print("❌ No Web3 connection available")
return
# WETH contract address on mainnet
weth_address = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
try:
# Create contract instance
contract = w3.eth.contract(address=weth_address, abi=erc20_abi)
print("📄 Contract Interaction:")
print(f" Contract Address: {contract.address}")
# Read contract data (view functions)
name = contract.functions.name().call()
symbol = contract.functions.symbol().call()
decimals = contract.functions.decimals().call()
total_supply = contract.functions.totalSupply().call()
print("\n📊 Contract Data:")
print(f" Name: {name}")
print(f" Symbol: {symbol}")
print(f" Decimals: {decimals}")
print(f" Total Supply: {w3.from_wei(total_supply, 'ether')} {symbol}")
# Get balance of specific address
address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
balance = contract.functions.balanceOf(address).call()
print("\n💼 Balance Information:")
print(f" Address: {address}")
print(f" Balance: {w3.from_wei(balance, 'ether')} {symbol}")
return contract
except Exception as e:
print(f"❌ Contract interaction error: {str(e)}")
return None
def write_to_contract(w3, contract):
"""Demonstrate writing to contract"""
if not w3 or not contract:
print("❌ No Web3 connection or contract available")
return
# Account setup
private_key = "YOUR_PRIVATE_KEY_HERE"
account = w3.eth.account.from_key(private_key)
try:
# Get nonce
nonce = w3.eth.get_transaction_count(account.address)
# Build transaction
transaction = contract.functions.transfer(
"0x70997970C51812dc3A010C7d01b50e0d17dc79C8",
w3.to_wei(0.1, 'ether') # 0.1 WETH
).build_transaction({
'gas': 200000,
'gasPrice': w3.eth.gas_price,
'nonce': nonce,
'chainId': w3.eth.chain_id
})
# Sign transaction
signed_txn = w3.eth.account.sign_transaction(transaction, private_key)
print("\n✍️ Contract Write Transaction:")
print(f" From: {account.address}")
print(f" Function: transfer")
print(f" Value: 0.1 WETH")
print(f" Gas Limit: {transaction['gas']}")
print(f" Transaction Hash: {signed_txn.hash.hex()}")
# Send transaction (commented out for safety)
print("\n⏳ Ready to send (transaction commented out for safety)")
# Uncomment to actually send:
# tx_hash = w3.eth.send_raw_transaction(signed_txn.rawTransaction)
# receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
# print(f" Transaction confirmed in block: {receipt.blockNumber}")
return signed_txn
except Exception as e:
print(f"❌ Contract write error: {str(e)}")
return None
# ===== BLOCKCHAIN DATA OPERATIONS =====
print("\n=== BLOCKCHAIN DATA OPERATIONS =====")
def blockchain_info(w3):
"""Get blockchain information"""
if not w3:
print("❌ No Web3 connection available")
return
try:
# Get network information
chain_id = w3.eth.chain_id
block_number = w3.eth.block_number
gas_price = w3.eth.gas_price
print("🌐 Blockchain Information:")
print(f" Chain ID: {chain_id}")
print(f" Latest Block: {block_number}")
print(f" Gas Price: {w3.from_wei(gas_price, 'gwei')} gwei")
# Get latest block details
block = w3.eth.get_block('latest')
print("\n📦 Latest Block Details:")
print(f" Block Number: {block.number}")
print(f" Block Hash: {block.hash.hex()}")
print(f" Parent Hash: {block.parentHash.hex()}")
print(f" Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(block.timestamp))}")
print(f" Gas Limit: {block.gasLimit}")
print(f" Gas Used: {block.gasUsed}")
print(f" Transaction Count: {len(block.transactions)}")
return block
except Exception as e:
print(f"❌ Blockchain info error: {str(e)}")
return None
def transaction_details(w3, tx_hash):
"""Get transaction details"""
if not w3:
print("❌ No Web3 connection available")
return
try:
# Get transaction
tx = w3.eth.get_transaction(tx_hash)
print(f"📄 Transaction Details: {tx_hash}")
print(f" From: {tx['from']}")
print(f" To: {tx.get('to', 'Contract Creation')}")
print(f" Value: {w3.from_wei(tx['value'], 'ether')} ETH")
print(f" Gas: {tx['gas']}")
print(f" Gas Price: {w3.from_wei(tx['gasPrice'], 'gwei')} gwei")
print(f" Nonce: {tx['nonce']}")
print(f" Block Number: {tx['blockNumber']}")
# Get transaction receipt
receipt = w3.eth.get_transaction_receipt(tx_hash)
print("\n🧾 Transaction Receipt:")
print(f" Status: {'Success' if receipt['status'] == 1 else 'Failed'}")
print(f" Gas Used: {receipt['gasUsed']}")
print(f" Cumulative Gas Used: {receipt['cumulativeGasUsed']}")
print(f" Logs Count: {len(receipt['logs'])}")
return tx, receipt
except Exception as e:
print(f"❌ Transaction details error: {str(e)}")
return None, None
# ===== EVENT LISTENING =====
print("\n=== EVENT LISTENING =====")
def setup_event_listener(w3, contract):
"""Setup event listener for smart contract"""
if not w3 or not contract:
print("❌ No Web3 connection or contract available")
return
print("🎧 Setting up event listener...")
# Create filter for Transfer events
transfer_filter = contract.events.Transfer.create_filter(
from_block='latest',
argument_filters={'to': "0x70997970C51812dc3A010C7d01b50e0d17dc79C8"}
)
print("📡 Listening for Transfer events to specific address...")
print(" (This would run in a separate thread in production)")
# Example of processing events
def process_events():
"""Process new events"""
try:
events = w3.eth.get_logs(transfer_filter.filter_params)
print(f" Found {len(events)} matching events")
for event in events:
# Decode log
decoded_event = contract.events.Transfer().process_log(event)
print(f" 🎯 Transfer: {decoded_event['args']}")
except Exception as e:
print(f" Event processing error: {str(e)}")
return transfer_filter, process_events
# ===== UTILITY FUNCTIONS =====
print("\n=== UTILITY FUNCTIONS =====")
def utility_examples():
"""Demonstrate Web3 utility functions"""
print("🔧 Web3 Utilities:")
# Unit conversions
wei_amount = 1000000000000000000 # 1 ETH in wei
eth_amount = Web3.from_wei(wei_amount, 'ether')
print(f"\n🔄 Unit Conversions:")
print(f" 1 ETH = {wei_amount} Wei")
print(f" {wei_amount} Wei = {eth_amount} ETH")
# Address operations
address = "0x70997970C51812dc3A010C7d01b50e0d17dc79C8"
checksum_address = Web3.to_checksum_address(address)
print(f"\n📍 Address Operations:")
print(f" Original: {address}")
print(f" Checksum: {checksum_address}")
print(f" Is Address: {Web3.is_address(address)}")
# Hashing
message = "Hello, Ethereum!"
message_hash = Web3.keccak(text=message)
print(f"\n🔐 Hashing:")
print(f" Message: {message}")
print(f" Keccak256 Hash: {message_hash.hex()}")
# Encoding/Decoding
text = "Hello, Web3!"
hex_data = Web3.to_hex(text.encode('utf-8'))
decoded_text = Web3.to_text(hex_data)
print(f"\n📝 Encoding/Decoding:")
print(f" Text: {text}")
print(f" Hex: {hex_data}")
print(f" Decoded: {decoded_text}")
return {
'wei_amount': wei_amount,
'eth_amount': eth_amount,
'checksum_address': checksum_address,
'message_hash': message_hash,
'hex_data': hex_data
}
# ===== MAIN EXECUTION =====
def main():
"""Main execution function"""
print("🚀 WEB3.PY COMPLETE GUIDE")
print("=" * 50)
# Initialize Web3 connection
w3 = connect_to_infura()
if not w3:
print("❌ Could not establish Web3 connection")
return
try:
# Execute all examples
account, account_from_key = account_operations()
print("\n" + "-" * 50)
balance, nonce = wallet_operations(w3)
print("\n" + "-" * 50)
tx, signed_tx = basic_transaction(w3)
print("\n" + "-" * 50)
gas_estimate, gas_price = estimate_gas_example(w3)
print("\n" + "-" * 50)
contract = contract_interaction(w3)
print("\n" + "-" * 50)
write_to_contract(w3, contract)
print("\n" + "-" * 50)
block = blockchain_info(w3)
print("\n" + "-" * 50)
# Transaction details example (using a real transaction hash)
# transaction_details(w3, "0x...")
setup_event_listener(w3, contract)
print("\n" + "-" * 50)
utility_examples()
print("\n" + "-" * 50)
print("\n✅ ALL EXAMPLES COMPLETED SUCCESSFULLY!")
print("\n📚 Next Steps:")
print("1. Replace 'YOUR_PROJECT_ID' with actual Infura project ID")
print("2. Replace 'YOUR_PRIVATE_KEY_HERE' with your private key")
print("3. Uncomment transaction examples to test with real ETH")
print("4. Explore more complex contract interactions")
print("5. Learn about async Web3.py for better performance")
except Exception as e:
print(f"\n❌ FATAL ERROR: {str(e)}")
if __name__ == "__main__":
main()
# Export functions for individual testing
__all__ = [
'connect_to_geth',
'connect_to_infura',
'connect_to_websocket',
'account_operations',
'wallet_operations',
'basic_transaction',
'estimate_gas_example',
'contract_interaction',
'write_to_contract',
'blockchain_info',
'transaction_details',
'setup_event_listener',
'utility_examples',
]
💻 Asynchrone und erweiterte Contract-Operationen Web3.py python
🟡 intermediate
⭐⭐⭐⭐
Erweiterte Web3.py Patterns inklusive asynchrone Operationen, Batch-Requests, Event-Filtering und komplexe Contract-Interaktion
⏱️ 35 min
🏷️ web3py, async, contracts, advanced
Prerequisites:
Web3.py basics, Python async/await, Understanding of DeFi protocols
# Web3.py Async and Advanced Contract Operations
# Install: pip install web3 aiohttp
import asyncio
import json
import time
from web3 import Web3
from web3.middleware import geth_poa_middleware
from web3.contract import Contract
from typing import List, Dict, Any, Optional
# ===== ASYNC WEB3 SETUP =====
print("=== ASYNC WEB3 SETUP ===")
class AsyncWeb3Manager:
"""Async Web3 manager for high-performance operations"""
def __init__(self, provider_url: str):
self.provider_url = provider_url
self.w3 = None
self._loop = None
async def connect(self) -> bool:
"""Establish async connection"""
try:
# For async operations, we can use standard HTTP provider
# Web3.py doesn't have native async support yet, but we can work around it
self.w3 = Web3(Web3.HTTPProvider(self.provider_url))
if self.w3.is_connected():
print("✅ Async Web3 connection established")
return True
else:
print("❌ Failed to establish connection")
return False
except Exception as e:
print(f"❌ Connection error: {str(e)}")
return False
async def get_balance_batch(self, addresses: List[str]) -> Dict[str, int]:
"""Get balances for multiple addresses concurrently"""
if not self.w3:
return {}
print(f"🔄 Getting balances for {len(addresses)} addresses...")
# Create tasks for concurrent execution
tasks = []
for address in addresses:
task = asyncio.create_task(self._get_balance_async(address))
tasks.append(task)
# Wait for all tasks to complete
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
balances = {}
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f" ❌ Error getting balance for {addresses[i]}: {str(result)}")
else:
balances[addresses[i]] = result
return balances
async def _get_balance_async(self, address: str) -> int:
"""Async wrapper for get_balance"""
# In a real async implementation, this would use async HTTP calls
# For now, we simulate async behavior
await asyncio.sleep(0.1) # Simulate network delay
return self.w3.eth.get_balance(address)
async def get_transaction_receipt_batch(self, tx_hashes: List[str]) -> Dict[str, Any]:
"""Get transaction receipts for multiple transactions concurrently"""
if not self.w3:
return {}
print(f"🔄 Getting receipts for {len(tx_hashes)} transactions...")
tasks = []
for tx_hash in tx_hashes:
task = asyncio.create_task(self._get_receipt_async(tx_hash))
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
receipts = {}
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f" ❌ Error getting receipt for {tx_hashes[i]}: {str(result)}")
else:
receipts[tx_hashes[i]] = result
return receipts
async def _get_receipt_async(self, tx_hash: str) -> Any:
"""Async wrapper for get_transaction_receipt"""
await asyncio.sleep(0.2) # Simulate network delay
return self.w3.eth.get_transaction_receipt(tx_hash)
# ===== BATCH OPERATIONS =====
print("\n=== BATCH OPERATIONS =====")
class BatchOperationManager:
"""Manager for batch blockchain operations"""
def __init__(self, w3: Web3):
self.w3 = w3
self.batch_size = 100
def create_batch_call(self, calls: List[Dict[str, Any]]) -> Any:
"""Create a batch call using eth_call for multiple operations"""
print(f"📦 Creating batch call with {len(calls)} operations")
# Prepare batch data
batch_data = []
for call in calls:
batch_data.append({
'jsonrpc': '2.0',
'method': 'eth_call',
'params': [call, 'latest'],
'id': len(batch_data)
})
return batch_data
def execute_batch_call(self, batch_data: List[Dict[str, Any]]) -> List[Any]:
"""Execute batch call (simplified - in production use proper JSON-RPC batch)"""
results = []
for call_data in batch_data:
try:
result = self.w3.eth.call(call_data['params'][0])
results.append(result)
except Exception as e:
print(f" ❌ Batch call error: {str(e)}")
results.append(None)
return results
def multi_call_balances(self, addresses: List[str]) -> Dict[str, int]:
"""Get multiple balances using multicall pattern"""
print(f"💼 Getting balances for {len(addresses)} addresses using multicall...")
# Create balance calls
balance_calls = []
for address in addresses:
# ETH balance call using different block positions
call_data = {
'to': address,
'data': '0x' # Empty data for balance
}
balance_calls.append(call_data)
# Execute batch (simplified example)
balances = {}
for i, address in enumerate(addresses):
try:
balance = self.w3.eth.get_balance(address)
balances[address] = balance
except Exception as e:
print(f" ❌ Error getting balance for {address}: {str(e)}")
balances[address] = 0
return balances
# ===== ADVANCED CONTRACT INTERACTION =====
print("\n=== ADVANCED CONTRACT INTERACTION =====")
# DeFi Protocol ABIs (simplified examples)
uniswap_v2_router_abi = [
{
"inputs": [
{"name": "amountOutMin", "type": "uint256"},
{"name": "path", "type": "address[]"},
{"name": "to", "type": "address"},
{"name": "deadline", "type": "uint256"}
],
"name": "swapExactETHForTokens",
"outputs": [{"name": "amounts", "type": "uint256[]"}],
"stateMutability": "payable",
"type": "function"
},
{
"inputs": [
{"name": "amountIn", "type": "uint256"},
{"name": "amountOutMin", "type": "uint256"},
{"name": "path", "type": "address[]"},
{"name": "to", "type": "address"},
{"name": "deadline", "type": "uint256"}
],
"name": "swapExactTokensForTokens",
"outputs": [{"name": "amounts", "type": "uint256[]"}],
"stateMutability": "nonpayable",
"type": "function"
}
]
erc20_abi = [
{
"constant": True,
"inputs": [{"name": "_owner", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "balance", "type": "uint256"}],
"type": "function"
},
{
"constant": False,
"inputs": [
{"name": "_spender", "type": "address"},
{"name": "_value", "type": "uint256"}
],
"name": "approve",
"outputs": [{"name": "", "type": "bool"}],
"type": "function"
},
{
"constant": False,
"inputs": [
{"name": "_to", "type": "address"},
{"name": "_value", "type": "uint256"}
],
"name": "transfer",
"outputs": [{"name": "", "type": "bool"}],
"type": "function"
}
]
class DeFiProtocol:
"""DeFi protocol interaction class"""
def __init__(self, w3: Web3):
self.w3 = w3
self.protocols = {}
def add_protocol(self, name: str, address: str, abi: List[Dict]):
"""Add a DeFi protocol"""
contract = self.w3.eth.contract(address=address, abi=abi)
self.protocols[name] = {
'address': address,
'contract': contract,
'abi': abi
}
print(f"✅ Added protocol: {name} at {address}")
def get_token_price(self, token_address: str, base_token_address: str = None) -> Dict[str, Any]:
"""Get token price from multiple sources"""
print(f"💰 Getting price for token {token_address}")
prices = {}
# Method 1: Direct contract call (if price oracle available)
if base_token_address:
try:
# This would be implemented with actual price oracle
# For demo purposes, return mock data
prices['oracle'] = {
'price': 1.0,
'source': 'oracle',
'timestamp': int(time.time())
}
print(f" 📊 Oracle price: $1.00")
except Exception as e:
print(f" ❌ Oracle price error: {str(e)}")
# Method 2: DEX price (simplified)
try:
# This would calculate price from DEX reserves
prices['dex'] = {
'price': 0.98,
'source': 'dex',
'timestamp': int(time.time())
}
print(f" 🔄 DEX price: $0.98")
except Exception as e:
print(f" ❌ DEX price error: {str(e)}")
return prices
def calculate_swap_amounts(self, token_in: str, token_out: str, amount_in: int) -> Dict[str, Any]:
"""Calculate swap amounts and expected output"""
print(f"🔄 Calculating swap: {self.w3.from_wei(amount_in, 'ether')} {token_in} → {token_out}")
# Simplified swap calculation (in reality, use AMM formula)
reserves_in = 1000000 # Mock reserves
reserves_out = 2000000
# Constant product formula: x * y = k
# amount_out = reserves_out * amount_in / (reserves_in + amount_in) * 0.997 (0.3% fee)
fee = 0.003
amount_out = reserves_out * amount_in / (reserves_in + amount_in) * (1 - fee)
# Price impact
price_before = reserves_out / reserves_in
price_after = (reserves_out - amount_out) / (reserves_in + amount_in)
price_impact = abs(price_before - price_after) / price_before * 100
result = {
'amount_in': amount_in,
'amount_out': int(amount_out),
'price_before': price_before,
'price_after': price_after,
'price_impact': price_impact,
'fee': int(amount_in * fee),
'minimum_out': int(amount_out * 0.995) # 0.5% slippage protection
}
print(f" Expected out: {self.w3.from_wei(result['amount_out'], 'ether')} tokens")
print(f" Price impact: {result['price_impact']:.2f}%")
print(f" Fee: {self.w3.from_wei(result['fee'], 'ether')} tokens")
return result
def approve_token(self, token_address: str, spender_address: str, amount: int, private_key: str):
"""Approve token spending"""
print(f"✅ Approving {self.w3.from_wei(amount, 'ether')} tokens for {spender_address}")
try:
# Create token contract
token_contract = self.w3.eth.contract(address=token_address, abi=erc20_abi)
# Get account
account = self.w3.eth.account.from_key(private_key)
# Get nonce
nonce = self.w3.eth.get_transaction_count(account.address)
# Build transaction
transaction = token_contract.functions.approve(
spender_address,
amount
).build_transaction({
'gas': 50000,
'gasPrice': self.w3.eth.gas_price,
'nonce': nonce,
'chainId': self.w3.eth.chain_id
})
# Sign transaction
signed_tx = self.w3.eth.account.sign_transaction(transaction, private_key)
print(f" Transaction hash: {signed_tx.hash.hex()}")
print(" ⏳ Ready to send (transaction commented out for safety)")
# Uncomment to send:
# tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
# receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
# print(f" Approved in block: {receipt.blockNumber}")
return signed_tx
except Exception as e:
print(f"❌ Approval error: {str(e)}")
return None
# ===== EVENT PROCESSING =====
print("\n=== EVENT PROCESSING =====")
class EventProcessor:
"""Advanced event processing and analysis"""
def __init__(self, w3: Web3):
self.w3 = w3
self.event_cache = {}
self.filters = {}
def create_comprehensive_filter(self, contract_address: str, event_abi: List[Dict],
from_block: int = None, to_block: str = 'latest',
address_filters: List[str] = None) -> Any:
"""Create comprehensive event filter"""
contract = self.w3.eth.contract(address=contract_address, abi=event_abi)
# Create filter for all events
all_events_filter = contract.events.create_filter(
from_block=from_block,
to_block=to_block
)
print(f"📡 Created comprehensive filter for events from block {from_block}")
return {
'contract': contract,
'filter': all_events_filter,
'address': contract_address
}
def analyze_events(self, filter_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze events and generate insights"""
print("📊 Analyzing blockchain events...")
contract = filter_data['contract']
filter_params = filter_data['filter'].filter_params
try:
# Get all logs
logs = self.w3.eth.get_logs(filter_params)
analysis = {
'total_events': len(logs),
'unique_addresses': set(),
'event_types': {},
'gas_used_total': 0,
'time_range': {},
'top_addresses': {},
'hourly_distribution': {}
}
# Process each log
for log in logs:
analysis['unique_addresses'].add(log.address)
# Get block for timestamp
block = self.w3.eth.get_block(log.blockNumber)
# Decode log (simplified)
try:
# This would decode using the actual ABI
event_type = log.topics[0][:10] if log.topics else 'unknown'
analysis['event_types'][event_type] = analysis['event_types'].get(event_type, 0) + 1
except:
pass
# Time analysis
hour = time.strftime('%H', time.localtime(block.timestamp))
analysis['hourly_distribution'][hour] = analysis['hourly_distribution'].get(hour, 0) + 1
# Convert sets to lists for JSON serialization
analysis['unique_addresses'] = list(analysis['unique_addresses'])
print(f" Found {analysis['total_events']} events")
print(f" Unique addresses: {len(analysis['unique_addresses'])}")
print(f" Event types: {len(analysis['event_types'])}")
return analysis
except Exception as e:
print(f"❌ Event analysis error: {str(e)}")
return {}
def track_real_time_events(self, contract_address: str, event_abi: List[Dict],
callback=None):
"""Track real-time events (conceptual example)"""
print("🎯 Setting up real-time event tracking...")
contract = self.w3.eth.contract(address=contract_address, abi=event_abi)
# Create filter for new blocks
new_block_filter = self.w3.eth.filter('latest')
print(" 📡 Monitoring for new blocks...")
print(" (In production, use WebSocket or polling with proper error handling)")
# Event tracking logic
def process_new_block(block_hash):
try:
block = self.w3.eth.get_block(block_hash)
# Get transactions in this block
for tx_hash in block.transactions:
tx = self.w3.eth.get_transaction(tx_hash)
# Check if transaction interacts with our contract
if tx.to and tx.to.lower() == contract_address.lower():
print(f" 🎯 Contract interaction: {tx_hash.hex()}")
# Get receipt for events
receipt = self.w3.eth.get_transaction_receipt(tx_hash)
for log in receipt.logs:
if log.address == contract_address:
print(f" 📝 Event in transaction: {log.topics[0][:10] if log.topics else 'unknown'}")
# Call custom callback if provided
if callback:
callback(log, tx)
except Exception as e:
print(f" ❌ Error processing block {block_hash}: {str(e)}")
# Return processing function
return process_new_block
def export_events_to_csv(self, events: List[Dict], filename: str):
"""Export events to CSV file"""
import csv
print(f"💾 Exporting {len(events)} events to {filename}")
if not events:
print(" No events to export")
return
# Define CSV headers
headers = ['block_number', 'transaction_hash', 'address', 'topics', 'data', 'log_index']
try:
with open(filename, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=headers)
writer.writeheader()
for event in events:
row = {
'block_number': event.get('blockNumber', ''),
'transaction_hash': event.get('transactionHash', ''),
'address': event.get('address', ''),
'topics': str(event.get('topics', [])),
'data': event.get('data', ''),
'log_index': event.get('logIndex', '')
}
writer.writerow(row)
print(f" ✅ Events exported to {filename}")
except Exception as e:
print(f" ❌ Export error: {str(e)}")
# ===== MAIN EXECUTION =====
async def main():
"""Main async execution function"""
print("🚀 ADVANCED WEB3.PY EXAMPLES")
print("=" * 50)
# Initialize Web3 connection
w3 = Web3(Web3.HTTPProvider('https://mainnet.infura.io/v3/YOUR_PROJECT_ID'))
if not w3.is_connected():
print("❌ Could not establish Web3 connection")
return
try:
# Execute async examples
print("\n" + "-" * 50)
async_manager = AsyncWeb3Manager('https://mainnet.infura.io/v3/YOUR_PROJECT_ID')
await async_manager.connect()
# Test batch operations
batch_manager = BatchOperationManager(w3)
test_addresses = [
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045", # Vitalik
"0x742d35Cc6634C0532925a3b8D4C9db96C4b4Db45", # Binance
"0x47ac0Fb4F2D84898e4D9E7b4Dab3C24592667209", # Kraken
]
balances = await async_manager.get_balance_batch(test_addresses)
print(f"📊 Batch balances retrieved: {len(balances)}")
# DeFi protocol examples
print("\n" + "-" * 50)
defi = DeFiProtocol(w3)
# Add Uniswap V2 Router
uniswap_router = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
defi.add_protocol("uniswap_v2_router", uniswap_router, uniswap_v2_router_abi)
# Calculate swap amounts
swap_calculation = defi.calculate_swap_amounts(
"0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8", # Mock token address
"0xB0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E9", # Mock token address
w3.to_wei(1, 'ether')
)
# Event processing examples
print("\n" + "-" * 50)
event_processor = EventProcessor(w3)
# Create event filter for WETH transfers
weth_address = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
filter_data = event_processor.create_comprehensive_filter(
weth_address,
erc20_abi,
from_block=w3.eth.block_number - 100, # Last 100 blocks
)
# Analyze events
event_analysis = event_processor.analyze_events(filter_data)
print(f"📈 Event analysis: {len(event_analysis)} metrics")
# Export sample events
sample_events = [
{
'blockNumber': 12345,
'transactionHash': '0xabc123...',
'address': weth_address,
'topics': ['0x...'],
'data': '0x...',
'logIndex': 0
}
]
event_processor.export_events_to_csv(sample_events, 'events_export.csv')
print("\n✅ ALL ADVANCED EXAMPLES COMPLETED!")
print("\n📚 Advanced Features Demonstrated:")
print("✅ Async Web3 operations")
print("✅ Batch request processing")
print("✅ DeFi protocol integration")
print("✅ Event processing and analysis")
print("✅ Real-time event tracking")
print("✅ Data export functionality")
print("\n🔧 Production Recommendations:")
print("1. Use proper error handling and retries")
print("2. Implement rate limiting for API calls")
print("3. Use WebSocket providers for real-time data")
print("4. Cache frequently accessed data")
print("5. Monitor gas prices and optimize transactions")
except Exception as e:
print(f"\n❌ FATAL ERROR: {str(e)}")
# Additional utility functions
def create_contract_instance(w3, address: str, abi_file: str) -> Optional[Contract]:
"""Create contract instance from ABI file"""
try:
with open(abi_file, 'r') as f:
abi = json.load(f)
contract = w3.eth.contract(address=address, abi=abi)
print(f"✅ Contract instance created for {address}")
return contract
except Exception as e:
print(f"❌ Error creating contract instance: {str(e)}")
return None
def monitor_gas_prices(w3, duration: int = 60):
"""Monitor gas prices over time"""
print(f"⛽ Monitoring gas prices for {duration} seconds...")
start_time = time.time()
gas_prices = []
while time.time() - start_time < duration:
try:
gas_price = w3.eth.gas_price
gas_prices.append({
'timestamp': int(time.time()),
'gas_price': gas_price,
'gas_price_gwei': w3.from_wei(gas_price, 'gwei')
})
print(f" Current gas price: {w3.from_wei(gas_price, 'gwei')} gwei")
time.sleep(5)
except Exception as e:
print(f" ❌ Error getting gas price: {str(e)}")
break
if gas_prices:
avg_gas = sum(gp['gas_price'] for gp in gas_prices) / len(gas_prices)
print(f"\n📊 Gas Price Summary:")
print(f" Samples: {len(gas_prices)}")
print(f" Average: {w3.from_wei(avg_gas, 'gwei')} gwei")
print(f" Min: {min(gp['gas_price_gwei'] for gp in gas_prices):.2f} gwei")
print(f" Max: {max(gp['gas_price_gwei'] for gp in gas_prices):.2f} gwei")
return gas_prices
if __name__ == "__main__":
asyncio.run(main())
# Export for individual testing
__all__ = [
'AsyncWeb3Manager',
'BatchOperationManager',
'DeFiProtocol',
'EventProcessor',
'create_contract_instance',
'monitor_gas_prices',
]
💻 DeFi Integration Web3.py python
🔴 complex
⭐⭐⭐⭐⭐
Vollständige DeFi-Integrationsbeispiele inklusive Uniswap, Compound, Aave, Yield Farming und automatisierte Trading-Strategien
⏱️ 45 min
🏷️ web3py, defi, yield, trading, advanced
Prerequisites:
Web3.py advanced, Understanding of DeFi protocols, Python Decimal precision, Financial concepts
# Web3.py DeFi Integration
# Complete DeFi ecosystem integration with popular protocols
import asyncio
import json
import time
import math
from decimal import Decimal, getcontext
from typing import Dict, List, Optional, Tuple
from web3 import Web3
from web3.middleware import geth_poa_middleware
# Set high precision for Decimal calculations
getcontext().prec = 18
# ===== DEFI PROTOCOL ABIs =====
print("=== DEFI PROTOCOL ABIs ===")
# Uniswap V2 Router ABI (simplified)
uniswap_v2_router_abi = [
{
"inputs": [
{"name": "amountOutMin", "type": "uint256"},
{"name": "path", "type": "address[]"},
{"name": "to", "type": "address"},
{"name": "deadline", "type": "uint256"}
],
"name": "swapExactETHForTokens",
"outputs": [{"name": "amounts", "type": "uint256[]"}],
"stateMutability": "payable",
"type": "function"
},
{
"inputs": [
{"name": "amountIn", "type": "uint256"},
{"name": "amountOutMin", "type": "uint256"},
{"name": "path", "type": "address[]"},
{"name": "to", "type": "address"},
{"name": "deadline", "type": "uint256"}
],
"name": "swapExactTokensForTokens",
"outputs": [{"name": "amounts", "type": "uint256[]"}],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{"name": "tokenA", "type": "address"},
{"name": "tokenB", "type": "address"},
{"name": "amountADesired", "type": "uint256"},
{"name": "amountBDesired", "type": "uint256"},
{"name": "amountAMin", "type": "uint256"},
{"name": "amountBMin", "type": "uint256"},
{"name": "to", "type": "address"},
{"name": "deadline", "type": "uint256"}
],
"name": "addLiquidity",
"outputs": [
{"name": "amountA", "type": "uint256"},
{"name": "amountB", "type": "uint256"},
{"name": "liquidity", "type": "uint256"}
],
"stateMutability": "nonpayable",
"type": "function"
}
]
# Uniswap V2 Pair ABI (simplified)
uniswap_v2_pair_abi = [
{
"constant": True,
"inputs": [],
"name": "getReserves",
"outputs": [
{"name": "reserve0", "type": "uint112"},
{"name": "reserve1", "type": "uint112"},
{"name": "blockTimestampLast", "type": "uint32"}
],
"payable": False,
"stateMutability": "view",
"type": "function"
},
{
"constant": True,
"inputs": [],
"name": "token0",
"outputs": [{"name": "", "type": "address"}],
"payable": False,
"stateMutability": "view",
"type": "function"
},
{
"constant": True,
"inputs": [],
"name": "token1",
"outputs": [{"name": "", "type": "address"}],
"payable": False,
"stateMutability": "view",
"type": "function"
}
]
# ERC20 Token ABI
erc20_abi = [
{
"constant": True,
"inputs": [{"name": "_owner", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "balance", "type": "uint256"}],
"payable": False,
"stateMutability": "view",
"type": "function"
},
{
"constant": False,
"inputs": [
{"name": "_spender", "type": "address"},
{"name": "_value", "type": "uint256"}
],
"name": "approve",
"outputs": [{"name": "", "type": "bool"}],
"payable": False,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": False,
"inputs": [
{"name": "_to", "type": "address"},
{"name": "_value", "type": "uint256"}
],
"name": "transfer",
"outputs": [{"name": "", "type": "bool"}],
"payable": False,
"stateMutability": "nonpayable",
"type": "function"
}
]
# Compound cToken ABI (simplified)
compound_ctoken_abi = [
{
"constant": False,
"inputs": [{"name": "mint", "type": "uint256"}],
"name": "mint",
"outputs": [{"name": "", "type": "uint256"}],
"payable": False,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": False,
"inputs": [{"name": "redeem", "type": "uint256"}],
"name": "redeem",
"outputs": [{"name": "", "type": "uint256"}],
"payable": False,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": True,
"inputs": [],
"name": "exchangeRateCurrent",
"outputs": [{"name": "", "type": "uint256"}],
"payable": False,
"stateMutability": "view",
"type": "function"
},
{
"constant": True,
"inputs": [],
"name": "supplyRatePerBlock",
"outputs": [{"name": "", "type": "uint256"}],
"payable": False,
"stateMutability": "view",
"type": "function"
}
]
# ===== YIELD FARMING MANAGER =====
class YieldFarmingManager:
"""Comprehensive yield farming management"""
def __init__(self, w3: Web3, private_key: str):
self.w3 = w3
self.account = w3.eth.account.from_key(private_key)
self.protocols = {}
self.positions = {}
self.apr_cache = {}
def add_protocol(self, name: str, config: Dict[str, Any]):
"""Add a DeFi protocol configuration"""
self.protocols[name] = config
print(f"✅ Added protocol: {name}")
def get_token_price(self, token_address: str, base_token: str = None) -> Decimal:
"""Get token price from Uniswap"""
print(f"💰 Getting price for token {token_address}")
if token_address.lower() == "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2".lower(): # WETH
return Decimal('1.0')
# Create WETH/token pair
pair_address = self.get_pair_address(token_address, "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
if pair_address:
try:
pair_contract = self.w3.eth.contract(
address=pair_address,
abi=uniswap_v2_pair_abi
)
reserves = pair_contract.functions.getReserves().call()
token0 = pair_contract.functions.token0().call()
# Calculate price based on reserves
if token0.lower() == token_address.lower():
price = Decimal(reserves[1]) / Decimal(reserves[0])
else:
price = Decimal(reserves[0]) / Decimal(reserves[1])
print(f" 📊 Uniswap price: ${price:.6f}")
return price
except Exception as e:
print(f" ❌ Uniswap price error: {str(e)}")
# Fallback to other price sources
print(f" ⚠️ Using fallback price source")
return Decimal('1.0')
def get_pair_address(self, token_a: str, token_b: str) -> Optional[str]:
"""Get Uniswap pair address (simplified)"""
# This would use the Uniswap V2 Factory to get pair address
# For demo purposes, return mock addresses
pairs = {
"0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8-0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0xBb2b8038a1640196FbE3e38816F3e67Cba72D940",
"0x6B175474E89094C44Da98b954EedeAC495271d0F-0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0xA478c2975Ab1Ea89e8196811F51A7B7Ade33eB11"
}
key = f"{token_a}-{token_b}"
reverse_key = f"{token_b}-{token_a}"
return pairs.get(key) or pairs.get(reverse_key)
def calculate_uniswap_swap(self, amount_in: int, token_in: str, token_out: str) -> Dict[str, Decimal]:
"""Calculate Uniswap swap amounts"""
print(f"🔄 Calculating Uniswap swap: {self.w3.from_wei(amount_in, 'ether')} {token_in} → {token_out}")
pair_address = self.get_pair_address(token_in, token_out)
if not pair_address:
raise ValueError("No liquidity pair found")
try:
pair_contract = self.w3.eth.contract(
address=pair_address,
abi=uniswap_v2_pair_abi
)
reserves = pair_contract.functions.getReserves().call()
token0 = pair_contract.functions.token0().call()
# Determine which reserve corresponds to which token
if token0.lower() == token_in.lower():
reserve_in, reserve_out = reserves[0], reserves[1]
else:
reserve_in, reserve_out = reserves[1], reserves[0]
# Uniswap constant product formula: x * y = k
# Apply 0.3% fee
fee = Decimal('0.003')
amount_in_with_fee = Decimal(amount_in)
numerator = amount_in_with_fee * Decimal(reserve_out)
denominator = Decimal(reserve_in) + amount_in_with_fee
amount_out = numerator / denominator
# Calculate price impact
price_before = Decimal(reserve_out) / Decimal(reserve_in)
price_after = (Decimal(reserve_out) - amount_out) / (Decimal(reserve_in) + amount_in)
price_impact = abs(price_before - price_after) / price_before * Decimal('100')
result = {
'amount_in': Decimal(amount_in),
'amount_out': amount_out,
'price_before': price_before,
'price_after': price_after,
'price_impact': price_impact,
'fee': amount_in_with_fee * fee,
'minimum_out': amount_out * Decimal('0.995') # 0.5% slippage
}
print(f" Expected out: {amount_out:.6f} tokens")
print(f" Price impact: {price_impact:.2f}%")
print(f" Fee: {result['fee']:.6f} tokens")
print(f" Minimum out (0.5% slippage): {result['minimum_out']:.6f} tokens")
return result
except Exception as e:
print(f"❌ Swap calculation error: {str(e)}")
raise
def execute_uniswap_swap(self, amount_in: int, token_in: str, token_out: str,
slippage_tolerance: float = 0.5) -> Dict[str, Any]:
"""Execute Uniswap swap"""
print(f"🚀 Executing Uniswap swap")
# Calculate expected output
swap_calc = self.calculate_uniswap_swap(amount_in, token_in, token_out)
# Get Uniswap Router contract
router_address = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
router_contract = self.w3.eth.contract(
address=router_address,
abi=uniswap_v2_router_abi
)
try:
# Build swap path
path = [token_in, token_out]
# Calculate minimum output with slippage tolerance
min_out = int(swap_calc['amount_out'] * (Decimal('1') - Decimal(slippage_tolerance / 100)))
# Get deadline (20 minutes from now)
deadline = int(time.time()) + 1200
# Get nonce and build transaction
nonce = self.w3.eth.get_transaction_count(self.account.address)
if token_in.lower() == "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2".lower(): # WETH
# ETH to token swap
transaction = router_contract.functions.swapExactETHForTokens(
min_out,
path,
self.account.address,
deadline
).build_transaction({
'value': amount_in,
'gas': 200000,
'gasPrice': self.w3.eth.gas_price,
'nonce': nonce,
'chainId': self.w3.eth.chain_id
})
else:
# Token to token swap
# First need to approve token spending
self.approve_token(token_in, router_address, amount_in)
transaction = router_contract.functions.swapExactTokensForTokens(
amount_in,
min_out,
path,
self.account.address,
deadline
).build_transaction({
'gas': 200000,
'gasPrice': self.w3.eth.gas_price,
'nonce': nonce + 1,
'chainId': self.w3.eth.chain_id
})
# Sign transaction
signed_tx = self.w3.eth.account.sign_transaction(transaction, self.account.key)
print(f" Transaction hash: {signed_tx.hash.hex()}")
print(" ⏳ Ready to send (transaction commented out for safety)")
# Uncomment to send:
# tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
# receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
# print(f" ✅ Swap completed in block: {receipt.blockNumber}")
return {
'transaction': signed_tx,
'calculation': swap_calc,
'expected_min_out': min_out
}
except Exception as e:
print(f"❌ Swap execution error: {str(e)}")
raise
def approve_token(self, token_address: str, spender_address: str, amount: int):
"""Approve token spending"""
print(f"✅ Approving token {token_address} for spender {spender_address}")
try:
token_contract = self.w3.eth.contract(
address=token_address,
abi=erc20_abi
)
nonce = self.w3.eth.get_transaction_count(self.account.address)
transaction = token_contract.functions.approve(
spender_address,
amount
).build_transaction({
'gas': 50000,
'gasPrice': self.w3.eth.gas_price,
'nonce': nonce,
'chainId': self.w3.eth.chain_id
})
signed_tx = self.w3.eth.account.sign_transaction(transaction, self.account.key)
print(f" Approval transaction hash: {signed_tx.hash.hex()}")
print(" ⏳ Ready to send (transaction commented out for safety)")
return signed_tx
except Exception as e:
print(f"❌ Approval error: {str(e)}")
raise
# ===== LENDING PROTOCOL MANAGER =====
class LendingProtocolManager:
"""Manage lending protocols like Compound and Aave"""
def __init__(self, w3: Web3, private_key: str):
self.w3 = w3
self.account = w3.eth.account.from_key(private_key)
self.protocols = {}
def add_protocol(self, name: str, config: Dict[str, Any]):
"""Add a lending protocol"""
self.protocols[name] = config
print(f"✅ Added lending protocol: {name}")
def calculate_compound_apy(self, token_address: str) -> Dict[str, Decimal]:
"""Calculate Compound APY for a token"""
print(f"📈 Calculating Compound APY for token {token_address}")
# This would use actual Compound protocol data
# For demo purposes, use mock data
supply_rate = Decimal('0.05') / Decimal(365) / Decimal(86400) # 5% annual
# Blocks per day (Ethereum ~ 6,500 blocks/day)
blocks_per_day = Decimal(6500)
eth_blocks_per_year = blocks_per_day * Decimal(365)
# Calculate APY
apy = (Decimal('1') + supply_rate * eth_blocks_per_year) - Decimal('1')
result = {
'supply_rate': supply_rate,
'apy': apy * Decimal('100'),
'blocks_per_day': blocks_per_day,
'eth_blocks_per_year': eth_blocks_per_year
}
print(f" Supply Rate: {supply_rate:.8f} per block")
print(f" APY: {apy * Decimal('100'):.2f}%")
return result
def supply_to_compound(self, token_address: str, amount: int) -> Dict[str, Any]:
"""Supply tokens to Compound"""
print(f"💰 Supplying {self.w3.from_wei(amount, 'ether')} tokens to Compound")
# This would interact with the actual Compound cToken
ctoken_address = self.get_ctoken_address(token_address)
if not ctoken_address:
raise ValueError("No cToken found for this token")
try:
ctoken_contract = self.w3.eth.contract(
address=ctoken_address,
abi=compound_ctoken_abi
)
# Approve Compound to spend tokens
compound_comptroller = "0x3d9819210a31b4961b30ef54be2aed79b9c9cd3b7"
self.approve_token(token_address, ctoken_address, amount)
nonce = self.w3.eth.get_transaction_count(self.account.address)
transaction = ctoken_contract.functions.mint(amount).build_transaction({
'gas': 200000,
'gasPrice': self.w3.eth.gas_price,
'nonce': nonce,
'chainId': self.w3.eth.chain_id
})
signed_tx = self.w3.eth.account.sign_transaction(transaction, self.account.key)
print(f" Mint transaction hash: {signed_tx.hash.hex()}")
print(" ⏳ Ready to send (transaction commented out for safety)")
return {
'transaction': signed_tx,
'token_address': token_address,
'ctoken_address': ctoken_address,
'amount': amount
}
except Exception as e:
print(f"❌ Compound supply error: {str(e)}")
raise
def get_ctoken_address(self, token_address: str) -> Optional[str]:
"""Get corresponding cToken address"""
# Mock mapping - in reality, this would query Compound
ctokens = {
"0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8": "0x39AA39c021dfbaE8faC545936693aC917d5E75663", # USDC
"0x6B175474E89094C44Da98b954EedeAC495271d0F": "0x4Ddc2D193948926D02F9B1fE9e1daaE1C1d329329", # DAI
"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2": "0x4Ddc2D193948926D02F9B1fE9e1daaE1C1d329329", # WETH (same as DAI in this mock)
}
return ctokens.get(token_address.lower())
# ===== AUTOMATED TRADING STRATEGY =====
class AutomatedTradingStrategy:
"""Automated trading strategy with risk management"""
def __init__(self, w3: Web3, private_key: str):
self.w3 = w3
self.account = w3.eth.account.from_key(private_key)
self.farming_manager = YieldFarmingManager(w3, private_key)
self.lending_manager = LendingProtocolManager(w3, private_key)
self.trading_history = []
def calculate_yield_farming_apy(self, pair_address: str, staking_apr: float = None) -> Dict[str, Decimal]:
"""Calculate total APY for yield farming (trading fees + staking)"""
print(f"📊 Calculating yield farming APY for {pair_address}")
# Mock data for daily trading volume and fees
daily_volume = Decimal('1000000') # $1M daily volume
trading_fee_rate = Decimal('0.003') # 0.3%
daily_fees = daily_volume * trading_fee_rate
# Calculate pool reserves (mock)
total_liquidity = Decimal('5000000') # $5M total liquidity
our_share = Decimal('0.01') # 1% of pool
our_liquidity = total_liquidity * our_share
# Calculate our share of daily fees
our_daily_fees = daily_fees * our_share
our_daily_earnings = our_daily_fees / our_liquidity
# Annualize trading fee APY
trading_fee_apy = our_daily_earnings * Decimal('365')
# Add staking rewards if provided
total_apy = trading_fee_apy
if staking_apr:
staking_apy = Decimal(staking_apr) / Decimal('100')
total_apy += staking_apy
result = {
'daily_volume': daily_volume,
'daily_fees': daily_fees,
'our_daily_fees': our_daily_fees,
'trading_fee_apy': trading_fee_apy * Decimal('100'),
'staking_apy': staking_apr or 0,
'total_apy': total_apy * Decimal('100'),
'our_liquidity': our_liquidity
}
print(f" Daily Volume: ${daily_volume:,.0f}")
print(f" Trading Fee APY: {trading_fee_apy * Decimal('100'):.2f}%")
print(f" Staking APR: {staking_apr or 0}%")
print(f" Total APY: {total_apy * Decimal('100'):.2f}%")
return result
def implement_flash_loan_arbitrage(self, token_a: str, token_b: str,
amount: int) -> Dict[str, Any]:
"""Implement flash loan arbitrage strategy"""
print(f"⚡ Implementing flash loan arbitrage: {token_a} ↔ {token_b}")
# Calculate price differences between exchanges
price_uniswap = self.farming_manager.get_token_price(token_a, token_b)
price_sushiswap = self.get_sushiswap_price(token_a, token_b) # Mock function
price_diff = abs(price_uniswap - price_sushiswap)
price_diff_pct = (price_diff / min(price_uniswap, price_sushiswap)) * 100
if price_diff_pct < 1: # Less than 1% difference
print(f" ⚠️ Price difference ({price_diff_pct:.2f}%) too small for arbitrage")
return {'profitable': False, 'reason': 'Price difference too small'}
# Calculate potential profit
# This is simplified - actual implementation would account for:
# - Flash loan fees (usually 0.09%)
# - Gas costs
# - Slippage
# - DEX fees
flash_loan_fee = Decimal('0.0009') # 0.09%
estimated_profit = amount * Decimal(price_diff_pct / 100) * Decimal('0.99')
flash_loan_cost = amount * flash_loan_fee
net_profit = estimated_profit - flash_loan_cost
if net_profit <= 0:
print(f" ❌ Net profit negative after fees: ${self.w3.from_wei(int(net_profit), 'ether')}")
return {'profitable': False, 'reason': 'No profit after fees'}
result = {
'profitable': True,
'price_diff_pct': price_diff_pct,
'estimated_profit': estimated_profit,
'flash_loan_cost': flash_loan_cost,
'net_profit': net_profit,
'amount': amount
}
print(f" 💰 Potential profit: ${self.w3.from_wei(int(net_profit), 'ether')}")
print(f" Flash loan cost: ${self.w3.from_wei(int(flash_loan_cost), 'ether')}")
return result
def get_sushiswap_price(self, token_a: str, token_b: str) -> float:
"""Get price from Sushiswap (mock)"""
# This would use actual Sushiswap data
return 1.02 # Mock price with slight difference
def rebalance_portfolio(self, target_allocations: Dict[str, float]) -> Dict[str, Any]:
"""Rebalance portfolio to target allocations"""
print("🔄 Rebalancing portfolio to target allocations")
current_value = self.calculate_portfolio_value()
total_value = sum(current_value.values())
rebalance_actions = []
for token, target_pct in target_allocations.items():
target_value = total_value * Decimal(str(target_pct))
current_token_value = current_value.get(token, Decimal('0'))
diff = target_value - current_token_value
diff_pct = (diff / total_value) * 100
if abs(diff_pct) > 1: # Rebalance if difference > 1%
action = {
'token': token,
'current_value': current_token_value,
'target_value': target_value,
'difference': diff,
'difference_pct': diff_pct,
'action': 'buy' if diff > 0 else 'sell'
}
rebalance_actions.append(action)
print(f" {token}: {action['action']} ${abs(diff_pct):.2f}% ({'buy' if diff > 0 else 'sell'})")
return {
'total_value': total_value,
'current_allocations': {k: v/total_value*100 for k, v in current_value.items()},
'target_allocations': target_allocations,
'rebalance_actions': rebalance_actions,
'actions_required': len(rebalance_actions) > 0
}
def calculate_portfolio_value(self) -> Dict[str, Decimal]:
"""Calculate current portfolio value"""
# Mock portfolio data - in reality, this would query blockchain
return {
'ETH': Decimal('10.5'),
'USDC': Decimal('5000'),
'DAI': Decimal('3000'),
'WBTC': Decimal('0.5')
}
def run_strategy(self, strategy_config: Dict[str, Any]):
"""Run automated trading strategy"""
print("🤖 Running automated trading strategy")
strategy_type = strategy_config.get('type')
if strategy_type == 'yield_farming':
# Implement yield farming strategy
pairs = strategy_config.get('pairs', [])
for pair in pairs:
apy_data = self.calculate_yield_farming_apy(pair)
if apy_data['total_apy'] > Decimal('20'): # Target > 20% APY
print(f" 🎯 High APY opportunity: {pair} - {apy_data['total_apy']:.2f}%")
elif strategy_type == 'arbitrage':
# Implement arbitrage strategy
pairs = strategy_config.get('pairs', [])
for token_a, token_b in pairs:
result = self.implement_flash_loan_arbitrage(token_a, token_b,
strategy_config.get('amount', 1000000))
if result['profitable']:
print(f" 💰 Profitable arbitrage found: {token_a}/{token_b}")
elif strategy_type == 'portfolio_rebalance':
# Implement portfolio rebalancing
target_allocs = strategy_config.get('target_allocations', {})
rebalance_result = self.rebalance_portfolio(target_allocs)
if rebalance_result['actions_required']:
print(f" 🔄 Rebalancing required: {len(rebalance_result['rebalance_actions'])} actions")
print(" ✅ Strategy execution completed")
# ===== MAIN EXECUTION =====
async def main():
"""Main async execution function"""
print("🚀 WEB3.PY DEFI INTEGRATION")
print("=" * 50)
# Initialize Web3 connection
w3 = Web3(Web3.HTTPProvider('https://mainnet.infura.io/v3/YOUR_PROJECT_ID'))
if not w3.is_connected():
print("❌ Could not establish Web3 connection")
return
# Initialize with test private key (DO NOT use real private key in production)
private_key = "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
try:
# Initialize managers
farming_manager = YieldFarmingManager(w3, private_key)
lending_manager = LendingProtocolManager(w3, private_key)
trading_strategy = AutomatedTradingStrategy(w3, private_key)
# Add protocol configurations
farming_manager.add_protocol("uniswap_v2", {
'router': '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D',
'factory': '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f'
})
# Yield farming example
print("\n" + "-" * 50)
farming_manager.calculate_uniswap_swap(
w3.to_wei(1, 'ether'),
"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", # WETH
"0x6B175474E89094C44Da98b954EedeAC495271d0F" # DAI
)
# Lending example
print("\n" + "-" * 50)
lending_manager.calculate_compound_apy("0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8") # USDC
# Yield farming APY calculation
print("\n" + "-" * 50)
trading_strategy.calculate_yield_farming_apy(
"0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8-0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
staking_apr=5.0
)
# Arbitrage example
print("\n" + "-" * 50)
trading_strategy.implement_flash_loan_arbitrage(
"0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8", # USDC
"0x6B175474E89094C44Da98b954EedeAC495271d0F", # DAI
w3.to_wei(100000, 'ether') # $100K
)
# Portfolio rebalancing
print("\n" + "-" * 50)
target_allocations = {
'ETH': 0.40, # 40%
'USDC': 0.30, # 30%
'DAI': 0.20, # 20%
'WBTC': 0.10 # 10%
}
trading_strategy.rebalance_portfolio(target_allocations)
# Run automated strategy
print("\n" + "-" * 50)
strategy_config = {
'type': 'yield_farming',
'pairs': [
"0xA0b86a33E6441e0a6c6C4a9679B5c4a7E8E8E8E8-0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
],
'min_apy': 15.0
}
trading_strategy.run_strategy(strategy_config)
print("\n✅ ALL DEFI EXAMPLES COMPLETED!")
print("\n📚 DeFi Features Demonstrated:")
print("✅ Uniswap DEX integration")
print("✅ Compound lending protocol")
print("✅ Yield farming APY calculations")
print("✅ Flash loan arbitrage strategies")
print("✅ Portfolio management and rebalancing")
print("✅ Automated trading strategies")
print("✅ Risk management and slippage protection")
print("\n⚠️ IMPORTANT SECURITY NOTES:")
print("1. Never use real private keys in code examples")
print("2. Always use proper error handling and retries")
print("3. Implement proper slippage protection")
print("4. Monitor gas prices and network congestion")
print("5. Use reputable DeFi protocols and audits")
print("6. Test on testnets before mainnet deployment")
print("7. Consider using oracles for price data")
print("8. Implement proper access controls")
except Exception as e:
print(f"\n❌ FATAL ERROR: {str(e)}")
# Additional utility functions
def calculate_impermanent_loss(price_initial: float, price_final: float) -> float:
"""Calculate impermanent loss"""
if price_final == 0:
return 1.0 # 100% loss
price_ratio = price_final / price_initial
sqrt_ratio = math.sqrt(price_ratio)
# Impermanent loss formula: 2 * sqrt(ratio) / (1 + ratio) - 1
impermanent_loss = (2 * sqrt_ratio / (1 + price_ratio) - 1) * 100
return impermanent_loss
def monitor_defi_protocols(w3, duration: int = 300):
"""Monitor DeFi protocols for opportunities"""
print(f"🔍 Monitoring DeFi protocols for {duration} seconds...")
opportunities = []
end_time = time.time() + duration
while time.time() < end_time:
try:
# This would monitor real DeFi protocols
# For demo purposes, simulate finding opportunities
if time.time() % 60 < 1: # Every minute
opportunity = {
'timestamp': int(time.time()),
'protocol': 'Uniswap V2',
'pair': 'ETH/USDC',
'apr': random.uniform(10, 50),
'liquidity': random.uniform(100000, 1000000),
'volume_24h': random.uniform(500000, 5000000)
}
opportunities.append(opportunity)
if opportunity['apr'] > 30:
print(f" 🎯 High APY opportunity: {opportunity['pair']} - {opportunity['apr']:.2f}%")
time.sleep(10)
except Exception as e:
print(f" ❌ Monitoring error: {str(e)}")
break
print(f"\n📊 Found {len(opportunities)} opportunities")
return opportunities
if __name__ == "__main__":
asyncio.run(main())
# Export for individual testing
__all__ = [
'YieldFarmingManager',
'LendingProtocolManager',
'AutomatedTradingStrategy',
'calculate_impermanent_loss',
'monitor_defi_protocols',
]