🎯 Exemples recommandés
Collections d'exemples équilibrées, issues de diverses catégories, à explorer
Exemples Apache Thrift
Exemples du framework Apache Thrift pour le développement de services multi-langages avec définitions IDL et implémentations client/serveur
💻 Définition de Service Thrift IDL thrift
🟢 simple
⭐⭐
Définitions Thrift Interface Definition Language (IDL) de base avec définitions de services, structures de données et énumérations
⏱️ 15 min
🏷️ thrift, idl, rpc, microservices
Prerequisites:
Basic understanding of RPC concepts
// Apache Thrift IDL - User Service Example
// This file defines service interfaces and data structures
// 1. Namespace declaration
namespace py user_service
namespace java com.example.user.service
namespace js userService
namespace go user_service
namespace cpp user_service
// 2. Data types and structures
// User: a single user record.
// `id` and `username` are required; all remaining fields are optional.
struct User {
1: required i64 id,
2: required string username,
3: optional string email,
4: optional string firstName,
5: optional string lastName,
6: optional i32 age,
// Declared default value is true.
7: optional bool isActive = true,
// Timestamps are carried as plain strings; this IDL does not fix their format.
8: optional string createdAt,
9: optional string updatedAt
}
// CreateUserRequest: argument of UserService.createUser.
// Carries no id field; only `username` is required.
struct CreateUserRequest {
1: required string username,
2: optional string email,
3: optional string firstName,
4: optional string lastName,
5: optional i32 age
}
// UpdateUserRequest: argument of UserService.updateUser.
// `id` selects the user; every other field is optional
// (presumably so callers send only the fields they want changed -- confirm with the handlers).
struct UpdateUserRequest {
1: required i64 id,
2: optional string username,
3: optional string email,
4: optional string firstName,
5: optional string lastName,
6: optional i32 age,
7: optional bool isActive
}
// GetUserResponse: a User together with an optional message.
// NOTE(review): no service method declared in this IDL returns this type;
// UserService.getUser returns User directly.
struct GetUserResponse {
1: required User user,
2: optional string message
}
// ListUsersResponse: return type of UserService.listUsers.
// `users` and `totalCount` are required; `page` and `pageSize` are optional.
struct ListUsersResponse {
1: required list<User> users,
2: required i32 totalCount,
3: optional i32 page,
4: optional i32 pageSize
}
// 3. Enums
// UserStatus: status values accepted by UserService.getUsersByStatus.
// Values are numbered explicitly, starting at 1.
enum UserStatus {
ACTIVE = 1,
INACTIVE = 2,
SUSPENDED = 3,
DELETED = 4
}
// ErrorType: error codes carried in UserException.errorType.
// Values are numbered explicitly in the 1001-1005 range.
enum ErrorType {
USER_NOT_FOUND = 1001,
INVALID_INPUT = 1002,
DATABASE_ERROR = 1003,
PERMISSION_DENIED = 1004,
INTERNAL_ERROR = 1005
}
// 4. Exceptions
// UserException: service-level failure, classified by an ErrorType code
// plus a required message and optional free-form details.
exception UserException {
1: required ErrorType errorType,
2: required string message,
3: optional string details
}
// ValidationException: names the offending `field` and a required message;
// the rejected `value` may optionally be attached.
exception ValidationException {
1: required string field,
2: required string message,
3: optional string value
}
// 5. Service definition
// UserService: CRUD, listing, search and authentication operations on User records.
service UserService {
  // --- CRUD operations ---

  // Look up one user by id; may throw UserException.
  User getUser(1: i64 userId) throws (1: UserException ue),

  // Create a user from the request; may throw ValidationException or UserException.
  User createUser(1: CreateUserRequest request)
    throws (1: ValidationException ve, 2: UserException ue),

  // Update the user selected by request.id; may throw ValidationException or UserException.
  User updateUser(1: UpdateUserRequest request)
    throws (1: ValidationException ve, 2: UserException ue),

  // Delete one user by id; may throw UserException.
  bool deleteUser(1: i64 userId) throws (1: UserException ue),

  // --- Batch operations ---

  // The `optional` qualifier is not used on function arguments: the Thrift
  // compiler ignores it in argument lists. The default values are kept.
  // No exception is declared for the three methods below.
  ListUsersResponse listUsers(1: i32 page = 1, 2: i32 pageSize = 10),

  list<User> getUsersByStatus(1: UserStatus status),

  // --- Search operations ---

  list<User> searchUsers(1: string query, 2: i32 limit = 10),

  // --- Authentication ---

  // Returns the matching user; may throw UserException.
  User authenticateUser(1: string username, 2: string password)
    throws (1: UserException ue),
}
// 6. Additional service for authentication
// AuthService: token generation and validation.
service AuthService {
  // Field identifiers must be unique within an argument list, so `username`
  // is numbered 2 (both arguments were previously numbered 1).
  string generateToken(1: i64 userId, 2: string username),

  bool validateToken(1: string token),

  // Resolve a token to its User; may throw UserException.
  User getUserFromToken(1: string token) throws (1: UserException ue),
}
💻 Implémentation du Serveur Thrift Python python
🟡 intermediate
⭐⭐⭐
Implémentation complète du serveur Thrift en Python avec handlers et configuration du serveur
⏱️ 25 min
🏷️ thrift, python, server, rpc, microservices
Prerequisites:
Python basics, Thrift IDL understanding
# Apache Thrift Python Server Implementation
# This file implements the UserService defined in the IDL
import sys
import json
from datetime import datetime
from typing import Dict, List, Optional
# Thrift imports
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
# Generated imports (after running thrift compiler)
# from user_service import UserService
# from user_service.ttypes import *
# Mock database for demonstration
class UserDatabase:
    """In-memory mock user store for the demo server.

    Users live in ``self.users`` keyed by id; ``self.next_id`` is the id the
    next created user receives. Type names from the generated Thrift module
    are written as string annotations so this class can be defined before
    those imports are enabled.
    """

    def __init__(self):
        self.users: Dict[int, "User"] = {}
        self.next_id = 1
        self._init_sample_data()

    def _init_sample_data(self):
        """Seed the store with two sample users and advance ``next_id`` past them."""
        sample_users = [
            User(id=1, username="john_doe", email="[email protected]",
                 firstName="John", lastName="Doe", age=30, isActive=True,
                 createdAt=datetime.now().isoformat()),
            User(id=2, username="jane_smith", email="[email protected]",
                 firstName="Jane", lastName="Smith", age=25, isActive=True,
                 createdAt=datetime.now().isoformat()),
        ]
        for user in sample_users:
            self.users[user.id] = user
            self.next_id = max(self.next_id, user.id + 1)

    @staticmethod
    def _validate_username(username):
        """Raise ValidationException unless *username* has at least 3 characters."""
        if not username or len(username) < 3:
            raise ValidationException(field="username",
                                      message="Username must be at least 3 characters")

    def create_user(self, request: "CreateUserRequest") -> "User":
        """Validate *request*, store a new active user and return it.

        Raises:
            ValidationException: the username is missing or shorter than 3 characters.
            UserException: (INVALID_INPUT) the username is already taken.
        """
        self._validate_username(request.username)

        if any(existing.username == request.username for existing in self.users.values()):
            raise UserException(errorType=ErrorType.INVALID_INPUT,
                                message="Username already exists")

        user = User(
            id=self.next_id,
            username=request.username,
            email=request.email,
            firstName=request.firstName,
            lastName=request.lastName,
            age=request.age,
            isActive=True,
            createdAt=datetime.now().isoformat(),
        )
        self.users[user.id] = user
        self.next_id += 1
        return user

    def get_user(self, user_id: int) -> "User":
        """Return the user stored under *user_id*.

        Raises:
            UserException: (USER_NOT_FOUND) no user has that id.
        """
        try:
            return self.users[user_id]
        except KeyError:
            raise UserException(errorType=ErrorType.USER_NOT_FOUND,
                                message=f"User with ID {user_id} not found") from None

    def update_user(self, request: "UpdateUserRequest") -> "User":
        """Apply every non-None field of *request* to the stored user and return it.

        Raises:
            UserException: (USER_NOT_FOUND) no user has ``request.id``.
            ValidationException: a username was supplied but is too short.
        """
        user = self.get_user(request.id)

        # `is not None` (not truthiness) so an explicitly supplied empty
        # username is rejected by validation instead of being silently ignored.
        if request.username is not None:
            self._validate_username(request.username)
            user.username = request.username

        for field in ("email", "firstName", "lastName", "age", "isActive"):
            value = getattr(request, field)
            if value is not None:
                setattr(user, field, value)

        user.updatedAt = datetime.now().isoformat()
        return user

    def delete_user(self, user_id: int) -> bool:
        """Remove the user stored under *user_id* and return True.

        Raises:
            UserException: (USER_NOT_FOUND) no user has that id.
        """
        self.get_user(user_id)  # raises USER_NOT_FOUND for unknown ids
        del self.users[user_id]
        return True

    def list_users(self, page: int = 1, page_size: int = 10) -> "ListUsersResponse":
        """Return one page of users together with the total user count.

        ``page`` is 1-based. Values below 1 are treated as page 1 and a negative
        ``page_size`` as 0, so the slice indices can never go negative (which
        would silently select users counted from the end of the list).
        """
        page = max(page, 1)
        page_size = max(page_size, 0)

        all_users = list(self.users.values())
        start_idx = (page - 1) * page_size
        return ListUsersResponse(
            users=all_users[start_idx:start_idx + page_size],
            totalCount=len(all_users),
            page=page,
            pageSize=page_size,
        )
