Примеры Apache Thrift

Примеры фреймворка Apache Thrift для разработки сервисов на разных языках с IDL-определениями и реализациями клиент/сервер

💻 Определение Сервиса Thrift IDL thrift

🟢 simple ⭐⭐

Базовые определения Thrift Interface Definition Language (IDL) с определениями сервисов, структурами данных и перечислениями

⏱️ 15 min 🏷️ thrift, idl, rpc, microservices
Prerequisites: Basic understanding of RPC concepts
// Apache Thrift IDL - User Service Example
// This file defines service interfaces and data structures

// 1. Namespace declaration
namespace py user_service
namespace java com.example.user.service
namespace js userService
namespace go user_service
namespace cpp user_service

// 2. Data types and structures
struct User {
    1: required i64 id,
    2: required string username,
    3: optional string email,
    4: optional string firstName,
    5: optional string lastName,
    6: optional i32 age,
    7: optional bool isActive = true,
    8: optional string createdAt,
    9: optional string updatedAt
}

struct CreateUserRequest {
    1: required string username,
    2: optional string email,
    3: optional string firstName,
    4: optional string lastName,
    5: optional i32 age
}

struct UpdateUserRequest {
    1: required i64 id,
    2: optional string username,
    3: optional string email,
    4: optional string firstName,
    5: optional string lastName,
    6: optional i32 age,
    7: optional bool isActive
}

struct GetUserResponse {
    1: required User user,
    2: optional string message
}

struct ListUsersResponse {
    1: required list<User> users,
    2: required i32 totalCount,
    3: optional i32 page,
    4: optional i32 pageSize
}

// 3. Enums
enum UserStatus {
    ACTIVE = 1,
    INACTIVE = 2,
    SUSPENDED = 3,
    DELETED = 4
}

enum ErrorType {
    USER_NOT_FOUND = 1001,
    INVALID_INPUT = 1002,
    DATABASE_ERROR = 1003,
    PERMISSION_DENIED = 1004,
    INTERNAL_ERROR = 1005
}

// 4. Exceptions
exception UserException {
    1: required ErrorType errorType,
    2: required string message,
    3: optional string details
}

exception ValidationException {
    1: required string field,
    2: required string message,
    3: optional string value
}

// 5. Service definition
service UserService {
    // CRUD operations
    User getUser(1: i64 userId) throws (1: UserException ue),

    User createUser(1: CreateUserRequest request)
        throws (1: ValidationException ve, 2: UserException ue),

    User updateUser(1: UpdateUserRequest request)
        throws (1: ValidationException ve, 2: UserException ue),

    bool deleteUser(1: i64 userId) throws (1: UserException ue),

    // Batch operations
    ListUsersResponse listUsers(1: optional i32 page = 1, 2: optional i32 pageSize = 10),

    list<User> getUsersByStatus(1: UserStatus status),

    // Search operations
    list<User> searchUsers(1: string query, 2: optional i32 limit = 10),

    // Authentication
    User authenticateUser(1: string username, 2: string password)
        throws (1: UserException ue),
}

// 6. Additional service for authentication
service AuthService {
    string generateToken(1: i64 userId, 1: string username),
    bool validateToken(1: string token),
    User getUserFromToken(1: string token) throws (1: UserException ue),
}

💻 Реализация Сервера Thrift на Python python

🟡 intermediate ⭐⭐⭐

Полная реализация сервера Thrift на Python с обработчиками и конфигурацией сервера

⏱️ 25 min 🏷️ thrift, python, server, rpc, microservices
Prerequisites: Python basics, Thrift IDL understanding
# Apache Thrift Python Server Implementation
# This file implements the UserService defined in the IDL

import sys
import json
from datetime import datetime
from typing import Dict, List, Optional

# Thrift imports
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

# Generated imports (after running thrift compiler)
# from user_service import UserService
# from user_service.ttypes import *

# Mock database for demonstration
class UserDatabase:
    def __init__(self):
        self.users: Dict[int, User] = {}
        self.next_id = 1
        self._init_sample_data()

    def _init_sample_data(self):
        # Add sample users
        sample_users = [
            User(id=1, username="john_doe", email="[email protected]",
                 firstName="John", lastName="Doe", age=30, isActive=True,
                 createdAt=datetime.now().isoformat()),
            User(id=2, username="jane_smith", email="[email protected]",
                 firstName="Jane", lastName="Smith", age=25, isActive=True,
                 createdAt=datetime.now().isoformat())
        ]
        for user in sample_users:
            self.users[user.id] = user
            self.next_id = max(self.next_id, user.id + 1)

    def create_user(self, request: CreateUserRequest) -> User:
        # Validation
        if not request.username or len(request.username) < 3:
            raise ValidationException(field="username", message="Username must be at least 3 characters")

        # Check for duplicate username
        for user in self.users.values():
            if user.username == request.username:
                raise UserException(errorType=ErrorType.INVALID_INPUT,
                                   message="Username already exists")

        # Create new user
        user = User(
            id=self.next_id,
            username=request.username,
            email=request.email,
            firstName=request.firstName,
            lastName=request.lastName,
            age=request.age,
            isActive=True,
            createdAt=datetime.now().isoformat()
        )

        self.users[self.next_id] = user
        self.next_id += 1

        return user

    def get_user(self, user_id: int) -> User:
        if user_id not in self.users:
            raise UserException(errorType=ErrorType.USER_NOT_FOUND,
                               message=f"User with ID {user_id} not found")
        return self.users[user_id]

    def update_user(self, request: UpdateUserRequest) -> User:
        if request.id not in self.users:
            raise UserException(errorType=ErrorType.USER_NOT_FOUND,
                               message=f"User with ID {request.id} not found")

        user = self.users[request.id]

        # Update fields if provided
        if request.username:
            if len(request.username) < 3:
                raise ValidationException(field="username",
                                        message="Username must be at least 3 characters")
            user.username = request.username

        if request.email is not None:
            user.email = request.email
        if request.firstName is not None:
            user.firstName = request.firstName
        if request.lastName is not None:
            user.lastName = request.lastName
        if request.age is not None:
            user.age = request.age
        if request.isActive is not None:
            user.isActive = request.isActive

        user.updatedAt = datetime.now().isoformat()

        return user

    def delete_user(self, user_id: int) -> bool:
        if user_id not in self.users:
            raise UserException(errorType=ErrorType.USER_NOT_FOUND,
                               message=f"User with ID {user_id} not found")

        del self.users[user_id]
        return True

    def list_users(self, page: int = 1, page_size: int = 10) -> ListUsersResponse:
        all_users = list(self.users.values())

        # Pagination
        start_idx = (page - 1) * page_size
        end_idx = start_idx + page_size

        paginated_users = all_users[start_idx:end_idx]

        return ListUsersResponse(
            users=paginated_users,
            totalCount=len(all_users),
            page=page,
            pageSize=page_size
        )

# Service handler implementation
class UserServiceHandler:
    def __init__(self):
        self.db = UserDatabase()

    def getUser(self, userId):
        try:
            user = self.db.get_user(userId)
            return GetUserResponse(user=user, message="User found successfully")
        except UserException as e:
            raise e
        except Exception as e:
            raise UserException(errorType=ErrorType.INTERNAL_ERROR,
                               message=str(e))

    def createUser(self, request):
        try:
            user = self.db.create_user(request)
            return user
        except (UserException, ValidationException) as e:
            raise e
        except Exception as e:
            raise UserException(errorType=ErrorType.INTERNAL_ERROR,
                               message=str(e))

    def updateUser(self, request):
        try:
            user = self.db.update_user(request)
            return user
        except (UserException, ValidationException) as e:
            raise e
        except Exception as e:
            raise UserException(errorType=ErrorType.INTERNAL_ERROR,
                               message=str(e))

    def deleteUser(self, userId):
        try:
            return self.db.delete_user(userId)
        except UserException as e:
            raise e
        except Exception as e:
            raise UserException(errorType=ErrorType.INTERNAL_ERROR,
                               message=str(e))

    def listUsers(self, page=None, pageSize=None):
        try:
            return self.db.list_users(page or 1, pageSize or 10)
        except Exception as e:
            raise UserException(errorType=ErrorType.INTERNAL_ERROR,
                               message=str(e))

    def getUsersByStatus(self, status):
        try:
            all_users = list(self.db.users.values())
            filtered_users = [u for u in all_users if u.isActive == (status == UserStatus.ACTIVE)]
            return filtered_users
        except Exception as e:
            raise UserException(errorType=ErrorType.INTERNAL_ERROR,
                               message=str(e))

    def searchUsers(self, query, limit=10):
        try:
            all_users = list(self.db.users.values())
            query_lower = query.lower()

            matching_users = []
            for user in all_users:
                if (query_lower in user.username.lower() or
                    (user.email and query_lower in user.email.lower()) or
                    (user.firstName and query_lower in user.firstName.lower()) or
                    (user.lastName and query_lower in user.lastName.lower())):
                    matching_users.append(user)

            return matching_users[:limit]
        except Exception as e:
            raise UserException(errorType=ErrorType.INTERNAL_ERROR,
                               message=str(e))

    def authenticateUser(self, username, password):
        # Simple mock authentication
        try:
            for user in self.db.users.values():
                if user.username == username and user.isActive:
                    # In real implementation, verify password hash
                    return user

            raise UserException(errorType=ErrorType.USER_NOT_FOUND,
                               message="Invalid username or password")
        except UserException as e:
            raise e
        except Exception as e:
            raise UserException(errorType=ErrorType.INTERNAL_ERROR,
                               message=str(e))

# Server setup and configuration
def create_server(host='localhost', port=9090):
    # Create handler
    handler = UserServiceHandler()
    processor = UserService.Processor(handler)

    # Transport
    transport = TSocket.TServerSocket(host=host, port=port)

    # Transport factory
    tfactory = TTransport.TBufferedTransportFactory()

    # Protocol factory
    pfactory = TBinaryProtocol.TBinaryProtocolFactory()

    # Create server
    server = TServer.TSimpleServer(
        processor,
        transport,
        tfactory,
        pfactory
    )

    return server

# Alternative: Multi-threaded server for better performance
def create_threaded_server(host='localhost', port=9090):
    handler = UserServiceHandler()
    processor = UserService.Processor(handler)
    transport = TSocket.TServerSocket(host=host, port=port)
    tfactory = TTransport.TBufferedTransportFactory()
    pfactory = TBinaryProtocol.TBinaryProtocolFactory()

    server = TServer.TThreadedServer(
        processor,
        transport,
        tfactory,
        pfactory
    )

    return server