# Service handler implementation
class UserServiceHandler:
    """Handler implementing the UserService methods on top of a UserDatabase.

    Declared exceptions (UserException, ValidationException) are re-raised
    unchanged; any other error is converted into a UserException with
    ErrorType.INTERNAL_ERROR.

    NOTE(review): the IDL declares no ``throws`` clause for listUsers,
    getUsersByStatus and searchUsers, so a UserException raised from those
    methods is presumably not delivered to clients as a typed exception --
    confirm against the generated processor.
    """

    def __init__(self):
        self.db = UserDatabase()

    @staticmethod
    def _internal_error(exc):
        """Build the INTERNAL_ERROR UserException that wraps an unexpected error."""
        return UserException(errorType=ErrorType.INTERNAL_ERROR, message=str(exc))

    @staticmethod
    def _matches(user, needle):
        """Return True when *needle* (lower-case) occurs in any searchable field of *user*."""
        if needle in user.username.lower():
            return True
        optional_fields = (user.email, user.firstName, user.lastName)
        return any(value and needle in value.lower() for value in optional_fields)

    def getUser(self, userId):
        """Return the User with id *userId*.

        The IDL declares ``User getUser(...)``, so the user itself is returned
        rather than a GetUserResponse wrapper.
        """
        try:
            return self.db.get_user(userId)
        except UserException:
            raise
        except Exception as e:
            raise self._internal_error(e) from e

    def createUser(self, request):
        """Create and return a user from *request*."""
        try:
            return self.db.create_user(request)
        except (UserException, ValidationException):
            raise
        except Exception as e:
            raise self._internal_error(e) from e

    def updateUser(self, request):
        """Apply *request* to an existing user and return the updated user."""
        try:
            return self.db.update_user(request)
        except (UserException, ValidationException):
            raise
        except Exception as e:
            raise self._internal_error(e) from e

    def deleteUser(self, userId):
        """Delete the user with id *userId*; returns the database's result."""
        try:
            return self.db.delete_user(userId)
        except UserException:
            raise
        except Exception as e:
            raise self._internal_error(e) from e

    def listUsers(self, page=None, pageSize=None):
        """Return one page of users; falsy arguments fall back to page 1 / size 10."""
        try:
            return self.db.list_users(page or 1, pageSize or 10)
        except Exception as e:
            raise self._internal_error(e) from e

    def getUsersByStatus(self, status):
        """Return users whose ``isActive`` flag matches *status*.

        Only ACTIVE maps to active users; every other status selects the
        users whose ``isActive`` is false.
        """
        try:
            want_active = status == UserStatus.ACTIVE
            return [u for u in self.db.users.values() if u.isActive == want_active]
        except Exception as e:
            raise self._internal_error(e) from e

    def searchUsers(self, query, limit=10):
        """Return up to *limit* users whose username, email or names contain *query* (case-insensitive)."""
        try:
            needle = query.lower()
            matches = [u for u in self.db.users.values() if self._matches(u, needle)]
            return matches[:limit]
        except Exception as e:
            raise self._internal_error(e) from e

    def authenticateUser(self, username, password):
        """Return the active user named *username*.

        Mock authentication: *password* is not checked. A real implementation
        must verify a password hash here.

        Raises:
            UserException: (USER_NOT_FOUND) no active user has that username.
        """
        try:
            for user in self.db.users.values():
                if user.username == username and user.isActive:
                    return user
            raise UserException(errorType=ErrorType.USER_NOT_FOUND,
                                message="Invalid username or password")
        except UserException:
            raise
        except Exception as e:
            raise self._internal_error(e) from e
# Server setup and configuration
def create_server(host='localhost', port=9090):
    """Build a TSimpleServer for UserService bound to *host*:*port*.

    The server uses a buffered transport factory and the binary protocol
    factory. It is returned without being started.
    """
    processor = UserService.Processor(UserServiceHandler())
    return TServer.TSimpleServer(
        processor,
        TSocket.TServerSocket(host=host, port=port),  # listening socket
        TTransport.TBufferedTransportFactory(),       # transport factory
        TBinaryProtocol.TBinaryProtocolFactory(),     # protocol factory
    )
# Alternative: Multi-threaded server for better performance
def create_threaded_server(host='localhost', port=9090):
    """Build a TThreadedServer for UserService bound to *host*:*port*.

    Same wiring as the simple server (buffered transport, binary protocol),
    only the server class differs. The server is returned without being started.
    """
    processor = UserService.Processor(UserServiceHandler())
    return TServer.TThreadedServer(
        processor,
        TSocket.TServerSocket(host=host, port=port),
        TTransport.TBufferedTransportFactory(),
        TBinaryProtocol.TBinaryProtocolFactory(),
    )