# Server execution
if __name__ == '__main__':
    print("Starting Thrift User Service Server...")

    # Create server
    server = create_threaded_server('localhost', 9090)

    print(f"Server listening on localhost:9090")
    print("Press Ctrl+C to stop...")

    try:
        server.serve()
    except KeyboardInterrupt:
        print("\nShutting down server...")
        server.stop()

💻 Реализация Клиента Thrift на Python python

🟡 intermediate ⭐⭐⭐

Реализация клиента Thrift на Python с управлением подключениями и обработкой ошибок

⏱️ 20 min 🏷️ thrift, python, client, rpc, microservices
Prerequisites: Python basics, Thrift IDL understanding
# Apache Thrift Python Client Implementation
# This file demonstrates how to create a client for the UserService

import sys
import time
from typing import List, Optional

# Thrift imports
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.Thrift import TException

# Generated imports (after running thrift compiler)
# from user_service import UserService
# from user_service.ttypes import *

class ThriftClient:
    def __init__(self, host='localhost', port=9090, timeout=5000):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.client = None
        self.transport = None

    def connect(self):
        """Establish connection to the Thrift server"""
        try:
            # Create socket
            socket = TSocket.TSocket(self.host, self.port)
            socket.setTimeout(self.timeout)

            # Create transport
            self.transport = TTransport.TBufferedTransport(socket)

            # Create protocol
            protocol = TBinaryProtocol.TBinaryProtocol(self.transport)

            # Create client
            self.client = UserService.Client(protocol)

            # Open connection
            self.transport.open()

            print(f"Connected to Thrift server at {self.host}:{self.port}")
            return True

        except Exception as e:
            print(f"Failed to connect to server: {e}")
            self.close()
            return False

    def close(self):
        """Close connection to the Thrift server"""
        if self.transport:
            self.transport.close()
            self.transport = None
            self.client = None
            print("Connection closed")

    def is_connected(self):
        """Check if client is connected to server"""
        return self.transport and self.transport.isOpen()

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

# User service client wrapper
class UserServiceClient:
    def __init__(self, host='localhost', port=9090):
        self.thrift_client = ThriftClient(host, port)

    def create_user(self, username, email=None, first_name=None, last_name=None, age=None):
        """Create a new user"""
        try:
            if not self.thrift_client.connect():
                return None

            request = CreateUserRequest(
                username=username,
                email=email,
                firstName=first_name,
                lastName=last_name,
                age=age
            )

            user = self.thrift_client.client.createUser(request)
            print(f"User created successfully: {user.username} (ID: {user.id})")
            return user

        except ValidationException as e:
            print(f"Validation error in field '{e.field}': {e.message}")
            return None
        except UserException as e:
            print(f"User service error: {e.message}")
            return None
        except TException as e:
            print(f"Thrift error: {e}")
            return None
        finally:
            self.thrift_client.close()

    def get_user(self, user_id):
        """Get user by ID"""
        try:
            if not self.thrift_client.connect():
                return None

            response = self.thrift_client.client.getUser(user_id)
            print(f"User found: {response.user.username}")
            return response.user

        except UserException as e:
            if e.errorType == ErrorType.USER_NOT_FOUND:
                print(f"User with ID {user_id} not found")
            else:
                print(f"User service error: {e.message}")
            return None
        except TException as e:
            print(f"Thrift error: {e}")
            return None
        finally:
            self.thrift_client.close()

    def update_user(self, user_id, **kwargs):
        """Update user information"""
        try:
            if not self.thrift_client.connect():
                return None

            request = UpdateUserRequest(id=user_id)

            # Set updateable fields
            if 'username' in kwargs:
                request.username = kwargs['username']
            if 'email' in kwargs:
                request.email = kwargs['email']
            if 'first_name' in kwargs:
                request.firstName = kwargs['first_name']
            if 'last_name' in kwargs:
                request.last_name = kwargs['last_name']
            if 'age' in kwargs:
                request.age = kwargs['age']
            if 'is_active' in kwargs:
                request.isActive = kwargs['is_active']

            user = self.thrift_client.client.updateUser(request)
            print(f"User updated successfully: {user.username}")
            return user

        except (ValidationException, UserException) as e:
            print(f"Error updating user: {e.message}")
            return None
        except TException as e:
            print(f"Thrift error: {e}")
            return None
        finally:
            self.thrift_client.close()

    def delete_user(self, user_id):
        """Delete user by ID"""
        try:
            if not self.thrift_client.connect():
                return False

            success = self.thrift_client.client.deleteUser(user_id)
            if success:
                print(f"User with ID {user_id} deleted successfully")
            return success

        except UserException as e:
            if e.errorType == ErrorType.USER_NOT_FOUND:
                print(f"User with ID {user_id} not found")
            else:
                print(f"Error deleting user: {e.message}")
            return False
        except TException as e:
            print(f"Thrift error: {e}")
            return False
        finally:
            self.thrift_client.close()

    def list_users(self, page=1, page_size=10):
        """List users with pagination"""
        try:
            if not self.thrift_client.connect():
                return None

            response = self.thrift_client.client.listUsers(page, page_size)
            print(f"Found {response.totalCount} users (Page {response.page}, "
                  f"showing {len(response.users)} of {response.pageSize})")

            return response

        except TException as e:
            print(f"Thrift error: {e}")
            return None
        finally:
            self.thrift_client.close()

    def search_users(self, query, limit=10):
        """Search users by query"""
        try:
            if not self.thrift_client.connect():
                return []

            users = self.thrift_client.client.searchUsers(query, limit)
            print(f"Found {len(users)} users matching '{query}'")

            return users

        except TException as e:
            print(f"Thrift error: {e}")
            return []
        finally:
            self.thrift_client.close()

    def authenticate_user(self, username, password):
        """Authenticate user credentials"""
        try:
            if not self.thrift_client.connect():
                return None

            user = self.thrift_client.client.authenticateUser(username, password)
            print(f"User authenticated successfully: {user.username}")
            return user

        except UserException as e:
            print(f"Authentication failed: {e.message}")
            return None
        except TException as e:
            print(f"Thrift error: {e}")
            return None
        finally:
            self.thrift_client.close()