# Server execution
if __name__ == '__main__':
    print("Starting Thrift User Service Server...")
    # Create server
    server = create_threaded_server('localhost', 9090)
    print("Server listening on localhost:9090")
    print("Press Ctrl+C to stop...")
    try:
        server.serve()
    except KeyboardInterrupt:
        print("\nShutting down server...")
        # The basic Python server classes (TSimpleServer, TThreadedServer) are
        # not known to provide stop(); calling it unconditionally would raise
        # AttributeError during shutdown. Use it when present, otherwise close
        # the listening transport directly.
        stop = getattr(server, 'stop', None)
        if callable(stop):
            stop()
        else:
            listener = getattr(server, 'serverTransport', None)
            if listener is not None:
                listener.close()
💻 Implémentation du Client Thrift Python python
🟡 intermediate
⭐⭐⭐
Implémentation du client Thrift en Python avec gestion des connexions et gestion des erreurs
⏱️ 20 min
🏷️ thrift, python, client, rpc, microservices
Prerequisites:
Python basics, Thrift IDL understanding
# Apache Thrift Python Client Implementation
# This file demonstrates how to create a client for the UserService
import sys
import time
from typing import List, Optional
# Thrift imports
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.Thrift import TException
# Generated imports (after running thrift compiler)
# from user_service import UserService
# from user_service.ttypes import *
class ThriftClient:
    """Owns one buffered, binary-protocol connection to the UserService server.

    Usable as a context manager: ``__enter__`` attempts to connect and
    ``__exit__`` closes. A failed connect does not raise; check
    ``is_connected()`` (or the return value of ``connect()``).
    """

    def __init__(self, host='localhost', port=9090, timeout=5000):
        self.host = host
        self.port = port
        self.timeout = timeout  # passed to TSocket.setTimeout()
        self.client = None
        self.transport = None

    def connect(self):
        """Establish connection to the Thrift server.

        Returns:
            True on success; False (after printing the error and cleaning up)
            on any failure.
        """
        try:
            sock = TSocket.TSocket(self.host, self.port)
            sock.setTimeout(self.timeout)
            self.transport = TTransport.TBufferedTransport(sock)
            protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
            self.client = UserService.Client(protocol)
            self.transport.open()
            print(f"Connected to Thrift server at {self.host}:{self.port}")
            return True
        except Exception as e:
            print(f"Failed to connect to server: {e}")
            self.close()
            return False

    def close(self):
        """Close connection to the Thrift server.

        Safe to call repeatedly. State is cleared before the transport is
        closed so a failing ``transport.close()`` cannot leave a stale
        transport behind, and the message is printed only when a transport
        actually existed.
        """
        transport = self.transport
        self.transport = None
        self.client = None
        if transport is None:
            return
        try:
            transport.close()
        finally:
            print("Connection closed")

    def is_connected(self):
        """Return True when a transport exists and reports itself open (always a bool)."""
        return self.transport is not None and bool(self.transport.isOpen())

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
# User service client wrapper
class UserServiceClient:
    """Convenience wrapper around UserService calls.

    Every method opens a connection, performs one call, prints a short
    status line and closes the connection again. Failures are reported by
    printing and returning a neutral value (None, False or []) instead of
    raising.
    """

    # Maps update_user() keyword names to the UpdateUserRequest field names
    # declared in the IDL.
    _UPDATE_FIELDS = {
        'username': 'username',
        'email': 'email',
        'first_name': 'firstName',
        'last_name': 'lastName',
        'age': 'age',
        'is_active': 'isActive',
    }

    def __init__(self, host='localhost', port=9090):
        self.thrift_client = ThriftClient(host, port)

    def create_user(self, username, email=None, first_name=None, last_name=None, age=None):
        """Create a new user; returns the created User or None on failure."""
        try:
            if not self.thrift_client.connect():
                return None
            request = CreateUserRequest(
                username=username,
                email=email,
                firstName=first_name,
                lastName=last_name,
                age=age,
            )
            user = self.thrift_client.client.createUser(request)
            print(f"User created successfully: {user.username} (ID: {user.id})")
            return user
        except ValidationException as e:
            print(f"Validation error in field '{e.field}': {e.message}")
            return None
        except UserException as e:
            print(f"User service error: {e.message}")
            return None
        except TException as e:
            print(f"Thrift error: {e}")
            return None
        finally:
            self.thrift_client.close()

    def get_user(self, user_id):
        """Get user by ID; returns the User or None on failure.

        The IDL declares ``User getUser(...)``, so the call result is the
        user itself, not a wrapper with a ``.user`` attribute.
        """
        try:
            if not self.thrift_client.connect():
                return None
            user = self.thrift_client.client.getUser(user_id)
            print(f"User found: {user.username}")
            return user
        except UserException as e:
            if e.errorType == ErrorType.USER_NOT_FOUND:
                print(f"User with ID {user_id} not found")
            else:
                print(f"User service error: {e.message}")
            return None
        except TException as e:
            print(f"Thrift error: {e}")
            return None
        finally:
            self.thrift_client.close()

    def update_user(self, user_id, **kwargs):
        """Update user information; returns the updated User or None on failure.

        Recognized keywords: username, email, first_name, last_name, age,
        is_active. Other keywords are ignored.
        """
        try:
            if not self.thrift_client.connect():
                return None
            request = UpdateUserRequest(id=user_id)
            # Copy each supplied keyword onto the request under its IDL field
            # name (e.g. last_name -> lastName).
            for keyword, field in self._UPDATE_FIELDS.items():
                if keyword in kwargs:
                    setattr(request, field, kwargs[keyword])
            user = self.thrift_client.client.updateUser(request)
            print(f"User updated successfully: {user.username}")
            return user
        except (ValidationException, UserException) as e:
            print(f"Error updating user: {e.message}")
            return None
        except TException as e:
            print(f"Thrift error: {e}")
            return None
        finally:
            self.thrift_client.close()

    def delete_user(self, user_id):
        """Delete user by ID; returns the server's result, or False on failure."""
        try:
            if not self.thrift_client.connect():
                return False
            success = self.thrift_client.client.deleteUser(user_id)
            if success:
                print(f"User with ID {user_id} deleted successfully")
            return success
        except UserException as e:
            if e.errorType == ErrorType.USER_NOT_FOUND:
                print(f"User with ID {user_id} not found")
            else:
                print(f"Error deleting user: {e.message}")
            return False
        except TException as e:
            print(f"Thrift error: {e}")
            return False
        finally:
            self.thrift_client.close()

    def list_users(self, page=1, page_size=10):
        """List users with pagination; returns the ListUsersResponse or None on failure."""
        try:
            if not self.thrift_client.connect():
                return None
            response = self.thrift_client.client.listUsers(page, page_size)
            print(f"Found {response.totalCount} users (Page {response.page}, "
                  f"showing {len(response.users)} of {response.pageSize})")
            return response
        except TException as e:
            print(f"Thrift error: {e}")
            return None
        finally:
            self.thrift_client.close()

    def search_users(self, query, limit=10):
        """Search users by query; returns a list of users ([] on failure)."""
        try:
            if not self.thrift_client.connect():
                return []
            users = self.thrift_client.client.searchUsers(query, limit)
            print(f"Found {len(users)} users matching '{query}'")
            return users
        except TException as e:
            print(f"Thrift error: {e}")
            return []
        finally:
            self.thrift_client.close()

    def authenticate_user(self, username, password):
        """Authenticate user credentials; returns the User or None on failure."""
        try:
            if not self.thrift_client.connect():
                return None
            user = self.thrift_client.client.authenticateUser(username, password)
            print(f"User authenticated successfully: {user.username}")
            return user
        except UserException as e:
            print(f"Authentication failed: {e.message}")
            return None
        except TException as e:
            print(f"Thrift error: {e}")
            return None
        finally:
            self.thrift_client.close()