# Demo functions
def demo_user_operations():
    """Demonstrate user service operations"""
    client = UserServiceClient()

    print("=== Thrift User Service Demo ===\n")

    # 1. Create users
    print("1. Creating users...")
    user1 = client.create_user(
        username="alice_wonder",
        email="[email protected]",
        first_name="Alice",
        last_name="Wonder",
        age=28
    )

    user2 = client.create_user(
        username="bob_builder",
        email="[email protected]",
        first_name="Bob",
        last_name="Builder",
        age=35
    )

    print()

    # 2. List users
    print("2. Listing users...")
    users_response = client.list_users(page=1, page_size=5)
    if users_response:
        for user in users_response.users:
            print(f"  - {user.username} (ID: {user.id}, Email: {user.email or 'N/A'})")
    print()

    # 3. Get user by ID
    if user1:
        print(f"3. Getting user by ID ({user1.id})...")
        retrieved_user = client.get_user(user1.id)
        if retrieved_user:
            print(f"   Retrieved: {retrieved_user.firstName} {retrieved_user.lastName}")
    print()

    # 4. Update user
    if user2:
        print(f"4. Updating user ({user2.id})...")
        updated_user = client.update_user(
            user2.id,
            age=36,
            email="[email protected]"
        )
        if updated_user:
            print(f"   Updated age to: {updated_user.age}")
    print()

    # 5. Search users
    print("5. Searching users...")
    search_results = client.search_users("alice", limit=5)
    for user in search_results:
        print(f"  - {user.username} (ID: {user.id})")
    print()

    # 6. Authenticate user
    print("6. Authenticating user...")
    auth_user = client.authenticate_user("alice_wonder", "password123")
    if auth_user:
        print(f"   Authenticated: {auth_user.username}")
    print()

    # 7. Delete user
    if user1:
        print(f"7. Deleting user ({user1.id})...")
        success = client.delete_user(user1.id)
        print(f"   Deleted: {success}")
    print()

def demo_connection_management():
    """Demonstrate connection management patterns"""
    print("=== Connection Management Demo ===\n")

    # Using context manager
    print("1. Using context manager:")
    try:
        with ThriftClient() as client:
            if client.is_connected():
                print("   Connected successfully!")
                # Use client here...
    except Exception as e:
        print(f"   Connection failed: {e}")

    # Manual connection management
    print("\n2. Manual connection management:")
    client = ThriftClient()
    try:
        if client.connect():
            print("   Connected successfully!")
            # Use client here...
        else:
            print("   Failed to connect")
    finally:
        client.close()

# Performance testing
def performance_test(num_requests=100):
    """Test client performance"""
    print(f"=== Performance Test ({num_requests} requests) ===\n")

    client = UserServiceClient()

    start_time = time.time()
    success_count = 0

    for i in range(num_requests):
        try:
            # Test list users operation
            response = client.list_users(page=1, page_size=10)
            if response:
                success_count += 1
        except Exception as e:
            print(f"Request {i+1} failed: {e}")

    end_time = time.time()
    elapsed = end_time - start_time

    print(f"Completed {success_count}/{num_requests} requests in {elapsed:.2f} seconds")
    print(f"Average time per request: {(elapsed/num_requests)*1000:.2f} ms")
    print(f"Requests per second: {num_requests/elapsed:.2f}")

if __name__ == '__main__':
    # Run demonstrations
    demo_user_operations()
    print()
    demo_connection_management()
    print()
    # Uncomment to run performance test
    # performance_test(50)

💻 Реализация Сервера Thrift на Java java

🔴 complex ⭐⭐⭐⭐

Реализация сервера Thrift на Java с использованием Netty и интеграцией Spring Boot

⏱️ 30 min 🏷️ thrift, java, server, enterprise, microservices
Prerequisites: Java basics, Maven/Gradle, Thrift IDL understanding
// Apache Thrift Java Server Implementation
// This example shows how to create a Thrift server in Java

package com.example.user.service;

import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Service implementation
public class UserServiceHandler implements UserService.Iface {

    // In-memory database simulation
    private final Map<Long, User> userDatabase = new ConcurrentHashMap<>();
    private final AtomicLong idGenerator = new AtomicLong(1);

    public UserServiceHandler() {
        initializeSampleData();
    }