# Demo functions
def demo_user_operations():
    """Run the end-to-end demo: create, list, get, update, search, authenticate, delete.

    Each step prints its own heading; steps that depend on a user created in
    step 1 are skipped when that creation failed.
    """
    service = UserServiceClient()
    print("=== Thrift User Service Demo ===\n")

    # Step 1: create two users
    print("1. Creating users...")
    user1 = service.create_user(username="alice_wonder", email="[email protected]",
                                first_name="Alice", last_name="Wonder", age=28)
    user2 = service.create_user(username="bob_builder", email="[email protected]",
                                first_name="Bob", last_name="Builder", age=35)
    print()

    # Step 2: list the first page
    print("2. Listing users...")
    listing = service.list_users(page=1, page_size=5)
    if listing:
        for entry in listing.users:
            print(f" - {entry.username} (ID: {entry.id}, Email: {entry.email or 'N/A'})")
    print()

    # Step 3: fetch the first created user by id
    if user1:
        print(f"3. Getting user by ID ({user1.id})...")
        fetched = service.get_user(user1.id)
        if fetched:
            print(f" Retrieved: {fetched.firstName} {fetched.lastName}")
        print()

    # Step 4: change age and email of the second created user
    if user2:
        print(f"4. Updating user ({user2.id})...")
        changed = service.update_user(user2.id, age=36, email="[email protected]")
        if changed:
            print(f" Updated age to: {changed.age}")
        print()

    # Step 5: search
    print("5. Searching users...")
    for hit in service.search_users("alice", limit=5):
        print(f" - {hit.username} (ID: {hit.id})")
    print()

    # Step 6: authenticate
    print("6. Authenticating user...")
    authenticated = service.authenticate_user("alice_wonder", "password123")
    if authenticated:
        print(f" Authenticated: {authenticated.username}")
    print()

    # Step 7: delete the first created user
    if user1:
        print(f"7. Deleting user ({user1.id})...")
        deleted = service.delete_user(user1.id)
        print(f" Deleted: {deleted}")
        print()
def demo_connection_management():
    """Show the two ways of handling a ThriftClient connection.

    First the ``with`` form, then explicit connect()/close() guarded by
    try/finally.
    """
    print("=== Connection Management Demo ===\n")

    # Pattern 1: the context manager connects on entry and closes on exit
    print("1. Using context manager:")
    try:
        with ThriftClient() as managed:
            if managed.is_connected():
                print(" Connected successfully!")
                # Use client here...
    except Exception as e:
        print(f" Connection failed: {e}")

    # Pattern 2: explicit connect, with close() guaranteed by finally
    print("\n2. Manual connection management:")
    manual = ThriftClient()
    try:
        connected = manual.connect()
        # Use client here when connected...
        print(" Connected successfully!" if connected else " Failed to connect")
    finally:
        manual.close()
# Performance testing
def performance_test(num_requests=100):
    """Time *num_requests* sequential list_users calls and print a summary.

    Uses a monotonic clock, and guards both averages so that zero requests
    or a zero elapsed time cannot raise ZeroDivisionError.
    """
    print(f"=== Performance Test ({num_requests} requests) ===\n")
    client = UserServiceClient()
    start_time = time.perf_counter()
    success_count = 0
    for i in range(num_requests):
        try:
            # Test list users operation
            response = client.list_users(page=1, page_size=10)
            if response:
                success_count += 1
        except Exception as e:
            print(f"Request {i+1} failed: {e}")
    elapsed = time.perf_counter() - start_time

    average_ms = (elapsed / num_requests) * 1000 if num_requests > 0 else 0.0
    requests_per_second = num_requests / elapsed if elapsed > 0 else 0.0
    print(f"Completed {success_count}/{num_requests} requests in {elapsed:.2f} seconds")
    print(f"Average time per request: {average_ms:.2f} ms")
    print(f"Requests per second: {requests_per_second:.2f}")
if __name__ == '__main__':
    # Run each demonstration in turn, with a blank line after each one
    for run_demo in (demo_user_operations, demo_connection_management):
        run_demo()
        print()
    # Uncomment to run performance test
    # performance_test(50)
💻 Implémentation du Serveur Thrift Java java
🔴 complex
⭐⭐⭐⭐
Implémentation du serveur Thrift en Java avec TSimpleServer et TThreadPoolServer (l'exemple ci-dessous n'utilise ni Netty ni Spring Boot)
⏱️ 30 min
🏷️ thrift, java, server, enterprise, microservices
Prerequisites:
Java basics, Maven/Gradle, Thrift IDL understanding
// Apache Thrift Java Server Implementation
// This example shows how to create a Thrift server in Java
package com.example.user.service;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
// Service implementation
/**
 * Implementation of {@code UserService.Iface} backed by an in-memory map.
 *
 * <p>Users are stored in a {@link ConcurrentHashMap} keyed by id; ids come
 * from an {@link AtomicLong} counter starting at 1.
 */
public class UserServiceHandler implements UserService.Iface {
    private static final int MIN_USERNAME_LENGTH = 3;

    // In-memory database simulation
    private final Map<Long, User> userDatabase = new ConcurrentHashMap<>();
    private final AtomicLong idGenerator = new AtomicLong(1);

    public UserServiceHandler() {
        initializeSampleData();
    }

    /** Seeds the store with two sample users. */
    private void initializeSampleData() {
        addSampleUser("john_doe", "[email protected]", "John", "Doe", 30);
        addSampleUser("jane_smith", "[email protected]", "Jane", "Smith", 25);
    }

    /** Stores one active sample user under the next generated id. */
    private void addSampleUser(String username, String email,
                               String firstName, String lastName, int age) {
        User user = new User();
        user.setId(idGenerator.getAndIncrement());
        user.setUsername(username);
        user.setEmail(email);
        user.setFirstName(firstName);
        user.setLastName(lastName);
        user.setAge(age);
        user.setIsActive(true);
        user.setCreatedAt(new Date().toString());
        userDatabase.put(user.getId(), user);
    }

    /** Builds the USER_NOT_FOUND exception for the given id. */
    private static UserException userNotFound(long userId) {
        return new UserException(ErrorType.USER_NOT_FOUND,
                "User with ID " + userId + " not found");
    }

    /** Throws ValidationException unless the username has at least 3 characters. */
    private static void validateUsername(String username) throws ValidationException {
        if (username == null || username.length() < MIN_USERNAME_LENGTH) {
            throw new ValidationException("username",
                    "Username must be at least 3 characters");
        }
    }

    /** Case-insensitive containment test that treats a null field as "no match". */
    private static boolean containsIgnoreCase(String value, String needleLower) {
        return value != null && value.toLowerCase().contains(needleLower);
    }

    /**
     * Returns the user with the given id.
     *
     * @throws UserException (USER_NOT_FOUND) when no user has that id
     */
    @Override
    public User getUser(long userId) throws UserException, TException {
        User user = userDatabase.get(userId);
        if (user == null) {
            throw userNotFound(userId);
        }
        return user;
    }