    private void initializeSampleData() {
        // Add sample users
        User user1 = new User();
        user1.setId(idGenerator.getAndIncrement());
        user1.setUsername("john_doe");
        user1.setEmail("[email protected]");
        user1.setFirstName("John");
        user1.setLastName("Doe");
        user1.setAge(30);
        user1.setIsActive(true);
        user1.setCreatedAt(new Date().toString());
        userDatabase.put(user1.getId(), user1);

        User user2 = new User();
        user2.setId(idGenerator.getAndIncrement());
        user2.setUsername("jane_smith");
        user2.setEmail("[email protected]");
        user2.setFirstName("Jane");
        user2.setLastName("Smith");
        user2.setAge(25);
        user2.setIsActive(true);
        user2.setCreatedAt(new Date().toString());
        userDatabase.put(user2.getId(), user2);
    }

    @Override
    public User getUser(long userId) throws UserException, TException {
        User user = userDatabase.get(userId);
        if (user == null) {
            throw new UserException(ErrorType.USER_NOT_FOUND,
                                   "User with ID " + userId + " not found");
        }
        return user;
    }

    @Override
    public User createUser(CreateUserRequest request)
            throws ValidationException, UserException, TException {

        // Validate username
        if (request.getUsername() == null || request.getUsername().length() < 3) {
            throw new ValidationException("username",
                                        "Username must be at least 3 characters");
        }

        // Check for duplicate username
        for (User existingUser : userDatabase.values()) {
            if (existingUser.getUsername().equals(request.getUsername())) {
                throw new UserException(ErrorType.INVALID_INPUT,
                                       "Username already exists");
            }
        }

        // Create new user
        User user = new User();
        user.setId(idGenerator.getAndIncrement());
        user.setUsername(request.getUsername());
        user.setEmail(request.getEmail());
        user.setFirstName(request.getFirstName());
        user.setLastName(request.getLastName());
        user.setAge(request.getAge());
        user.setIsActive(true);
        user.setCreatedAt(new Date().toString());

        userDatabase.put(user.getId(), user);

        return user;
    }

    @Override
    public User updateUser(UpdateUserRequest request)
            throws ValidationException, UserException, TException {

        User user = userDatabase.get(request.getId());
        if (user == null) {
            throw new UserException(ErrorType.USER_NOT_FOUND,
                                   "User with ID " + request.getId() + " not found");
        }

        // Update fields if provided
        if (request.isSetUsername()) {
            if (request.getUsername().length() < 3) {
                throw new ValidationException("username",
                                            "Username must be at least 3 characters");
            }
            user.setUsername(request.getUsername());
        }

        if (request.isSetEmail()) {
            user.setEmail(request.getEmail());
        }
        if (request.isSetFirstName()) {
            user.setFirstName(request.getFirstName());
        }
        if (request.isSetLastName()) {
            user.setLastName(request.getLastName());
        }
        if (request.isSetAge()) {
            user.setAge(request.getAge());
        }
        if (request.isSetIsActive()) {
            user.setIsActive(request.isIsActive());
        }

        user.setUpdatedAt(new Date().toString());

        return user;
    }

    @Override
    public boolean deleteUser(long userId) throws UserException, TException {
        User user = userDatabase.get(userId);
        if (user == null) {
            throw new UserException(ErrorType.USER_NOT_FOUND,
                                   "User with ID " + userId + " not found");
        }

        userDatabase.remove(userId);
        return true;
    }

    @Override
    public ListUsersResponse listUsers(int page, int pageSize) throws TException {
        List<User> allUsers = new ArrayList<>(userDatabase.values());

        // Sort by ID for consistent ordering
        allUsers.sort(Comparator.comparingLong(User::getId));

        // Pagination
        int startIndex = (page - 1) * pageSize;
        int endIndex = Math.min(startIndex + pageSize, allUsers.size());

        List<User> paginatedUsers = allUsers.subList(startIndex, endIndex);

        ListUsersResponse response = new ListUsersResponse();
        response.setUsers(paginatedUsers);
        response.setTotalCount(allUsers.size());
        response.setPage(page);
        response.setPageSize(pageSize);

        return response;
    }

    @Override
    public List<User> getUsersByStatus(UserStatus status) throws TException {
        List<User> filteredUsers = new ArrayList<>();
        boolean activeStatus = (status == UserStatus.ACTIVE);

        for (User user : userDatabase.values()) {
            if (user.isIsActive() == activeStatus) {
                filteredUsers.add(user);
            }
        }

        return filteredUsers;
    }

    @Override
    public List<User> searchUsers(String query, int limit) throws TException {
        List<User> matchingUsers = new ArrayList<>();
        String queryLower = query.toLowerCase();

        for (User user : userDatabase.values()) {
            if (user.getUsername().toLowerCase().contains(queryLower) ||
                (user.getEmail() != null && user.getEmail().toLowerCase().contains(queryLower)) ||
                (user.getFirstName() != null && user.getFirstName().toLowerCase().contains(queryLower)) ||
                (user.getLastName() != null && user.getLastName().toLowerCase().contains(queryLower))) {
                matchingUsers.add(user);

                if (matchingUsers.size() >= limit) {
                    break;
                }
            }
        }

        return matchingUsers;
    }

    @Override
    public User authenticateUser(String username, String password)
            throws UserException, TException {

        // Simple mock authentication
        for (User user : userDatabase.values()) {
            if (user.getUsername().equals(username) && user.isIsActive()) {
                // In real implementation, verify password hash
                return user;
            }
        }

        throw new UserException(ErrorType.USER_NOT_FOUND, "Invalid username or password");
    }
}

// Server factory class
public class ThriftServerFactory {

    // Simple server for development
    public static TServer createSimpleServer(int port, UserService.Iface handler)
            throws TTransportException {

        TServerSocket serverTransport = new TServerSocket(port);
        TProcessor processor = new UserService.Processor<>(handler);
        TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();

        TServer.Args serverArgs = new TServer.Args(serverTransport)
                .processor(processor)
                .protocolFactory(protocolFactory);

        return new TSimpleServer(serverArgs);
    }

    // Thread pool server for better performance
    public static TServer createThreadPoolServer(int port, UserService.Iface handler,
                                               int maxWorkerThreads) throws TTransportException {

        TServerSocket serverTransport = new TServerSocket(port);
        TProcessor processor = new UserService.Processor<>(handler);
        TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();

        TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
                .processor(processor)
                .protocolFactory(protocolFactory)
                .maxWorkerThreads(maxWorkerThreads)
                .minWorkerThreads(5);

        return new TThreadPoolServer(serverArgs);
    }
}

// Main application class
public class UserServiceApplication {

    private static final int DEFAULT_PORT = 9090;
    private static final int DEFAULT_WORKER_THREADS = 20;

    public static void main(String[] args) {
        int port = DEFAULT_PORT;
        boolean useThreadPool = true;

        // Parse command line arguments
        for (String arg : args) {
            if (arg.startsWith("--port=")) {
                port = Integer.parseInt(arg.substring(7));
            } else if (arg.equals("--simple")) {
                useThreadPool = false;
            } else if (arg.equals("--help")) {
                printUsage();
                return;
            }
        }

        try {
            // Create service handler
            UserServiceHandler handler = new UserServiceHandler();

            // Create server
            TServer server;
            if (useThreadPool) {
                System.out.println("Creating thread pool server...");
                server = ThriftServerFactory.createThreadPoolServer(
                    port, handler, DEFAULT_WORKER_THREADS);
            } else {
                System.out.println("Creating simple server...");
                server = ThriftServerFactory.createSimpleServer(port, handler);
            }

            // Start server
            System.out.println("Starting Thrift User Service Server...");
            System.out.println("Server listening on port: " + port);
            System.out.println("Server type: " + (useThreadPool ? "Thread Pool" : "Simple"));
            System.out.println("Press Ctrl+C to stop...");

            // Add shutdown hook
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("\nShutting down server...");
                server.stop();
            }));

            // Serve requests
            server.serve();

        } catch (TTransportException e) {
            System.err.println("Failed to start server: " + e.getMessage());
            e.printStackTrace();
            System.exit(1);
        }
    }

    private static void printUsage() {
        System.out.println("Usage: java UserServiceApplication [options]");
        System.out.println("Options:");
        System.out.println("  --port=<number>    Server port (default: 9090)");
        System.out.println("  --simple           Use simple server instead of thread pool");
        System.out.println("  --help             Show this help message");
    }
}

// Configuration class for server settings
class ThriftServerConfig {
    private int port = 9090;
    private boolean useThreadPool = true;
    private int maxWorkerThreads = 20;
    private int minWorkerThreads = 5;
    private int clientTimeout = 5000; // milliseconds
    private boolean enableLogging = true;

    // Getters and setters
    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }

    public boolean isUseThreadPool() { return useThreadPool; }
    public void setUseThreadPool(boolean useThreadPool) { this.useThreadPool = useThreadPool; }

    public int getMaxWorkerThreads() { return maxWorkerThreads; }
    public void setMaxWorkerThreads(int maxWorkerThreads) { this.maxWorkerThreads = maxWorkerThreads; }

    public int getMinWorkerThreads() { return minWorkerThreads; }
    public void setMinWorkerThreads(int minWorkerThreads) { this.minWorkerThreads = minWorkerThreads; }

    public int getClientTimeout() { return clientTimeout; }
    public void setClientTimeout(int clientTimeout) { this.clientTimeout = clientTimeout; }

    public boolean isEnableLogging() { return enableLogging; }
    public void setEnableLogging(boolean enableLogging) { this.enableLogging = enableLogging; }
}

💻 Реализация Асинхронного Клиента Thrift python

🔴 complex ⭐⭐⭐⭐

Реализация асинхронного клиента Thrift для неблокирующих операций и высокой производительности

⏱️ 35 min 🏷️ thrift, async, python, performance, microservices
Prerequisites: Python async/await, Thrift basics, Concurrency concepts
# Apache Thrift Async Client Implementation
# Demonstrates non-blocking Thrift operations using async/await

import asyncio
import time
from typing import List, Optional, Callable
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