    /**
     * Validates the request, stores a new active user and returns it.
     *
     * @throws ValidationException when the username is missing or too short
     * @throws UserException (INVALID_INPUT) when the username already exists
     */
    @Override
    public User createUser(CreateUserRequest request)
            throws ValidationException, UserException, TException {
        validateUsername(request.getUsername());

        // Check for duplicate username
        for (User existingUser : userDatabase.values()) {
            if (existingUser.getUsername().equals(request.getUsername())) {
                throw new UserException(ErrorType.INVALID_INPUT,
                        "Username already exists");
            }
        }

        User user = new User();
        user.setId(idGenerator.getAndIncrement());
        user.setUsername(request.getUsername());
        user.setEmail(request.getEmail());
        user.setFirstName(request.getFirstName());
        user.setLastName(request.getLastName());
        // age is optional: copying it unconditionally would store 0 and mark
        // the field as set even when the caller never sent it.
        if (request.isSetAge()) {
            user.setAge(request.getAge());
        }
        user.setIsActive(true);
        user.setCreatedAt(new Date().toString());
        userDatabase.put(user.getId(), user);
        return user;
    }

    /**
     * Applies every field that is set on the request to the stored user.
     *
     * @throws UserException (USER_NOT_FOUND) when no user has the request's id
     * @throws ValidationException when a username is supplied but too short
     */
    @Override
    public User updateUser(UpdateUserRequest request)
            throws ValidationException, UserException, TException {
        User user = userDatabase.get(request.getId());
        if (user == null) {
            throw userNotFound(request.getId());
        }

        // Update fields if provided
        if (request.isSetUsername()) {
            validateUsername(request.getUsername());
            user.setUsername(request.getUsername());
        }
        if (request.isSetEmail()) {
            user.setEmail(request.getEmail());
        }
        if (request.isSetFirstName()) {
            user.setFirstName(request.getFirstName());
        }
        if (request.isSetLastName()) {
            user.setLastName(request.getLastName());
        }
        if (request.isSetAge()) {
            user.setAge(request.getAge());
        }
        if (request.isSetIsActive()) {
            user.setIsActive(request.isIsActive());
        }
        user.setUpdatedAt(new Date().toString());
        return user;
    }

    /**
     * Removes the user with the given id.
     *
     * @return always {@code true} on success
     * @throws UserException (USER_NOT_FOUND) when no user has that id
     */
    @Override
    public boolean deleteUser(long userId) throws UserException, TException {
        // Single remove() instead of get-then-remove: one map operation
        // decides whether the user existed.
        if (userDatabase.remove(userId) == null) {
            throw userNotFound(userId);
        }
        return true;
    }

    /**
     * Returns one page of users ordered by id, plus the total user count.
     *
     * <p>{@code page} is 1-based. A page below 1 is treated as page 1, a
     * negative page size as 0, and a page past the end yields an empty list;
     * without this clamping {@code subList} would throw for such input.
     */
    @Override
    public ListUsersResponse listUsers(int page, int pageSize) throws TException {
        List<User> allUsers = new ArrayList<>(userDatabase.values());
        // Sort by ID for consistent ordering
        allUsers.sort(Comparator.comparingLong(User::getId));

        int safePage = Math.max(page, 1);
        int safePageSize = Math.max(pageSize, 0);
        // long arithmetic so a large page number cannot overflow int
        long start = (long) (safePage - 1) * safePageSize;
        int startIndex = (int) Math.min(start, allUsers.size());
        int endIndex = (int) Math.min(start + safePageSize, allUsers.size());

        ListUsersResponse response = new ListUsersResponse();
        // Copy the window so the response does not hold a view of the full list
        response.setUsers(new ArrayList<>(allUsers.subList(startIndex, endIndex)));
        response.setTotalCount(allUsers.size());
        response.setPage(safePage);
        response.setPageSize(safePageSize);
        return response;
    }

    /**
     * Returns users whose active flag matches the status: ACTIVE selects
     * active users, every other status selects inactive ones.
     */
    @Override
    public List<User> getUsersByStatus(UserStatus status) throws TException {
        List<User> filteredUsers = new ArrayList<>();
        boolean activeStatus = (status == UserStatus.ACTIVE);
        for (User user : userDatabase.values()) {
            if (user.isIsActive() == activeStatus) {
                filteredUsers.add(user);
            }
        }
        return filteredUsers;
    }

    /**
     * Returns at most {@code limit} users whose username, email, first name
     * or last name contains the query, ignoring case. A null query or a
     * limit below 1 yields an empty list.
     */
    @Override
    public List<User> searchUsers(String query, int limit) throws TException {
        List<User> matchingUsers = new ArrayList<>();
        // Checking the limit only after an add would return one result for limit <= 0.
        if (query == null || limit <= 0) {
            return matchingUsers;
        }
        String queryLower = query.toLowerCase();
        for (User user : userDatabase.values()) {
            if (containsIgnoreCase(user.getUsername(), queryLower)
                    || containsIgnoreCase(user.getEmail(), queryLower)
                    || containsIgnoreCase(user.getFirstName(), queryLower)
                    || containsIgnoreCase(user.getLastName(), queryLower)) {
                matchingUsers.add(user);
                if (matchingUsers.size() >= limit) {
                    break;
                }
            }
        }
        return matchingUsers;
    }

    /**
     * Mock authentication: returns the active user with the given username.
     * The password is not checked; a real implementation must verify a
     * password hash here.
     *
     * @throws UserException (USER_NOT_FOUND) when no active user matches
     */
    @Override
    public User authenticateUser(String username, String password)
            throws UserException, TException {
        for (User user : userDatabase.values()) {
            if (user.getUsername().equals(username) && user.isIsActive()) {
                return user;
            }
        }
        throw new UserException(ErrorType.USER_NOT_FOUND, "Invalid username or password");
    }
}
// Server factory class
/** Factory methods that wire a {@code UserService} handler into a Thrift server. */
public class ThriftServerFactory {
    /** Preferred lower bound for the worker pool of the thread pool server. */
    private static final int DEFAULT_MIN_WORKER_THREADS = 5;

    /**
     * Creates a {@link TSimpleServer} (simple server for development) on the
     * given port, using the binary protocol.
     *
     * @throws TTransportException when the server socket cannot be created
     */
    public static TServer createSimpleServer(int port, UserService.Iface handler)
            throws TTransportException {
        TServerSocket serverTransport = new TServerSocket(port);
        TProcessor processor = new UserService.Processor<>(handler);
        TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
        TServer.Args serverArgs = new TServer.Args(serverTransport)
                .processor(processor)
                .protocolFactory(protocolFactory);
        return new TSimpleServer(serverArgs);
    }

    /**
     * Creates a {@link TThreadPoolServer} on the given port, using the binary protocol.
     *
     * <p>The minimum worker count is 5, lowered to {@code maxWorkerThreads}
     * when the caller asks for fewer, so the minimum never exceeds the maximum.
     *
     * @param maxWorkerThreads upper bound of the worker pool; must be at least 1
     * @throws IllegalArgumentException when {@code maxWorkerThreads} is below 1
     * @throws TTransportException when the server socket cannot be created
     */
    public static TServer createThreadPoolServer(int port, UserService.Iface handler,
            int maxWorkerThreads) throws TTransportException {
        // Validate before opening the socket so a bad argument leaks nothing.
        if (maxWorkerThreads < 1) {
            throw new IllegalArgumentException(
                    "maxWorkerThreads must be at least 1, got " + maxWorkerThreads);
        }
        int minWorkerThreads = Math.min(DEFAULT_MIN_WORKER_THREADS, maxWorkerThreads);

        TServerSocket serverTransport = new TServerSocket(port);
        TProcessor processor = new UserService.Processor<>(handler);
        TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
        TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
                .processor(processor)
                .protocolFactory(protocolFactory)
                .maxWorkerThreads(maxWorkerThreads)
                .minWorkerThreads(minWorkerThreads);
        return new TThreadPoolServer(serverArgs);
    }
}
// Main application class
/**
 * Command-line entry point that starts the Thrift user service.
 *
 * <p>Options: {@code --port=<number>}, {@code --simple}, {@code --help}.
 * Unrecognized arguments are ignored.
 */
public class UserServiceApplication {
    private static final int DEFAULT_PORT = 9090;
    private static final int DEFAULT_WORKER_THREADS = 20;
    private static final String PORT_OPTION = "--port=";

    public static void main(String[] args) {
        int port = DEFAULT_PORT;
        boolean useThreadPool = true;

        // Parse command line arguments
        for (String arg : args) {
            if (arg.startsWith(PORT_OPTION)) {
                try {
                    port = parsePort(arg.substring(PORT_OPTION.length()));
                } catch (IllegalArgumentException e) {
                    // Report a bad port as a usage error rather than an
                    // uncaught NumberFormatException stack trace.
                    System.err.println(e.getMessage());
                    printUsage();
                    System.exit(1);
                    return;
                }
            } else if (arg.equals("--simple")) {
                useThreadPool = false;
            } else if (arg.equals("--help")) {
                printUsage();
                return;
            }
        }

        try {
            // Create service handler
            UserServiceHandler handler = new UserServiceHandler();

            // Create server
            final TServer server;
            if (useThreadPool) {
                System.out.println("Creating thread pool server...");
                server = ThriftServerFactory.createThreadPoolServer(
                        port, handler, DEFAULT_WORKER_THREADS);
            } else {
                System.out.println("Creating simple server...");
                server = ThriftServerFactory.createSimpleServer(port, handler);
            }

            // Start server
            System.out.println("Starting Thrift User Service Server...");
            System.out.println("Server listening on port: " + port);
            System.out.println("Server type: " + (useThreadPool ? "Thread Pool" : "Simple"));
            System.out.println("Press Ctrl+C to stop...");

            // Stop the server when the JVM shuts down (e.g. on Ctrl+C)
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("\nShutting down server...");
                server.stop();
            }));

            // Serve requests
            server.serve();
        } catch (TTransportException e) {
            System.err.println("Failed to start server: " + e.getMessage());
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Parses a port number from the text after {@code --port=}.
     *
     * @throws IllegalArgumentException when the text is not an integer in 1..65535
     */
    private static int parsePort(String text) {
        int value;
        try {
            value = Integer.parseInt(text.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port: '" + text + "'", e);
        }
        if (value < 1 || value > 65535) {
            throw new IllegalArgumentException("Port out of range (1-65535): " + value);
        }
        return value;
    }

    private static void printUsage() {
        System.out.println("Usage: java UserServiceApplication [options]");
        System.out.println("Options:");
        System.out.println(" --port=<number> Server port (default: 9090)");
        System.out.println(" --simple Use simple server instead of thread pool");
        System.out.println(" --help Show this help message");
    }
}
// Configuration class for server settings
/**
 * Mutable holder for server settings, with a plain getter and setter per
 * setting. Initial values: port 9090, thread pool enabled, 20 maximum and
 * 5 minimum worker threads, 5000 ms client timeout, logging enabled.
 */
class ThriftServerConfig {
    private int port = 9090;
    private boolean useThreadPool = true;
    private int maxWorkerThreads = 20;
    private int minWorkerThreads = 5;
    private int clientTimeout = 5000; // milliseconds
    private boolean enableLogging = true;

    // --- port ---

    public int getPort() {
        return this.port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    // --- server type ---

    public boolean isUseThreadPool() {
        return this.useThreadPool;
    }

    public void setUseThreadPool(boolean useThreadPool) {
        this.useThreadPool = useThreadPool;
    }

    // --- worker pool bounds ---

    public int getMaxWorkerThreads() {
        return this.maxWorkerThreads;
    }

    public void setMaxWorkerThreads(int maxWorkerThreads) {
        this.maxWorkerThreads = maxWorkerThreads;
    }

    public int getMinWorkerThreads() {
        return this.minWorkerThreads;
    }

    public void setMinWorkerThreads(int minWorkerThreads) {
        this.minWorkerThreads = minWorkerThreads;
    }

    // --- client timeout (milliseconds) ---

    public int getClientTimeout() {
        return this.clientTimeout;
    }

    public void setClientTimeout(int clientTimeout) {
        this.clientTimeout = clientTimeout;
    }

    // --- logging ---

    public boolean isEnableLogging() {
        return this.enableLogging;
    }

    public void setEnableLogging(boolean enableLogging) {
        this.enableLogging = enableLogging;
    }
}
💻 Implémentation du Client Asynchrone Thrift python
🔴 complex
⭐⭐⭐⭐
Implémentation du client asynchrone Thrift pour les opérations non-bloquantes et haute concurrence
⏱️ 35 min
🏷️ thrift, async, python, performance, microservices
Prerequisites:
Python async/await, Thrift basics, Concurrency concepts
# Apache Thrift Async Client Implementation
# Demonstrates non-blocking Thrift operations using async/await
import asyncio
import time
from typing import List, Optional, Callable
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
# Async wrapper for synchronous Thrift client
class AsyncThriftClient:
    """Async facade over blocking Thrift calls, with a bounded connection pool.

    Blocking work (creating, closing and using a connection) runs in a
    ThreadPoolExecutor with ``max_connections`` workers. Idle connections
    wait in an asyncio.Queue; at most ``max_connections`` connections are
    counted as created at any time.

    The connection itself is a mock dict (``client``, ``transport``,
    ``created_at``) in this demonstration.
    """

    def __init__(self, host='localhost', port=9090, timeout=5000, max_connections=10,
                 max_connection_age=300):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.max_connections = max_connections
        # Seconds after which a pooled connection is replaced (default: 5 minutes)
        self.max_connection_age = max_connection_age
        self.connection_pool = asyncio.Queue(maxsize=max_connections)
        self.executor = ThreadPoolExecutor(max_workers=max_connections)
        self._pool_lock = asyncio.Lock()
        self._created_connections = 0

    def _is_expired(self, connection):
        """Return True when *connection* has reached ``max_connection_age``."""
        return time.time() - connection['created_at'] >= self.max_connection_age

    async def _create_connection(self):
        """Create a new Thrift connection in the executor and return it."""
        def _sync_create():
            # Thrift imports would go here
            # from user_service import UserService
            # from thrift.transport import TSocket, TTransport
            # from thrift.protocol import TBinaryProtocol
            # Mock implementation for demonstration
            return {
                'client': 'MockThriftClient',
                'transport': 'MockTransport',
                'created_at': time.time()
            }

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, _sync_create)

    async def _get_connection(self):
        """Get connection from pool or create new one.

        Order of preference: an idle, unexpired pooled connection; a newly
        created one while fewer than ``max_connections`` exist; otherwise
        wait until a connection is returned to the pool. Expired connections
        are closed and replaced on every path, including the waiting one.
        """
        try:
            connection = self.connection_pool.get_nowait()
        except asyncio.QueueEmpty:
            connection = None

        if connection is not None:
            if not self._is_expired(connection):
                return connection
            # Expired: close it (this frees its slot) and fall through to create
            await self._close_connection(connection)

        # Create new connection if we haven't reached max
        async with self._pool_lock:
            if self._created_connections < self.max_connections:
                connection = await self._create_connection()
                self._created_connections += 1
                return connection

        # All connections are in use: wait for one to be returned
        connection = await self.connection_pool.get()
        if self._is_expired(connection):
            await self._close_connection(connection)
            return await self._get_connection()
        return connection

    async def _close_connection(self, connection):
        """Close a Thrift connection and release its slot in the created count."""
        def _sync_close(conn):
            # Close Thrift transport
            # conn['transport'].close()
            pass

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self.executor, _sync_close, connection)
        async with self._pool_lock:
            self._created_connections -= 1

    async def _return_connection(self, connection):
        """Return connection to pool, closing it instead when the pool is full."""
        try:
            self.connection_pool.put_nowait(connection)
        except asyncio.QueueFull:
            await self._close_connection(connection)

    async def close_all(self):
        """Close all pooled connections, then shut the executor down."""
        while not self.connection_pool.empty():
            try:
                connection = self.connection_pool.get_nowait()
            except asyncio.QueueEmpty:
                break
            await self._close_connection(connection)
        self.executor.shutdown(wait=True)

    async def execute_thrift_call(self, method_name: str, *args, **kwargs):
        """Execute a Thrift method asynchronously.

        Looks up *method_name* on the connection's client, calls it with the
        given arguments in the executor and returns the result. Errors are
        printed and re-raised; the connection goes back to the pool either way.
        """
        connection = None
        try:
            connection = await self._get_connection()

            def _sync_execute():
                method = getattr(connection['client'], method_name)
                return method(*args, **kwargs)

            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(self.executor, _sync_execute)
        except Exception as e:
            print(f"Error executing {method_name}: {e}")
            raise
        finally:
            # `is not None` rather than truthiness: whether a connection was
            # acquired is the question, not whether the object is truthy.
            if connection is not None:
                await self._return_connection(connection)