# Async wrapper for synchronous Thrift client
class AsyncThriftClient:
    def __init__(self, host='localhost', port=9090, timeout=5000, max_connections=10):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.max_connections = max_connections
        self.connection_pool = asyncio.Queue(maxsize=max_connections)
        self.executor = ThreadPoolExecutor(max_workers=max_connections)
        self._pool_lock = asyncio.Lock()
        self._created_connections = 0

    async def _create_connection(self):
        """Create a new Thrift connection"""
        def _sync_create():
            # Thrift imports would go here
            # from user_service import UserService
            # from thrift.transport import TSocket, TTransport
            # from thrift.protocol import TBinaryProtocol

            # Mock implementation for demonstration
            return {
                'client': 'MockThriftClient',
                'transport': 'MockTransport',
                'created_at': time.time()
            }

        loop = asyncio.get_event_loop()
        connection = await loop.run_in_executor(self.executor, _sync_create)
        return connection

    async def _get_connection(self):
        """Get connection from pool or create new one"""
        try:
            # Try to get existing connection from pool
            connection = self.connection_pool.get_nowait()

            # Validate connection is still valid
            if time.time() - connection['created_at'] < 300:  # 5 minute timeout
                return connection
            else:
                # Connection expired, close and create new
                await self._close_connection(connection)

        except asyncio.QueueEmpty:
            pass

        # Create new connection if we haven't reached max
        async with self._pool_lock:
            if self._created_connections < self.max_connections:
                connection = await self._create_connection()
                self._created_connections += 1
                return connection

        # If we're here, all connections are in use, wait for one
        connection = await self.connection_pool.get()
        return connection

    async def _close_connection(self, connection):
        """Close a Thrift connection"""
        def _sync_close(conn):
            # Close Thrift transport
            # conn['transport'].close()
            pass

        loop = asyncio.get_event_loop()
        await loop.run_in_executor(self.executor, _sync_close, connection)

        async with self._pool_lock:
            self._created_connections -= 1

    async def _return_connection(self, connection):
        """Return connection to pool"""
        try:
            self.connection_pool.put_nowait(connection)
        except asyncio.QueueFull:
            # Pool is full, close the connection
            await self._close_connection(connection)

    async def close_all(self):
        """Close all connections and cleanup"""
        while not self.connection_pool.empty():
            try:
                connection = self.connection_pool.get_nowait()
                await self._close_connection(connection)
            except asyncio.QueueEmpty:
                break

        self.executor.shutdown(wait=True)

    async def execute_thrift_call(self, method_name: str, *args, **kwargs):
        """Execute a Thrift method asynchronously"""
        connection = None
        try:
            # Get connection
            connection = await self._get_connection()

            # Execute method in thread pool
            def _sync_execute():
                client = connection['client']
                method = getattr(client, method_name)
                return method(*args, **kwargs)

            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(self.executor, _sync_execute)

            return result

        except Exception as e:
            print(f"Error executing {method_name}: {e}")
            raise
        finally:
            if connection:
                await self._return_connection(connection)

# Async user service client
class AsyncUserServiceClient:
    def __init__(self, host='localhost', port=9090):
        self.thrift_client = AsyncThriftClient(host, port)

    async def create_user(self, username, email=None, first_name=None,
                         last_name=None, age=None):
        """Create a new user asynchronously"""
        try:
            # Prepare request
            request = {
                'username': username,
                'email': email,
                'first_name': first_name,
                'last_name': last_name,
                'age': age
            }

            # Execute async call
            user = await self.thrift_client.execute_thrift_call(
                'createUser', request)

            print(f"User created asynchronously: {user['username']} (ID: {user['id']})")
            return user

        except Exception as e:
            print(f"Failed to create user: {e}")
            return None

    async def get_user(self, user_id):
        """Get user by ID asynchronously"""
        try:
            response = await self.thrift_client.execute_thrift_call('getUser', user_id)
            print(f"User found: {response['user']['username']}")
            return response['user']

        except Exception as e:
            if 'USER_NOT_FOUND' in str(e):
                print(f"User with ID {user_id} not found")
            else:
                print(f"Error getting user: {e}")
            return None

    async def get_multiple_users(self, user_ids: List[int]):
        """Get multiple users concurrently"""
        tasks = [self.get_user(user_id) for user_id in user_ids]
        users = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out exceptions and None values
        valid_users = [user for user in users if isinstance(user, dict)]

        print(f"Retrieved {len(valid_users)} out of {len(user_ids)} requested users")
        return valid_users

    async def update_user(self, user_id, **kwargs):
        """Update user asynchronously"""
        try:
            request = {'id': user_id}
            request.update(kwargs)

            user = await self.thrift_client.execute_thrift_call('updateUser', request)
            print(f"User updated: {user['username']}")
            return user

        except Exception as e:
            print(f"Failed to update user: {e}")
            return None

    async def delete_user(self, user_id):
        """Delete user asynchronously"""
        try:
            success = await self.thrift_client.execute_thrift_call('deleteUser', user_id)
            if success:
                print(f"User {user_id} deleted successfully")
            return success

        except Exception as e:
            print(f"Failed to delete user: {e}")
            return False

    async def list_users(self, page=1, page_size=10):
        """List users with pagination asynchronously"""
        try:
            response = await self.thrift_client.execute_thrift_call(
                'listUsers', page, page_size)

            print(f"Found {response['total_count']} users")
            return response

        except Exception as e:
            print(f"Failed to list users: {e}")
            return None

    async def search_users(self, query, limit=10):
        """Search users asynchronously"""
        try:
            users = await self.thrift_client.execute_thrift_call(
                'searchUsers', query, limit)

            print(f"Found {len(users)} users matching '{query}'")
            return users

        except Exception as e:
            print(f"Failed to search users: {e}")
            return []

    async def close(self):
        """Close the async client"""
        await self.thrift_client.close_all()