# Async user service client
class AsyncUserServiceClient:
    """Coroutine-based facade for the user service.

    Each method forwards to ``AsyncThriftClient.execute_thrift_call`` and
    handles errors on a best-effort basis: failures are printed and turned
    into a neutral return value (``None``, ``False`` or ``[]``) instead of
    being raised.

    The client can be used as an async context manager; leaving the
    ``async with`` block calls ``close()``.
    """

    def __init__(self, host='localhost', port=9090):
        """Create the underlying pooled Thrift client for *host*:*port*."""
        self.thrift_client = AsyncThriftClient(host, port)

    async def __aenter__(self):
        """Return the client itself for use inside ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the client on exit; exceptions from the block propagate."""
        await self.close()

    async def create_user(self, username, email=None, first_name=None,
                          last_name=None, age=None):
        """Create a new user.

        Returns:
            The created user as returned by the service, or ``None`` when the
            call (or reading the response) failed.
        """
        try:
            request = {
                'username': username,
                'email': email,
                'first_name': first_name,
                'last_name': last_name,
                'age': age
            }
            user = await self.thrift_client.execute_thrift_call(
                'createUser', request)
            print(f"User created asynchronously: {user['username']} (ID: {user['id']})")
            return user
        except Exception as e:
            print(f"Failed to create user: {e}")
            return None

    async def get_user(self, user_id):
        """Fetch one user by ID.

        Returns:
            The ``'user'`` entry of the response, or ``None`` on any error
            (an error whose text contains ``USER_NOT_FOUND`` gets a dedicated
            message).
        """
        try:
            response = await self.thrift_client.execute_thrift_call('getUser', user_id)
            print(f"User found: {response['user']['username']}")
            return response['user']
        except Exception as e:
            if 'USER_NOT_FOUND' in str(e):
                print(f"User with ID {user_id} not found")
            else:
                print(f"Error getting user: {e}")
            return None

    async def get_multiple_users(self, user_ids: List[int]):
        """Fetch several users concurrently and return the ones that were found.

        Lookups that failed (``None``) or raised are dropped from the result;
        the order of the remaining users follows *user_ids*.
        """
        lookups = [self.get_user(user_id) for user_id in user_ids]
        users = await asyncio.gather(*lookups, return_exceptions=True)
        # Keep only real user records; this drops None and exception objects.
        valid_users = [user for user in users if isinstance(user, dict)]
        print(f"Retrieved {len(valid_users)} out of {len(user_ids)} requested users")
        return valid_users

    async def update_user(self, user_id, **kwargs):
        """Update the user identified by *user_id* with the given fields.

        Args:
            user_id: ID of the user to update; always wins over an ``id``
                entry passed through ``kwargs``.
            **kwargs: Fields to send along in the update request.

        Returns:
            The updated user, or ``None`` on failure.
        """
        try:
            # 'id' is written last so a stray id= keyword cannot redirect the
            # update to a different user.
            request = {**kwargs, 'id': user_id}
            user = await self.thrift_client.execute_thrift_call('updateUser', request)
            print(f"User updated: {user['username']}")
            return user
        except Exception as e:
            print(f"Failed to update user: {e}")
            return None

    async def delete_user(self, user_id):
        """Delete a user; return the service's result, or ``False`` on error."""
        try:
            success = await self.thrift_client.execute_thrift_call('deleteUser', user_id)
            if success:
                print(f"User {user_id} deleted successfully")
            return success
        except Exception as e:
            print(f"Failed to delete user: {e}")
            return False

    async def list_users(self, page=1, page_size=10):
        """Fetch one page of users; return the response, or ``None`` on error."""
        try:
            response = await self.thrift_client.execute_thrift_call(
                'listUsers', page, page_size)
            print(f"Found {response['total_count']} users")
            return response
        except Exception as e:
            print(f"Failed to list users: {e}")
            return None

    async def search_users(self, query, limit=10):
        """Search users matching *query*; return ``[]`` on error."""
        try:
            users = await self.thrift_client.execute_thrift_call(
                'searchUsers', query, limit)
            print(f"Found {len(users)} users matching '{query}'")
            return users
        except Exception as e:
            print(f"Failed to search users: {e}")
            return []

    async def close(self):
        """Release the resources held by the underlying Thrift client."""
        await self.thrift_client.close_all()
# Batch operations
class AsyncBatchOperations:
    """Fan-out helpers that run many user-service calls concurrently."""

    def __init__(self, client: 'AsyncUserServiceClient'):
        """Remember the client used for every batched call."""
        self.client = client

    async def create_multiple_users(self, user_data_list):
        """Create all users described in *user_data_list* concurrently.

        Args:
            user_data_list: Sequence of dicts, each expanded into keyword
                arguments of ``client.create_user``.

        Returns:
            The list of successfully created users (dict results only).
        """
        print(f"Creating {len(user_data_list)} users concurrently...")
        tasks = [self.client.create_user(**user_data) for user_data in user_data_list]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        successful = [r for r in results if isinstance(r, dict)]
        # The client reports failures by returning None rather than raising,
        # so everything that is not a user record counts as a failure.
        failed_count = len(results) - len(successful)
        print(f"Created {len(successful)} users successfully")
        print(f"Failed to create {failed_count} users")
        return successful

    async def bulk_update_users(self, updates):
        """Apply many updates concurrently.

        Args:
            updates: Iterable of ``(user_id, update_data)`` pairs;
                ``update_data`` is expanded into keyword arguments of
                ``client.update_user``.

        Returns:
            The list of successfully updated users (dict results only).
        """
        print(f"Updating {len(updates)} users concurrently...")
        tasks = [
            self.client.update_user(user_id, **update_data)
            for user_id, update_data in updates
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successful = [r for r in results if isinstance(r, dict)]
        print(f"Updated {len(successful)} users successfully")
        return successful

    async def process_user_pipeline(self, user_ids):
        """Fetch, process and update the users identified by *user_ids*.

        Returns:
            The fetched user dicts, each mutated in place with
            ``'processed': True``.  The outcome of the final update calls is
            not inspected.
        """
        print(f"Processing pipeline for {len(user_ids)} users...")

        # Step 1: Get all users
        users = await self.client.get_multiple_users(user_ids)

        # Step 2: Process each user one after another (simulated work)
        processed_users = []
        for user in users:
            await asyncio.sleep(0.1)  # stand-in for real processing time
            user['processed'] = True
            processed_users.append(user)

        # Step 3: Push the processed flag back to the service concurrently
        update_tasks = [
            self.client.update_user(user['id'], processed=True)
            for user in processed_users
        ]
        await asyncio.gather(*update_tasks, return_exceptions=True)

        print(f"Pipeline completed for {len(processed_users)} users")
        return processed_users
# Rate limiting decorator
def rate_limit(max_calls_per_second: float, period: float = 1.0):
    """Build a decorator that throttles calls to an async function.

    At most ``max_calls_per_second`` calls are admitted within any sliding
    window of ``period`` seconds; extra callers sleep until a slot frees up.
    The bookkeeping lives in the decorator's closure, so it is shared by
    every caller of the decorated function.

    Args:
        max_calls_per_second: Number of calls admitted per window (must be
            positive).
        period: Window length in seconds; the default of 1.0 gives the
            "per second" behaviour the first argument is named after.

    Raises:
        ValueError: If either argument is not positive.
    """
    if max_calls_per_second <= 0:
        raise ValueError('max_calls_per_second must be positive')
    if period <= 0:
        raise ValueError('period must be positive')

    def decorator(func):
        calls = []  # admission times of the calls still inside the window

        @wraps(func)
        async def wrapper(*args, **kwargs):
            while True:
                now = time.monotonic()
                # Remove old calls outside the time window
                calls[:] = [call_time for call_time in calls if now - call_time < period]
                if len(calls) < max_calls_per_second:
                    # There is no await between the check and the append, so
                    # no other task can take this slot in between.
                    calls.append(now)
                    break
                # Sleep until the oldest call leaves the window, then check
                # again: other waiters may have claimed the freed slot.
                await asyncio.sleep(period - (now - calls[0]))
            return await func(*args, **kwargs)

        return wrapper

    return decorator
# Rate-limited client wrapper
class RateLimitedUserServiceClient(AsyncUserServiceClient):
    """User-service client whose create and get calls go through ``rate_limit``.

    The decorators are applied once, when the class body is executed, so each
    limit is attached to the method itself rather than to an instance.
    """

    @rate_limit(10)  # Max 10 calls per second
    async def create_user(self, *args, **kwargs):
        """Throttled pass-through to the parent ``create_user``."""
        created = await super().create_user(*args, **kwargs)
        return created

    @rate_limit(20)  # Max 20 calls per second
    async def get_user(self, *args, **kwargs):
        """Throttled pass-through to the parent ``get_user``."""
        found = await super().get_user(*args, **kwargs)
        return found
# Demo and testing functions
async def demo_async_operations():
    """Walk through the main client features: batch create, fetch, search, pipeline.

    The client is always closed at the end, even if one of the steps raises.
    """
    client = AsyncUserServiceClient()
    try:
        print("=== Async Thrift Client Demo ===\n")

        # 1. Create multiple users concurrently
        print("1. Creating users concurrently...")
        user_data = [
            {'username': 'async_user1', 'email': 'alice@example.com', 'first_name': 'Alice'},
            {'username': 'async_user2', 'email': 'bob@example.com', 'first_name': 'Bob'},
            {'username': 'async_user3', 'email': 'charlie@example.com', 'first_name': 'Charlie'},
        ]
        batch_ops = AsyncBatchOperations(client)
        created_users = await batch_ops.create_multiple_users(user_data)

        # 2. Get multiple users concurrently (only the first two created)
        if created_users:
            user_ids = [user['id'] for user in created_users[:2]]
            print("\n2. Getting users concurrently...")
            await client.get_multiple_users(user_ids)

        # 3. Search users
        print("\n3. Searching users...")
        await client.search_users("async_user", limit=5)

        # 4. Process pipeline
        if created_users:
            user_ids = [user['id'] for user in created_users]
            print("\n4. Processing user pipeline...")
            await batch_ops.process_user_pipeline(user_ids)
    finally:
        await client.close()
async def demo_rate_limiting():
    """Fire 15 create requests at the rate-limited client and time them.

    The client is closed afterwards regardless of the outcome.
    """
    client = RateLimitedUserServiceClient()
    try:
        print("\n=== Rate Limiting Demo ===\n")
        print("Creating users with rate limiting (max 10/sec)...")
        started_at = time.time()

        # 15 requests against a limit of 10 per second: the first ten pass
        # immediately, the remaining five are held back by the limiter.
        pending = [
            client.create_user(f'rate_limited_user_{i}', f'user{i}@example.com')
            for i in range(15)
        ]
        await asyncio.gather(*pending, return_exceptions=True)

        elapsed = time.time() - started_at
        print(f"Created 15 users in {elapsed:.2f} seconds (rate limited)")
    finally:
        await client.close()
async def performance_test(num_concurrent_requests=50):
    """Issue many ``list_users`` calls at once and report throughput.

    Args:
        num_concurrent_requests: Number of requests started concurrently.

    The client is closed afterwards regardless of the outcome.
    """
    client = AsyncUserServiceClient()
    try:
        print(f"\n=== Performance Test ({num_concurrent_requests} concurrent requests) ===\n")
        # perf_counter is monotonic and high-resolution, unlike time.time().
        start_time = time.perf_counter()

        tasks = [
            client.list_users(page=1, page_size=10)
            for _ in range(num_concurrent_requests)
        ]
        # Wait for all to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # list_users reports a failure by returning None instead of raising,
        # so both None and exception results count as unsuccessful.
        successful = sum(
            1 for r in results
            if r is not None and not isinstance(r, Exception)
        )
        elapsed = time.perf_counter() - start_time
        # Guard against a zero interval on very fast runs.
        rate = num_concurrent_requests / elapsed if elapsed > 0 else float('inf')

        print(f"Completed {successful}/{num_concurrent_requests} requests in {elapsed:.2f} seconds")
        print(f"Requests per second: {rate:.2f}")
    finally:
        await client.close()
if __name__ == '__main__':
    # Run each demo in its own event loop, one after another.
    for start_demo in (
        demo_async_operations,
        demo_rate_limiting,
        lambda: performance_test(20),
    ):
        asyncio.run(start_demo())