# Batch operations
class AsyncBatchOperations:
    def __init__(self, client: AsyncUserServiceClient):
        self.client = client

    async def create_multiple_users(self, user_data_list):
        """Create multiple users concurrently"""
        print(f"Creating {len(user_data_list)} users concurrently...")

        tasks = []
        for user_data in user_data_list:
            task = self.client.create_user(**user_data)
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Count successes and failures
        successful = [r for r in results if isinstance(r, dict)]
        failures = [r for r in results if isinstance(r, Exception)]

        print(f"Created {len(successful)} users successfully")
        print(f"Failed to create {len(failures)} users")

        return successful

    async def bulk_update_users(self, updates):
        """Update multiple users concurrently"""
        print(f"Updating {len(updates)} users concurrently...")

        tasks = []
        for user_id, update_data in updates:
            task = self.client.update_user(user_id, **update_data)
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)

        successful = [r for r in results if isinstance(r, dict)]
        print(f"Updated {len(successful)} users successfully")

        return successful

    async def process_user_pipeline(self, user_ids):
        """Process a pipeline of operations on users"""
        print(f"Processing pipeline for {len(user_ids)} users...")

        # Step 1: Get all users
        users = await self.client.get_multiple_users(user_ids)

        # Step 2: Process each user (simulate some processing)
        processed_users = []
        for user in users:
            # Simulate processing time
            await asyncio.sleep(0.1)

            # Add processed flag
            user['processed'] = True
            processed_users.append(user)

        # Step 3: Update processed users
        update_tasks = []
        for user in processed_users:
            task = self.client.update_user(user['id'], processed=True)
            update_tasks.append(task)

        await asyncio.gather(*update_tasks, return_exceptions=True)

        print(f"Pipeline completed for {len(processed_users)} users")
        return processed_users

# Rate limiting decorator
def rate_limit(max_calls_per_second: float):
    """Rate limit async function calls"""
    def decorator(func):
        calls = []

        @wraps(func)
        async def wrapper(*args, **kwargs):
            now = time.time()

            # Remove old calls outside the time window
            calls[:] = [call_time for call_time in calls if now - call_time < 1.0]

            # Check if we've exceeded the rate limit
            if len(calls) >= max_calls_per_second:
                sleep_time = 1.0 - (now - calls[0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)

            calls.append(now)
            return await func(*args, **kwargs)

        return wrapper
    return decorator

# Rate-limited client wrapper
class RateLimitedUserServiceClient(AsyncUserServiceClient):
    @rate_limit(10)  # Max 10 calls per second
    async def create_user(self, *args, **kwargs):
        return await super().create_user(*args, **kwargs)

    @rate_limit(20)  # Max 20 calls per second
    async def get_user(self, *args, **kwargs):
        return await super().get_user(*args, **kwargs)

# Demo and testing functions
async def demo_async_operations():
    """Demonstrate async operations"""
    client = AsyncUserServiceClient()

    try:
        print("=== Async Thrift Client Demo ===\n")

        # 1. Create multiple users concurrently
        print("1. Creating users concurrently...")
        user_data = [
            {'username': 'async_user1', 'email': '[email protected]', 'first_name': 'Alice'},
            {'username': 'async_user2', 'email': '[email protected]', 'first_name': 'Bob'},
            {'username': 'async_user3', 'email': '[email protected]', 'first_name': 'Charlie'},
        ]

        batch_ops = AsyncBatchOperations(client)
        created_users = await batch_ops.create_multiple_users(user_data)

        # 2. Get multiple users concurrently
        if created_users:
            user_ids = [user['id'] for user in created_users[:2]]
            print("\n2. Getting users concurrently...")
            retrieved_users = await client.get_multiple_users(user_ids)

        # 3. Search users
        print("\n3. Searching users...")
        search_results = await client.search_users("async_user", limit=5)

        # 4. Process pipeline
        if created_users:
            user_ids = [user['id'] for user in created_users]
            print("\n4. Processing user pipeline...")
            await batch_ops.process_user_pipeline(user_ids)

    finally:
        await client.close()

async def demo_rate_limiting():
    """Demonstrate rate limiting"""
    client = RateLimitedUserServiceClient()

    try:
        print("\n=== Rate Limiting Demo ===\n")
        print("Creating users with rate limiting (max 10/sec)...")

        start_time = time.time()

        # Create 15 users - should take at least 1.5 seconds due to rate limiting
        tasks = []
        for i in range(15):
            task = client.create_user(f'rate_limited_user_{i}', f'user{i}@example.com')
            tasks.append(task)

        await asyncio.gather(*tasks, return_exceptions=True)

        elapsed = time.time() - start_time
        print(f"Created 15 users in {elapsed:.2f} seconds (rate limited)")

    finally:
        await client.close()

async def performance_test(num_concurrent_requests=50):
    """Test async client performance"""
    client = AsyncUserServiceClient()

    try:
        print(f"\n=== Performance Test ({num_concurrent_requests} concurrent requests) ===\n")

        start_time = time.time()

        # Create concurrent tasks
        tasks = []
        for i in range(num_concurrent_requests):
            task = client.list_users(page=1, page_size=10)
            tasks.append(task)

        # Wait for all to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Count successful requests
        successful = sum(1 for r in results if not isinstance(r, Exception))

        elapsed = time.time() - start_time

        print(f"Completed {successful}/{num_concurrent_requests} requests in {elapsed:.2f} seconds")
        print(f"Requests per second: {num_concurrent_requests/elapsed:.2f}")

    finally:
        await client.close()

if __name__ == '__main__':
    # Run async demos
    asyncio.run(demo_async_operations())
    asyncio.run(demo_rate_limiting())
    asyncio.run(performance_test(20))