OpenAI API Examples

Comprehensive OpenAI API examples covering GPT models, DALL-E image generation, Whisper audio processing, and function calling

💻 OpenAI GPT API Integration python

🟡 intermediate ⭐⭐⭐

Complete GPT API usage with streaming responses, advanced prompting, and cost optimization

⏱️ 40 min 🏷️ openai, gpt, chat, streaming
Prerequisites: Python, OpenAI API key, HTTP requests
# OpenAI GPT API Integration
# Comprehensive examples with chat completions, streaming, and advanced features

import os
import json
import time
import asyncio
from typing import List, Dict, Any, Optional, Generator
from dataclasses import dataclass
from openai import OpenAI, AsyncOpenAI
import tiktoken
from enum import Enum

# 1. Configuration and Initialization
@dataclass
class OpenAIConfig:
    """Configuration for OpenAI API"""
    api_key: str
    organization: Optional[str] = None
    model: str = "gpt-3.5-turbo"
    temperature: float = 0.7
    max_tokens: int = 1000
    top_p: float = 1.0
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0

class OpenAIClient:
    """Enhanced OpenAI client with advanced features"""

    def __init__(self, config: OpenAIConfig):
        self.config = config
        self.client = OpenAI(
            api_key=config.api_key,
            organization=config.organization
        )
        self.async_client = AsyncOpenAI(
            api_key=config.api_key,
            organization=config.organization
        )
        try:
            self.encoding = tiktoken.encoding_for_model(config.model)
        except KeyError:
            # Unknown model names raise KeyError; fall back to a common encoding
            self.encoding = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, text: str) -> int:
        """Count tokens in text"""
        return len(self.encoding.encode(text))

    def estimate_cost(self, messages: List[Dict[str, str]]) -> float:
        """Estimate API call cost in USD"""
        total_tokens = 0

        # Count tokens in messages
        for message in messages:
            total_tokens += self.count_tokens(message.get('content', ''))
            total_tokens += self.count_tokens(message.get('role', ''))

        # Pricing (illustrative rates -- check current OpenAI pricing before relying on this)
        # GPT-3.5-turbo: $0.002 per 1K tokens
        # GPT-4: $0.03 per 1K tokens (input) + $0.06 per 1K tokens (output)
        # The output estimate below is a worst case that assumes max_tokens are generated
        if 'gpt-4' in self.config.model.lower():
            input_cost = (total_tokens * 0.03) / 1000
            output_cost = (self.config.max_tokens * 0.06) / 1000
        else:
            input_cost = (total_tokens * 0.002) / 1000
            output_cost = (self.config.max_tokens * 0.002) / 1000

        return input_cost + output_cost
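
# Note: the per-message counts above slightly undercount real usage. The chat
# format adds a few tokens of structural overhead per message (OpenAI's token
# counting guidance suggests roughly 3-4). A hedged sketch of that adjustment;
# the tokens_per_message value is an assumption, not an exact constant:
def count_chat_tokens(client: OpenAIClient, messages: List[Dict[str, str]],
                      tokens_per_message: int = 4) -> int:
    """Approximate chat token count including per-message formatting overhead"""
    total = 0
    for message in messages:
        total += tokens_per_message
        total += client.count_tokens(message.get("content", ""))
        total += client.count_tokens(message.get("role", ""))
    return total + 3  # replies are primed with a few tokens as well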

# 2. Basic Chat Completions
def basic_chat_completion(client: OpenAIClient) -> Dict[str, Any]:
    """Basic chat completion example"""
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain quantum computing in simple terms."}
    ]

    try:
        response = client.client.chat.completions.create(
            model=client.config.model,
            messages=messages,
            temperature=client.config.temperature,
            max_tokens=client.config.max_tokens
        )

        return {
            "response": response.choices[0].message.content,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            },
            "finish_reason": response.choices[0].finish_reason
        }

    except Exception as e:
        return {"error": str(e)}

# 3. Streaming Responses
def stream_chat_completion(client: OpenAIClient, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
    """Streaming chat completion"""
    try:
        stream = client.client.chat.completions.create(
            model=client.config.model,
            messages=messages,
            temperature=client.config.temperature,
            max_tokens=client.config.max_tokens,
            stream=True
        )

        for chunk in stream:
            # Some stream events carry no choices (e.g., usage-only chunks), so guard first
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    except Exception as e:
        yield f"Error: {str(e)}"

# 4. Advanced Prompting Strategies
class PromptingStrategies:
    """Advanced prompting techniques for better results"""

    @staticmethod
    def few_shot_prompting(client: OpenAIClient, examples: List[Dict[str, str]], query: str) -> str:
        """Few-shot prompting with examples"""
        messages = [
            {"role": "system", "content": "You are a helpful assistant that follows patterns from examples."}
        ]

        for example in examples:
            messages.append({"role": "user", "content": example["input"]})
            messages.append({"role": "assistant", "content": example["output"]})

        messages.append({"role": "user", "content": query})

        response = client.client.chat.completions.create(
            model=client.config.model,
            messages=messages,
            temperature=0.3,  # Lower temperature for more consistent results
            max_tokens=500
        )

        return response.choices[0].message.content

    @staticmethod
    def chain_of_thought_prompting(client: OpenAIClient, problem: str) -> str:
        """Chain of thought prompting for complex reasoning"""
        prompt = f"""Solve this step by step, showing your reasoning process:

        Problem: {problem}

        Step 1: Understand the problem
        Step 2: Break down into smaller steps
        Step 3: Solve each step
        Step 4: Combine results
        Step 5: Final answer

        Solution:"""

        response = client.client.chat.completions.create(
            model=client.config.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=1000
        )

        return response.choices[0].message.content

    @staticmethod
    def structured_output_prompting(client: OpenAIClient, prompt: str, output_schema: Dict) -> Dict:
        """Generate structured output using prompting"""
        schema_prompt = f"""
        {prompt}

        Please respond in the following JSON format:
        {json.dumps(output_schema, indent=2)}

        Response:"""

        response = client.client.chat.completions.create(
            model=client.config.model,
            messages=[{"role": "user", "content": schema_prompt}],
            temperature=0.1,
            max_tokens=1000
        )

        try:
            return json.loads(response.choices[0].message.content)
        except json.JSONDecodeError:
            return {"raw_response": response.choices[0].message.content, "error": "Failed to parse JSON"}

# 5. Conversation Management
class ConversationManager:
    """Manage conversations with context and memory"""

    def __init__(self, client: OpenAIClient, max_history: int = 10):
        self.client = client
        self.max_history = max_history
        self.conversations = {}
        self.persona_instructions = {
            "professional": "You are a professional, formal assistant who provides expert advice.",
            "casual": "You are a friendly, casual assistant who speaks in a relaxed manner.",
            "technical": "You are a technical assistant who provides detailed technical explanations.",
            "creative": "You are a creative assistant who thinks outside the box and provides innovative ideas."
        }

    def create_conversation(self, conversation_id: str, persona: str = "professional") -> None:
        """Create a new conversation with specified persona"""
        if conversation_id not in self.conversations:
            self.conversations[conversation_id] = {
                "messages": [
                    {"role": "system", "content": self.persona_instructions.get(persona, self.persona_instructions["professional"])}
                ],
                "persona": persona,
                "created_at": time.time()
            }

    def add_message(self, conversation_id: str, role: str, content: str) -> None:
        """Add message to conversation"""
        if conversation_id not in self.conversations:
            self.create_conversation(conversation_id)

        conversation = self.conversations[conversation_id]
        conversation["messages"].append({"role": role, "content": content})

        # Limit conversation history
        if len(conversation["messages"]) > self.max_history * 2 + 1:  # +1 for system message
            # Keep system message and last max_history*2 messages
            system_msg = conversation["messages"][0]
            recent_msgs = conversation["messages"][-(self.max_history * 2):]
            conversation["messages"] = [system_msg] + recent_msgs

    def get_response(self, conversation_id: str, user_message: str) -> Dict[str, Any]:
        """Get response from AI in conversation context"""
        self.add_message(conversation_id, "user", user_message)

        messages = self.conversations[conversation_id]["messages"]

        try:
            response = self.client.client.chat.completions.create(
                model=self.client.config.model,
                messages=messages,
                temperature=self.client.config.temperature,
                max_tokens=self.client.config.max_tokens
            )

            ai_response = response.choices[0].message.content
            self.add_message(conversation_id, "assistant", ai_response)

            return {
                "response": ai_response,
                "conversation_id": conversation_id,
                "usage": response.usage._asdict() if response.usage else None
            }

        except Exception as e:
            return {"error": str(e), "conversation_id": conversation_id}

    def get_conversation_summary(self, conversation_id: str) -> str:
        """Get summary of conversation"""
        if conversation_id not in self.conversations:
            return "Conversation not found"

        messages = self.conversations[conversation_id]["messages"][1:]  # Exclude system message
        conversation_text = "\n".join([f"{msg['role']}: {msg['content']}" for msg in messages[-10:]])  # Last 10 messages

        summary_prompt = f"""Summarize this conversation in 2-3 sentences:

        {conversation_text}

        Summary:"""

        response = self.client.client.chat.completions.create(
            model=self.client.config.model,
            messages=[{"role": "user", "content": summary_prompt}],
            temperature=0.3,
            max_tokens=150
        )

        return response.choices[0].message.content
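
# The manager above trims history by message count. A hedged sketch trimming by
# token budget instead, which tracks context-window pressure more directly; the
# 3000-token default is an arbitrary example value:
def trim_messages_to_token_budget(client: OpenAIClient, messages: List[Dict[str, str]],
                                  token_budget: int = 3000) -> List[Dict[str, str]]:
    """Keep the system message plus the newest messages that fit the budget"""
    system_msg, history = messages[0], messages[1:]
    kept: List[Dict[str, str]] = []
    total = client.count_tokens(system_msg.get("content", ""))

    # Walk from newest to oldest, keeping messages while the budget allows
    for message in reversed(history):
        cost = client.count_tokens(message.get("content", ""))
        if total + cost > token_budget:
            break
        kept.append(message)
        total += cost

    return [system_msg] + list(reversed(kept))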

# 6. Function Calling
class FunctionCalling:
    """Advanced function calling implementation"""

    def __init__(self, client: OpenAIClient):
        self.client = client
        self.functions = {
            "get_weather": self.get_weather,
            "calculate_age": self.calculate_age,
            "search_web": self.search_web,
            "send_email": self.send_email
        }
        self.function_schemas = {
            "get_weather": {
                "name": "get_weather",
                "description": "Get current weather information for a city",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "city": {"type": "string", "description": "The city name"},
                        "units": {"type": "string", "enum": ["metric", "imperial"], "default": "metric"}
                    },
                    "required": ["city"]
                }
            },
            "calculate_age": {
                "name": "calculate_age",
                "description": "Calculate age from birth date",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "birth_date": {"type": "string", "description": "Birth date in YYYY-MM-DD format"},
                        "current_date": {"type": "string", "description": "Current date in YYYY-MM-DD format (optional)"}
                    },
                    "required": ["birth_date"]
                }
            },
            "search_web": {
                "name": "search_web",
                "description": "Search the web for information",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Search query"},
                        "max_results": {"type": "integer", "default": 5}
                    },
                    "required": ["query"]
                }
            },
            "send_email": {
                "name": "send_email",
                "description": "Send an email",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "to": {"type": "string", "description": "Recipient email"},
                        "subject": {"type": "string", "description": "Email subject"},
                        "body": {"type": "string", "description": "Email body"}
                    },
                    "required": ["to", "subject", "body"]
                }
            }
        }

    def get_weather(self, city: str, units: str = "metric") -> Dict[str, Any]:
        """Simulate weather API call"""
        # In production, integrate with real weather API
        weather_data = {
            "city": city,
            "temperature": 22 if units == "metric" else 72,
            "units": units,
            "condition": "Partly cloudy",
            "humidity": 65
        }
        return weather_data

    def calculate_age(self, birth_date: str, current_date: Optional[str] = None) -> Dict[str, Any]:
        """Calculate age from birth date"""
        from datetime import datetime

        birth = datetime.strptime(birth_date, "%Y-%m-%d")
        current = datetime.strptime(current_date, "%Y-%m-%d") if current_date else datetime.now()

        age = current.year - birth.year - ((current.month, current.day) < (birth.month, birth.day))

        return {
            "birth_date": birth_date,
            "current_date": current_date if current_date else datetime.now().strftime("%Y-%m-%d"),
            "age": age,
            "age_in_months": age * 12
        }

    def search_web(self, query: str, max_results: int = 5) -> Dict[str, Any]:
        """Simulate web search"""
        # In production, integrate with real search API
        return {
            "query": query,
            "results": [
                {
                    "title": f"Result for {query}",
                    "url": f"https://example.com/{query.replace(' ', '-')}",
                    "snippet": f"This is a search result for {query}"
                }
            ] * min(max_results, 3)
        }

    def send_email(self, to: str, subject: str, body: str) -> Dict[str, Any]:
        """Simulate sending email"""
        return {
            "to": to,
            "subject": subject,
            "body_length": len(body),
            "status": "queued",
            "sent_at": time.strftime("%Y-%m-%d %H:%M:%S")
        }

    def process_with_functions(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        """Process messages with function calling"""
        try:
            response = self.client.client.chat.completions.create(
                model=self.client.config.model,
                messages=messages,
                functions=list(self.function_schemas.values()),
                function_call="auto",
                temperature=0.1
            )

            message = response.choices[0].message

            if message.function_call:
                function_name = message.function_call.name
                function_args = json.loads(message.function_call.arguments)

                if function_name in self.functions:
                    function_result = self.functions[function_name](**function_args)

                    # Continue conversation with function result
                    messages.append({
                        "role": "assistant",
                        "content": None,
                        "function_call": {
                            "name": function_name,
                            "arguments": json.dumps(function_args)
                        }
                    })

                    messages.append({
                        "role": "function",
                        "name": function_name,
                        "content": json.dumps(function_result)
                    })

                    # Get final response
                    final_response = self.client.client.chat.completions.create(
                        model=self.client.config.model,
                        messages=messages,
                        temperature=0.1
                    )

                    return {
                        "function_name": function_name,
                        "function_args": function_args,
                        "function_result": function_result,
                        "response": final_response.choices[0].message.content
                    }
                else:
                    return {"error": f"Unknown function: {function_name}"}
            else:
                return {
                    "response": message.content,
                    "function_called": False
                }

        except Exception as e:
            return {"error": str(e)}

# 7. Batch Processing and Optimization
class BatchProcessor:
    """Efficient batch processing for multiple requests"""

    def __init__(self, client: OpenAIClient):
        self.client = client
        self.max_concurrent_requests = 10
        # Fixed-window state shared across calls to rate_limited_request
        self._window_start = time.time()
        self._window_count = 0

    async def process_batch_async(self, prompts: List[str]) -> List[Dict[str, Any]]:
        """Process multiple prompts concurrently"""
        semaphore = asyncio.Semaphore(self.max_concurrent_requests)

        async def process_single_prompt(prompt: str, index: int) -> Dict[str, Any]:
            async with semaphore:
                try:
                    response = await self.client.async_client.chat.completions.create(
                        model=self.client.config.model,
                        messages=[{"role": "user", "content": prompt}],
                        temperature=self.client.config.temperature,
                        max_tokens=self.client.config.max_tokens
                    )

                    return {
                        "index": index,
                        "prompt": prompt,
                        "response": response.choices[0].message.content,
                        "usage": response.usage._asdict() if response.usage else None
                    }

                except Exception as e:
                    return {
                        "index": index,
                        "prompt": prompt,
                        "error": str(e)
                    }

        tasks = [process_single_prompt(prompt, i) for i, prompt in enumerate(prompts)]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # gather(return_exceptions=True) can yield raw exceptions, which have no
        # .get(); normalize them into error records before sorting by index
        normalized = [
            r if isinstance(r, dict) else {"index": len(prompts), "error": str(r)}
            for r in results
        ]
        return sorted(normalized, key=lambda x: x.get("index", 0))

    def rate_limited_request(self, messages: List[Dict[str, str]], requests_per_minute: int = 60) -> Dict[str, Any]:
        """Make rate-limited API requests using a simple fixed-window limiter"""
        try:
            # Reset the window every 60 seconds; sleep out the remainder if the
            # per-minute quota has been exhausted (state lives on the instance,
            # so it persists across calls)
            elapsed = time.time() - self._window_start
            if elapsed >= 60:
                self._window_start = time.time()
                self._window_count = 0
            elif self._window_count >= requests_per_minute:
                time.sleep(60 - elapsed)
                self._window_start = time.time()
                self._window_count = 0

            response = self.client.client.chat.completions.create(
                model=self.client.config.model,
                messages=messages,
                temperature=self.client.config.temperature,
                max_tokens=self.client.config.max_tokens
            )

            self._window_count += 1

            return {
                "response": response.choices[0].message.content,
                "usage": response.usage.model_dump() if response.usage else None
            }

        except Exception as e:
            return {"error": str(e)}

# 8. Error Handling and Retry Logic
import random
import openai
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class RobustOpenAIClient:
    """OpenAI client with robust error handling and retry logic"""

    def __init__(self, client: OpenAIClient):
        self.client = client

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        # The SDK raises its own exception types rather than the builtins, so match both
        retry=retry_if_exception_type((ConnectionError, TimeoutError,
                                       openai.RateLimitError,
                                       openai.APIConnectionError,
                                       openai.APITimeoutError))
    )
    def robust_completion(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        """Robust chat completion with retry logic"""
        try:
            response = self.client.client.chat.completions.create(
                model=self.client.config.model,
                messages=messages,
                temperature=self.client.config.temperature,
                max_tokens=self.client.config.max_tokens,
                timeout=30  # 30 second timeout
            )

            return {
                "response": response.choices[0].message.content,
                "usage": response.usage._asdict() if response.usage else None,
                "finish_reason": response.choices[0].finish_reason
            }

        except Exception as e:
            print(f"Attempt failed: {str(e)}")
            raise

    def exponential_backoff_request(self, messages: List[Dict[str, str]], max_retries: int = 5) -> Dict[str, Any]:
        """Request with exponential backoff and jitter"""
        for attempt in range(max_retries):
            try:
                response = self.client.client.chat.completions.create(
                    model=self.client.config.model,
                    messages=messages,
                    temperature=self.client.config.temperature,
                    max_tokens=self.client.config.max_tokens
                )
                # Return the same dict shape as the error path for consistency
                return {
                    "response": response.choices[0].message.content,
                    "attempts": attempt + 1
                }
            except Exception as e:
                if attempt == max_retries - 1:
                    return {"error": f"Max retries exceeded: {str(e)}"}

                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
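
# Hedged helper: map the SDK's exception types to actionable categories so a
# caller can decide between backing off, failing fast, or fixing the request.
# All classes referenced here exist in the v1 openai package:
def classify_openai_error(exc: Exception) -> str:
    """Classify an OpenAI SDK exception into a retry-decision category"""
    if isinstance(exc, openai.RateLimitError):
        return "rate_limit"    # back off, then retry
    if isinstance(exc, openai.AuthenticationError):
        return "auth"          # fix the API key; retrying won't help
    if isinstance(exc, (openai.APIConnectionError, openai.APITimeoutError)):
        return "transient"     # network issue; safe to retry
    if isinstance(exc, openai.BadRequestError):
        return "bad_request"   # fix the request payload
    return "unknown"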

# 9. Usage Analytics and Monitoring
class OpenAIAnalytics:
    """Track and analyze OpenAI API usage"""

    def __init__(self):
        self.usage_data = []
        self.error_data = []

    def track_usage(self, request_data: Dict[str, Any], response_data: Dict[str, Any], start_time: float) -> None:
        """Track API usage"""
        usage_record = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "request_data": request_data,
            "response_data": response_data,
            "duration": time.time() - start_time,
            "success": "error" not in response_data
        }

        self.usage_data.append(usage_record)

    def track_error(self, error_data: Dict[str, Any]) -> None:
        """Track API errors"""
        error_record = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "error": error_data
        }

        self.error_data.append(error_record)

    def get_usage_stats(self) -> Dict[str, Any]:
        """Get usage statistics"""
        if not self.usage_data:
            return {"message": "No usage data available"}

        total_requests = len(self.usage_data)
        successful_requests = sum(1 for record in self.usage_data if record["success"])
        total_duration = sum(record["duration"] for record in self.usage_data)

        return {
            "total_requests": total_requests,
            "successful_requests": successful_requests,
            "success_rate": successful_requests / total_requests * 100,
            "average_response_time": total_duration / total_requests,
            "total_errors": len(self.error_data),
            "usage_by_hour": self._group_usage_by_hour()
        }

    def _group_usage_by_hour(self) -> Dict[str, int]:
        """Group usage by hour"""
        hourly_usage = {}
        for record in self.usage_data:
            hour = record["timestamp"].split(":")[0]
            hourly_usage[hour] = hourly_usage.get(hour, 0) + 1
        return hourly_usage

    def export_data(self, filename: str) -> None:
        """Export usage and error data to file"""
        export_data = {
            "usage_data": self.usage_data,
            "error_data": self.error_data,
            "statistics": self.get_usage_stats()
        }

        with open(filename, 'w') as f:
            json.dump(export_data, f, indent=2)
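
# Hedged usage sketch: glue the analytics tracker into a request path. The
# wrapper below is illustrative, not part of the OpenAI SDK:
def tracked_completion(client: OpenAIClient, analytics: OpenAIAnalytics,
                       messages: List[Dict[str, str]]) -> Dict[str, Any]:
    """Run a chat completion and record usage/errors in the analytics store"""
    start_time = time.time()
    request_data = {"model": client.config.model, "message_count": len(messages)}

    try:
        response = client.client.chat.completions.create(
            model=client.config.model,
            messages=messages,
            temperature=client.config.temperature,
            max_tokens=client.config.max_tokens
        )
        response_data = {"response": response.choices[0].message.content}
    except Exception as e:
        response_data = {"error": str(e)}
        analytics.track_error({"error": str(e), "request": request_data})

    analytics.track_usage(request_data, response_data, start_time)
    return response_data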

# 10. Demonstration Functions
def demo_basic_usage():
    """Demonstrate basic OpenAI API usage"""
    print("=== Basic OpenAI API Usage Demo ===")

    config = OpenAIConfig(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-3.5-turbo",
        temperature=0.7
    )

    client = OpenAIClient(config)

    # Basic chat completion
    result = basic_chat_completion(client)
    print(f"Chat Completion Result: {result}")

    # Token counting and cost estimation
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain machine learning in simple terms."}
    ]

    token_count = client.count_tokens(messages[1]["content"])
    estimated_cost = client.estimate_cost(messages)

    print(f"Token count: {token_count}")
    print(f"Estimated cost: ${estimated_cost:.6f}")

def demo_streaming():
    """Demonstrate streaming responses"""
    print("\n=== Streaming Response Demo ===")

    config = OpenAIConfig(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-3.5-turbo"
    )

    client = OpenAIClient(config)
    messages = [{"role": "user", "content": "Write a short poem about programming"}]

    print("Streaming response:")
    for chunk in stream_chat_completion(client, messages):
        print(chunk, end='', flush=True)
    print()

def demo_prompting_strategies():
    """Demonstrate advanced prompting strategies"""
    print("\n=== Prompting Strategies Demo ===")

    config = OpenAIConfig(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-3.5-turbo",
        temperature=0.3
    )

    client = OpenAIClient(config)
    strategies = PromptingStrategies()

    # Few-shot prompting
    examples = [
        {"input": "2 + 2", "output": "4"},
        {"input": "5 * 3", "output": "15"},
        {"input": "10 / 2", "output": "5"}
    ]

    result = strategies.few_shot_prompting(client, examples, "7 + 8")
    print(f"Few-shot result: {result}")

    # Chain of thought
    problem = "If a train travels 300 km in 3 hours, what is its average speed?"
    cot_result = strategies.chain_of_thought_prompting(client, problem)
    print(f"Chain of thought result: {cot_result}")

def demo_function_calling():
    """Demonstrate function calling"""
    print("\n=== Function Calling Demo ===")

    config = OpenAIConfig(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-3.5-turbo"
    )

    client = OpenAIClient(config)
    function_calling = FunctionCalling(client)

    messages = [
        {"role": "user", "content": "What's the weather like in Tokyo and calculate my age if I was born in 1990-05-15"}
    ]

    result = function_calling.process_with_functions(messages)
    print(f"Function calling result: {json.dumps(result, indent=2)}")

def demo_conversation_management():
    """Demonstrate conversation management"""
    print("\n=== Conversation Management Demo ===")

    config = OpenAIConfig(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-3.5-turbo"
    )

    client = OpenAIClient(config)
    conversation_manager = ConversationManager(client)

    # Create conversation
    conv_id = "demo_conv"
    conversation_manager.create_conversation(conv_id, persona="casual")

    # Simulate conversation
    messages = [
        "Hi! Can you help me learn Python?",
        "What should I start with?",
        "Thanks! Can you recommend any good resources?"
    ]

    for msg in messages:
        response = conversation_manager.get_response(conv_id, msg)
        print(f"User: {msg}")
        # get_response returns an error dict on failure, so don't assume 'response' exists
        print(f"Assistant: {response.get('response', response.get('error', ''))[:100]}...")

    # Get summary
    summary = conversation_manager.get_conversation_summary(conv_id)
    print(f"\nConversation Summary: {summary}")

# Main execution
if __name__ == "__main__":
    # Set environment variable
    os.environ.setdefault("OPENAI_API_KEY", "your-openai-api-key")

    try:
        demo_basic_usage()
        demo_streaming()
        demo_prompting_strategies()
        demo_function_calling()
        demo_conversation_management()
    except Exception as e:
        print(f"Demo error: {e}")
        print("Make sure to set your OPENAI_API_KEY environment variable")

💻 OpenAI DALL-E and Whisper API python

🔴 complex ⭐⭐⭐⭐

Image generation with DALL-E and audio processing with Whisper, including batch operations

⏱️ 45 min 🏷️ openai, dall-e, whisper, image, audio
Prerequisites: Python, OpenAI API key, PIL/Pillow, requests
# OpenAI DALL-E and Whisper API Integration
# Image generation and audio processing with advanced features

import os
import json
import time
import requests
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass
from PIL import Image, ImageDraw, ImageFont
import io
import base64
from pathlib import Path
import asyncio
from openai import OpenAI
import soundfile as sf
import numpy as np
from datetime import datetime

# 1. Configuration and Client Setup
@dataclass
class DalleConfig:
    """Configuration for DALL-E API"""
    api_key: str
    organization: Optional[str] = None
    model: str = "dall-e-3"
    size: str = "1024x1024"  # 1024x1024, 1792x1024, or 1024x1792
    quality: str = "standard"  # standard or hd
    style: str = "vivid"  # vivid or natural
    max_retries: int = 3
    timeout: int = 60

@dataclass
class WhisperConfig:
    """Configuration for Whisper API"""
    api_key: str
    organization: Optional[str] = None
    model: str = "whisper-1"
    response_format: str = "json"  # json, text, srt, verbose_json, or vtt
    language: Optional[str] = None
    temperature: float = 0.0
    timeout: int = 120

class OpenAIMediaClient:
    """Combined client for DALL-E and Whisper APIs"""

    def __init__(self, dalle_config: DalleConfig, whisper_config: WhisperConfig):
        self.dalle_config = dalle_config
        self.whisper_config = whisper_config
        self.client = OpenAI(
            api_key=dalle_config.api_key,
            organization=dalle_config.organization
        )
        self.supported_image_sizes = {
            "dall-e-2": ["256x256", "512x512", "1024x1024"],
            "dall-e-3": ["1024x1024", "1792x1024", "1024x1792"]
        }

    def validate_dalle_params(self, prompt: str, size: Optional[str] = None) -> Tuple[str, str]:
        """Validate DALL-E parameters"""
        if not prompt or len(prompt.strip()) == 0:
            raise ValueError("Prompt cannot be empty")
        if len(prompt) > 4000:  # DALL-E has a 4000 character limit
            raise ValueError("Prompt too long (max 4000 characters)")

        actual_size = size or self.dalle_config.size
        if actual_size not in self.supported_image_sizes.get(self.dalle_config.model, []):
            raise ValueError(f"Invalid size {actual_size} for model {self.dalle_config.model}")

        return prompt.strip(), actual_size

# 2. DALL-E Image Generation
class DALLEGenerator:
    """Advanced DALL-E image generation with features"""

    def __init__(self, client: OpenAIMediaClient):
        self.client = client
        self.generation_history = []

    def generate_image(
        self,
        prompt: str,
        size: Optional[str] = None,
        quality: Optional[str] = None,
        style: Optional[str] = None,
        n: int = 1,
        save_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Generate images using DALL-E"""

        prompt, actual_size = self.client.validate_dalle_params(prompt, size)

        generation_params = {
            "model": self.client.dalle_config.model,
            "prompt": prompt,
            "size": actual_size,
            # DALL-E 3 only accepts n=1; DALL-E 2 supports up to 10 images per request
            "n": 1 if self.client.dalle_config.model == "dall-e-3" else min(n, 10)
        }

        # Add quality parameter for DALL-E 3
        if self.client.dalle_config.model == "dall-e-3":
            generation_params["quality"] = quality or self.client.dalle_config.quality
            generation_params["style"] = style or self.client.dalle_config.style

        try:
            start_time = time.time()
            response = self.client.client.images.generate(**generation_params)
            duration = time.time() - start_time

            # Process response
            images = []
            for i, image_data in enumerate(response.data):
                image_info = {
                    "url": image_data.url,
                    "revised_prompt": image_data.revised_prompt if hasattr(image_data, 'revised_prompt') else None,
                    "index": i,
                    "size": actual_size,
                    "quality": generation_params.get("quality"),
                    "style": generation_params.get("style")
                }

                # Download and save image if path provided
                if save_path:
                    image_path = self._download_image(
                        image_data.url,
                        f"{save_path}_image_{i}_{int(time.time())}.png"
                    )
                    image_info["local_path"] = image_path

                images.append(image_info)

            # Track generation
            generation_record = {
                "timestamp": datetime.now().isoformat(),
                "prompt": prompt,
                "params": generation_params,
                "duration": duration,
                "image_count": len(images),
                "success": True
            }
            self.generation_history.append(generation_record)

            return {
                "images": images,
                "prompt": prompt,
                "params": generation_params,
                "duration": duration,
                "success": True
            }

        except Exception as e:
            error_record = {
                "timestamp": datetime.now().isoformat(),
                "prompt": prompt,
                "error": str(e),
                "success": False
            }
            self.generation_history.append(error_record)

            return {
                "prompt": prompt,
                "error": str(e),
                "success": False
            }

    def _download_image(self, url: str, filename: str) -> str:
        """Download image from URL"""
        response = requests.get(url)
        response.raise_for_status()

        # Ensure directory exists
        Path(filename).parent.mkdir(parents=True, exist_ok=True)

        with open(filename, 'wb') as f:
            f.write(response.content)

        return filename

    def generate_variations(
        self,
        image_path: str,
        size: Optional[str] = None,
        n: int = 1,
        save_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Generate variations from an existing image"""

        if not os.path.exists(image_path):
            raise ValueError(f"Image file not found: {image_path}")

        try:
            # Note: image variations are only supported by the DALL-E 2 model
            with open(image_path, 'rb') as image_file:
                response = self.client.client.images.create_variation(
                    image=image_file,
                    size=size or self.client.dalle_config.size,
                    n=min(n, 10)
                )

            # Process variations
            variations = []
            for i, image_data in enumerate(response.data):
                variation_info = {
                    "url": image_data.url,
                    "index": i,
                    "source_image": image_path
                }

                if save_path:
                    variation_path = self._download_image(
                        image_data.url,
                        f"{save_path}_variation_{i}_{int(time.time())}.png"
                    )
                    variation_info["local_path"] = variation_path

                variations.append(variation_info)

            return {
                "variations": variations,
                "source_image": image_path,
                "success": True
            }

        except Exception as e:
            return {
                "source_image": image_path,
                "error": str(e),
                "success": False
            }

    def edit_image(
        self,
        image_path: str,
        prompt: str,
        mask_path: Optional[str] = None,
        size: Optional[str] = None,
        n: int = 1,
        save_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Edit an image, optionally constrained by a mask (DALL-E 2 only)"""

        if not os.path.exists(image_path):
            raise ValueError(f"Image file not found: {image_path}")

        if mask_path and not os.path.exists(mask_path):
            mask_path = None  # Ignore a mask path that doesn't exist

        try:
            with open(image_path, 'rb') as image_file:
                edit_params = {
                    "image": image_file,
                    "prompt": prompt,
                    "size": size or self.client.dalle_config.size,
                    "n": min(n, 10)
                }

                if mask_path:
                    # Keep the mask file open for the duration of the request
                    with open(mask_path, 'rb') as mask_file:
                        edit_params["mask"] = mask_file
                        response = self.client.client.images.edit(**edit_params)
                else:
                    response = self.client.client.images.edit(**edit_params)

            # Process edited images
            edits = []
            for i, image_data in enumerate(response.data):
                edit_info = {
                    "url": image_data.url,
                    "index": i,
                    "prompt": prompt,
                    "source_image": image_path,
                    "mask_used": mask_path is not None
                }

                if save_path:
                    edit_path = self._download_image(
                        image_data.url,
                        f"{save_path}_edit_{i}_{int(time.time())}.png"
                    )
                    edit_info["local_path"] = edit_path

                edits.append(edit_info)

            return {
                "edits": edits,
                "prompt": prompt,
                "source_image": image_path,
                "mask_used": mask_path is not None,
                "success": True
            }

        except Exception as e:
            return {
                "prompt": prompt,
                "source_image": image_path,
                "error": str(e),
                "success": False
            }

    def create_image_grid(
        self,
        prompts: List[str],
        grid_size: Tuple[int, int] = (2, 2),
        save_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Generate and combine multiple images into a grid"""

        if len(prompts) != grid_size[0] * grid_size[1]:
            raise ValueError(f"Number of prompts ({len(prompts)}) doesn't match grid size ({grid_size[0]}x{grid_size[1]})")

        try:
            # Generate individual images
            generated_images = []
            for i, prompt in enumerate(prompts):
                result = self.generate_image(prompt, n=1)
                if result["success"] and result["images"]:
                    generated_images.append(result["images"][0]["url"])
                else:
                    raise ValueError(f"Failed to generate image for prompt {i+1}")

            # Download and combine images
            image_objects = []
            for url in generated_images:
                response = requests.get(url)
                img = Image.open(io.BytesIO(response.content))
                image_objects.append(img)

            # Create grid
            width, height = image_objects[0].size
            grid_image = Image.new('RGB', (width * grid_size[1], height * grid_size[0]))
            draw = ImageDraw.Draw(grid_image)

            for i, img in enumerate(image_objects):
                row = i // grid_size[1]
                col = i % grid_size[1]
                grid_image.paste(img, (col * width, row * height))

                # Add border
                draw.rectangle(
                    [col * width, row * height, (col + 1) * width, (row + 1) * height],
                    outline='black',
                    width=2
                )

            # Save grid
            if save_path:
                Path(save_path).parent.mkdir(parents=True, exist_ok=True)
                grid_image.save(save_path)
                local_path = save_path
            else:
                local_path = f"grid_{int(time.time())}.png"
                grid_image.save(local_path)

            return {
                "grid_path": local_path,
                "prompts": prompts,
                "grid_size": grid_size,
                "success": True
            }

        except Exception as e:
            return {
                "prompts": prompts,
                "error": str(e),
                "success": False
            }
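
# DALL-E image URLs expire after a short period. A hedged sketch that requests
# base64 payloads (response_format="b64_json") and writes them straight to
# disk, avoiding the second HTTP round trip entirely:
def generate_image_b64(client: OpenAIMediaClient, prompt: str, save_path: str) -> str:
    """Generate one image and save it locally from the base64 payload"""
    response = client.client.images.generate(
        model=client.dalle_config.model,
        prompt=prompt,
        size=client.dalle_config.size,
        response_format="b64_json",
        n=1
    )

    image_bytes = base64.b64decode(response.data[0].b64_json)
    Path(save_path).parent.mkdir(parents=True, exist_ok=True)
    with open(save_path, 'wb') as f:
        f.write(image_bytes)
    return save_path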

# 3. Whisper Audio Processing
class WhisperProcessor:
    """Advanced Whisper audio processing with features"""

    def __init__(self, client: OpenAIMediaClient):
        self.client = client
        self.transcription_history = []
        self.supported_formats = ["mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm"]

    def transcribe_audio(
        self,
        audio_path: str,
        language: Optional[str] = None,
        response_format: Optional[str] = None,
        temperature: Optional[float] = None,
        save_transcript: Optional[str] = None
    ) -> Dict[str, Any]:
        """Transcribe audio file using Whisper"""

        if not os.path.exists(audio_path):
            raise ValueError(f"Audio file not found: {audio_path}")

        # Check file format
        file_ext = Path(audio_path).suffix.lower().lstrip('.')
        if file_ext not in self.supported_formats:
            raise ValueError(f"Unsupported audio format: {file_ext}")

        try:
            start_time = time.time()

            with open(audio_path, 'rb') as audio_file:
                transcription_params = {
                    "model": self.client.whisper_config.model,
                    "file": audio_file,
                    "response_format": response_format or self.client.whisper_config.response_format
                }

                if language:
                    transcription_params["language"] = language
                if temperature is not None:
                    transcription_params["temperature"] = temperature

                response = self.client.client.audio.transcriptions.create(**transcription_params)

            duration = time.time() - start_time

            # Process response based on the effective format
            actual_format = response_format or self.client.whisper_config.response_format
            if actual_format == "json":
                result = {
                    "text": response.text,
                    "language": getattr(response, 'language', None),
                    "duration": getattr(response, 'duration', None)
                }
            elif actual_format == "verbose_json":
                result = {
                    "text": response.text,
                    "language": response.language,
                    "duration": response.duration,
                    # Word-level timestamps require timestamp_granularities=["word"]
                    "words": getattr(response, 'words', None),
                    "segments": getattr(response, 'segments', None)
                }
            else:  # text, srt, vtt return a plain string
                result = {"text": response}

            # Add metadata (drop the file handle so the record stays serializable)
            result.update({
                "audio_path": audio_path,
                "params": {k: v for k, v in transcription_params.items() if k != "file"},
                "processing_time": duration,
                "timestamp": datetime.now().isoformat()
            })

            # Save transcript if path provided
            if save_transcript:
                self._save_transcript(result["text"], save_transcript)
                result["transcript_path"] = save_transcript

            # Track transcription
            transcription_record = {
                "timestamp": datetime.now().isoformat(),
                "audio_path": audio_path,
                "duration": duration,
                "success": True,
                "language": result.get("language")
            }
            self.transcription_history.append(transcription_record)

            result["success"] = True
            return result

        except Exception as e:
            error_record = {
                "timestamp": datetime.now().isoformat(),
                "audio_path": audio_path,
                "error": str(e),
                "success": False
            }
            self.transcription_history.append(error_record)

            return {
                "audio_path": audio_path,
                "error": str(e),
                "success": False
            }

    def translate_audio(
        self,
        audio_path: str,
        response_format: Optional[str] = None,
        temperature: Optional[float] = None,
        save_transcript: Optional[str] = None
    ) -> Dict[str, Any]:
        """Translate audio to English using Whisper"""

        if not os.path.exists(audio_path):
            raise ValueError(f"Audio file not found: {audio_path}")

        try:
            start_time = time.time()

            with open(audio_path, 'rb') as audio_file:
                translation_params = {
                    "model": self.client.whisper_config.model,
                    "file": audio_file,
                    "response_format": response_format or self.client.whisper_config.response_format
                }

                if temperature is not None:
                    translation_params["temperature"] = temperature

                response = self.client.client.audio.translations.create(**translation_params)

            duration = time.time() - start_time

            # Process response based on the effective format
            actual_format = response_format or self.client.whisper_config.response_format
            if actual_format == "json":
                result = {
                    "text": response.text,
                    "duration": getattr(response, 'duration', None)
                }
            else:  # text, srt, vtt return a plain string
                result = {"text": response}

            # Add metadata (drop the file handle so the record stays serializable)
            result.update({
                "audio_path": audio_path,
                "operation": "translation",
                "params": {k: v for k, v in translation_params.items() if k != "file"},
                "processing_time": duration,
                "timestamp": datetime.now().isoformat()
            })

            # Save translation if path provided
            if save_transcript:
                self._save_transcript(result["text"], save_transcript)
                result["translation_path"] = save_transcript

            result["success"] = True
            return result

        except Exception as e:
            return {
                "audio_path": audio_path,
                "operation": "translation",
                "error": str(e),
                "success": False
            }

    def _save_transcript(self, text: str, file_path: str) -> None:
        """Save transcript to file"""
        Path(file_path).parent.mkdir(parents=True, exist_ok=True)

        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(text)

    def batch_transcribe(
        self,
        audio_files: List[str],
        language: Optional[str] = None,
        max_concurrent: int = 3
    ) -> List[Dict[str, Any]]:
        """Transcribe multiple audio files concurrently"""

        semaphore = asyncio.Semaphore(max_concurrent)

        async def transcribe_single(audio_path: str) -> Dict[str, Any]:
            async with semaphore:
                loop = asyncio.get_event_loop()
                return await loop.run_in_executor(
                    None,
                    self.transcribe_audio,
                    audio_path,
                    language
                )

        async def run_batch():
            tasks = [transcribe_single(audio_path) for audio_path in audio_files]
            return await asyncio.gather(*tasks, return_exceptions=True)

        return asyncio.run(run_batch())

    def process_with_timestamps(
        self,
        audio_path: str,
        language: Optional[str] = None
    ) -> Dict[str, Any]:
        """Process audio with detailed timestamp information"""

        result = self.transcribe_audio(
            audio_path,
            language=language,
            response_format="verbose_json"
        )

        if result["success"] and "segments" in result:
            # Process segments for better analysis
            segments = result["segments"]
            word_count = len(result["words"]) if "words" in result else 0
            avg_words_per_second = word_count / result["duration"] if result["duration"] else 0

            result["analysis"] = {
                "segment_count": len(segments),
                "word_count": word_count,
                "average_words_per_second": avg_words_per_second,
                "first_segment_time": segments[0]["start"] if segments else 0,
                "last_segment_time": segments[-1]["end"] if segments else 0
            }

        return result
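
# The Whisper endpoint rejects uploads over 25 MB. A hedged sketch that splits
# long recordings into fixed-length chunks with soundfile before transcription
# (soundfile reads WAV/FLAC/OGG; MP3 input would need a different decoder):
def split_audio_for_whisper(audio_path: str, output_dir: str, chunk_seconds: int = 600) -> List[str]:
    """Split an audio file into chunk_seconds pieces and return the chunk paths"""
    data, samplerate = sf.read(audio_path)
    chunk_samples = chunk_seconds * samplerate
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    chunk_paths = []
    for i, start in enumerate(range(0, len(data), chunk_samples)):
        chunk_path = str(Path(output_dir) / f"chunk_{i:03d}.wav")
        sf.write(chunk_path, data[start:start + chunk_samples], samplerate)
        chunk_paths.append(chunk_path)

    return chunk_paths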

# 4. Advanced Image Processing
class ImageProcessor:
    """Advanced image processing utilities"""

    @staticmethod
    def create_mask_from_image(
        image_path: str,
        mask_areas: List[Tuple[int, int, int, int]],  # [(x1, y1, x2, y2), ...]
        output_path: str,
        mask_color: Tuple[int, int, int] = (255, 255, 255)
    ) -> str:
        """Create a mask image from specified areas"""
        img = Image.open(image_path)
        mask = Image.new('RGB', img.size, (0, 0, 0))
        draw = ImageDraw.Draw(mask)

        for x1, y1, x2, y2 in mask_areas:
            draw.rectangle([x1, y1, x2, y2], fill=mask_color)

        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        mask.save(output_path)
        return output_path

    @staticmethod
    def resize_image_with_aspect_ratio(
        image_path: str,
        target_size: Tuple[int, int],
        output_path: str
    ) -> str:
        """Resize image maintaining aspect ratio"""
        img = Image.open(image_path)
        target_width, target_height = target_size

        # Calculate new size maintaining aspect ratio
        original_width, original_height = img.size
        aspect_ratio = original_width / original_height

        if aspect_ratio > target_width / target_height:
            new_width = target_width
            new_height = int(target_width / aspect_ratio)
        else:
            new_height = target_height
            new_width = int(target_height * aspect_ratio)

        resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        resized_img.save(output_path)
        return output_path

    @staticmethod
    def add_watermark(
        image_path: str,
        watermark_text: str,
        output_path: str,
        position: str = "bottom-right"
    ) -> str:
        """Add text watermark to image"""
        img = Image.open(image_path)
        draw = ImageDraw.Draw(img)

        # Try to use a TrueType font, fall back to PIL's built-in default
        try:
            font = ImageFont.truetype("arial.ttf", 20)
        except OSError:
            font = ImageFont.load_default()

        # Calculate text size and position
        bbox = draw.textbbox((0, 0), watermark_text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]

        img_width, img_height = img.size
        margin = 20

        if position == "bottom-right":
            x, y = img_width - text_width - margin, img_height - text_height - margin
        elif position == "bottom-left":
            x, y = margin, img_height - text_height - margin
        elif position == "top-right":
            x, y = img_width - text_width - margin, margin
        else:  # top-left
            x, y = margin, margin

        # Add semi-transparent watermark
        overlay = Image.new('RGBA', img.size, (0, 0, 0, 0))
        overlay_draw = ImageDraw.Draw(overlay)
        overlay_draw.text((x, y), watermark_text, font=font, fill=(255, 255, 255, 128))

        # Combine with original image; JPEG can't store alpha, so convert back for .jpg output
        watermarked = Image.alpha_composite(img.convert('RGBA'), overlay)
        if output_path.lower().endswith(('.jpg', '.jpeg')):
            watermarked = watermarked.convert('RGB')
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        watermarked.save(output_path)
        return output_path

# 5. Content Moderation and Safety
class ContentModerator:
    """Content moderation for generated images and transcriptions"""

    @staticmethod
    def check_prompt_safety(prompt: str) -> Dict[str, Any]:
        """Check if prompt contains potentially unsafe content"""
        # This is a basic implementation
        # In production, use proper content moderation APIs

        unsafe_keywords = [
            "violent", "harmful", "illegal", "adult", "nude", "naked",
            "blood", "weapon", "gun", "knife", "death", "kill"
        ]

        prompt_lower = prompt.lower()
        found_keywords = [word for word in unsafe_keywords if word in prompt_lower]

        return {
            "safe": len(found_keywords) == 0,
            "flagged_keywords": found_keywords,
            "confidence": max(0, 1 - len(found_keywords) * 0.2)
        }

    @staticmethod
    def moderate_transcription(transcription: str) -> Dict[str, Any]:
        """Moderate transcription content"""
        # Basic implementation
        personal_info_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # Credit card
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'  # Email
        ]

        import re
        flagged_items = []

        for pattern in personal_info_patterns:
            matches = re.findall(pattern, transcription)
            flagged_items.extend(matches)

        return {
            "contains_personal_info": len(flagged_items) > 0,
            "flagged_items": flagged_items,
            "recommendation": "review" if flagged_items else "safe"
        }
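
# The keyword list above is only a placeholder. A hedged sketch using OpenAI's
# actual Moderation endpoint, which scores text across harm categories:
def check_with_moderation_api(client: OpenAIMediaClient, text: str) -> Dict[str, Any]:
    """Moderate text via the OpenAI Moderation API"""
    response = client.client.moderations.create(input=text)
    result = response.results[0]

    return {
        "safe": not result.flagged,
        "flagged": result.flagged,
        # Category scores are floats in [0, 1]; higher means more likely harmful
        "category_scores": result.category_scores.model_dump()
    }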

# 6. Usage Analytics
class MediaAnalytics:
    """Track and analyze media generation and processing"""

    def __init__(self):
        self.generation_data = []
        self.transcription_data = []
        self.error_data = []

    def track_generation(self, data: Dict[str, Any]) -> None:
        """Track image generation data"""
        self.generation_data.append(data)

    def track_transcription(self, data: Dict[str, Any]) -> None:
        """Track transcription data"""
        self.transcription_data.append(data)

    def track_error(self, data: Dict[str, Any]) -> None:
        """Track error data"""
        self.error_data.append(data)

    def get_statistics(self) -> Dict[str, Any]:
        """Get comprehensive statistics"""
        return {
            "image_generation": {
                "total_requests": len(self.generation_data),
                "successful_requests": sum(1 for d in self.generation_data if d.get("success", False)),
                "total_images": sum(d.get("image_count", 1) for d in self.generation_data),
                "average_duration": sum(d.get("duration", 0) for d in self.generation_data) / max(len(self.generation_data), 1),
                "popular_sizes": self._get_popular_sizes(),
                "popular_styles": self._get_popular_styles()
            },
            "transcription": {
                "total_requests": len(self.transcription_data),
                "successful_requests": sum(1 for d in self.transcription_data if d.get("success", False)),
                "total_duration": sum(d.get("processing_time", 0) for d in self.transcription_data),
                "languages": self._get_language_distribution()
            },
            "errors": {
                "total_errors": len(self.error_data),
                "error_types": self._get_error_types()
            }
        }

    def _get_popular_sizes(self) -> Dict[str, int]:
        """Get most popular image sizes"""
        sizes = {}
        for d in self.generation_data:
            size = d.get("params", {}).get("size", "unknown")
            sizes[size] = sizes.get(size, 0) + 1
        return sizes

    def _get_popular_styles(self) -> Dict[str, int]:
        """Get most popular image styles"""
        styles = {}
        for d in self.generation_data:
            style = d.get("params", {}).get("style", "unknown")
            styles[style] = styles.get(style, 0) + 1
        return styles

    def _get_language_distribution(self) -> Dict[str, int]:
        """Get language distribution in transcriptions"""
        languages = {}
        for d in self.transcription_data:
            lang = d.get("language", "unknown")
            languages[lang] = languages.get(lang, 0) + 1
        return languages

    def _get_error_types(self) -> Dict[str, int]:
        """Get error type distribution"""
        error_types = {}
        for d in self.error_data:
            error = d.get("error", "unknown error")
            error_types[error] = error_types.get(error, 0) + 1
        return error_types
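
# Example usage of MediaAnalytics (illustrative; the field names match what
# get_statistics() reads, the values are made up):
#
#   analytics = MediaAnalytics()
#   analytics.track_generation({
#       "success": True,
#       "image_count": 1,
#       "duration": 4.2,
#       "params": {"size": "1024x1024", "style": "vivid"}
#   })
#   analytics.track_error({"error": "rate_limit_exceeded"})
#   print(json.dumps(analytics.get_statistics(), indent=2))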

# 7. Demonstration Functions
def demo_dalle_generation():
    """Demonstrate DALL-E image generation"""
    print("=== DALL-E Image Generation Demo ===")

    dalle_config = DalleConfig(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="dall-e-3",
        size="1024x1024",
        quality="standard",
        style="vivid"
    )

    whisper_config = WhisperConfig(api_key=os.getenv("OPENAI_API_KEY"))
    client = OpenAIMediaClient(dalle_config, whisper_config)
    dalle_gen = DALLEGenerator(client)

    # Basic generation
    result = dalle_gen.generate_image(
        "A futuristic city with flying cars and neon lights, cyberpunk style",
        save_path="./generated_images/cyberpunk_city"
    )
    print(f"Generation result: {result['success']}")
    if result['success']:
        print(f"Generated {len(result['images'])} images")
        for img in result['images']:
            print(f"- Image {img['index']}: {img.get('local_path', img['url'])}")

def demo_whisper_transcription():
    """Demonstrate Whisper transcription"""
    print("\n=== Whisper Transcription Demo ===")

    dalle_config = DalleConfig(api_key=os.getenv("OPENAI_API_KEY"))
    whisper_config = WhisperConfig(api_key=os.getenv("OPENAI_API_KEY"))
    client = OpenAIMediaClient(dalle_config, whisper_config)
    whisper_proc = WhisperProcessor(client)

    # Note: This requires an actual audio file
    audio_path = "sample_audio.mp3"  # Replace with actual audio file path

    if os.path.exists(audio_path):
        result = whisper_proc.transcribe_audio(
            audio_path,
            language="en",
            save_transcript="./transcriptions/sample_transcript.txt"
        )
        print(f"Transcription result: {result['success']}")
        if result['success']:
            print(f"Transcribed text: {result['text'][:200]}...")
    else:
        print(f"Audio file not found: {audio_path}")

def demo_advanced_features():
    """Demonstrate advanced features"""
    print("\n=== Advanced Features Demo ===")

    dalle_config = DalleConfig(api_key=os.getenv("OPENAI_API_KEY"))
    whisper_config = WhisperConfig(api_key=os.getenv("OPENAI_API_KEY"))
    client = OpenAIMediaClient(dalle_config, whisper_config)

    dalle_gen = DALLEGenerator(client)
    moderator = ContentModerator()

    # Test content moderation
    test_prompt = "A beautiful sunset over mountains"
    safety_check = moderator.check_prompt_safety(test_prompt)
    print(f"Prompt safety check: {safety_check}")

    # Generate image grid
    prompts = [
        "A serene lake at dawn",
        "A busy city street at night",
        "A cozy cabin in the woods",
        "A tropical beach paradise"
    ]

    grid_result = dalle_gen.create_image_grid(
        prompts,
        grid_size=(2, 2),
        save_path="./generated_images/scenery_grid.png"
    )
    print(f"Image grid result: {grid_result['success']}")

# Main execution
if __name__ == "__main__":
    # Set environment variable
    os.environ.setdefault("OPENAI_API_KEY", "your-openai-api-key")

    try:
        demo_dalle_generation()
        demo_whisper_transcription()
        demo_advanced_features()
    except Exception as e:
        print(f"Demo error: {e}")
        print("Make sure to set your OPENAI_API_KEY environment variable")

💻 OpenAI Embeddings and Text Similarity python

🔴 complex ⭐⭐⭐⭐⭐

Advanced embedding analysis with similarity search, clustering, and semantic applications

⏱️ 50 min 🏷️ openai, embeddings, similarity, clustering
Prerequisites: Python, OpenAI API, scikit-learn, numpy, matplotlib, plotly
# OpenAI Embeddings and Text Similarity
# Advanced embedding analysis with similarity search, clustering, and applications

import os
import json
import time
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.manifold import TSNE
from sklearn.metrics import silhouette_score
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from openai import OpenAI

# 1. Configuration and Client Setup
@dataclass
class EmbeddingConfig:
    """Configuration for OpenAI embeddings"""
    api_key: str
    organization: Optional[str] = None
    model: str = "text-embedding-ada-002"
    max_tokens: int = 8192
    batch_size: int = 100
    dimension: int = 1536  # OpenAI embedding dimension
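
# Sketch (not part of the original example): the newer text-embedding-3
# models accept a `dimensions` parameter that truncates the returned vector,
# trading a little accuracy for smaller storage, e.g.:
#
#   response = client.embeddings.create(
#       model="text-embedding-3-small",
#       input="some text",
#       dimensions=256
#   )
#   short_embedding = response.data[0].embedding  # 256 floats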

class EmbeddingClient:
    """Enhanced OpenAI embeddings client with advanced features"""

    def __init__(self, config: EmbeddingConfig):
        self.config = config
        self.client = OpenAI(
            api_key=config.api_key,
            organization=config.organization
        )
        self.embedding_cache = {}  # Simple cache for embeddings

    def get_embedding(self, text: str, model: Optional[str] = None) -> List[float]:
        """Get embedding for a single text"""
        # Check cache first (key on the full text, not a prefix, to avoid
        # collisions between texts that share their first characters)
        cache_key = (text, model or self.config.model)
        if cache_key in self.embedding_cache:
            return self.embedding_cache[cache_key]

        try:
            response = self.client.embeddings.create(
                model=model or self.config.model,
                input=text.replace("\n", " ")  # Replace newlines with spaces
            )

            embedding = response.data[0].embedding

            # Cache result
            self.embedding_cache[cache_key] = embedding

            return embedding

        except Exception as e:
            print(f"Error getting embedding: {e}")
            return [0.0] * self.config.dimension

    def get_batch_embeddings(
        self,
        texts: List[str],
        model: Optional[str] = None,
        show_progress: bool = True
    ) -> List[List[float]]:
        """Get embeddings for multiple texts efficiently"""
        embeddings = []
        batch_size = self.config.batch_size

        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]

            if show_progress:
                print(f"Processing batch {i//batch_size + 1}/{(len(texts) + batch_size - 1)//batch_size}")

            try:
                response = self.client.embeddings.create(
                    model=model or self.config.model,
                    input=[text.replace("\n", " ") for text in batch]
                )

                batch_embeddings = [item.embedding for item in response.data]
                embeddings.extend(batch_embeddings)

                # Small delay to avoid rate limits
                time.sleep(0.1)

            except Exception as e:
                print(f"Error processing batch {i}: {e}")
                # Add zero embeddings for failed batch
                embeddings.extend([[0.0] * self.config.dimension] * len(batch))

        return embeddings

# 2. Similarity Analysis
class SimilarityAnalyzer:
    """Advanced similarity analysis using embeddings"""

    def __init__(self, embedding_client: EmbeddingClient):
        self.client = embedding_client

    def cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Calculate cosine similarity between two vectors"""
        try:
            vec1_np = np.array(vec1)
            vec2_np = np.array(vec2)

            dot_product = np.dot(vec1_np, vec2_np)
            norm1 = np.linalg.norm(vec1_np)
            norm2 = np.linalg.norm(vec2_np)

            if norm1 == 0 or norm2 == 0:
                return 0.0

            return dot_product / (norm1 * norm2)

        except Exception as e:
            print(f"Error calculating similarity: {e}")
            return 0.0

    def euclidean_distance(self, vec1: List[float], vec2: List[float]) -> float:
        """Calculate Euclidean distance between two vectors"""
        try:
            vec1_np = np.array(vec1)
            vec2_np = np.array(vec2)
            return np.linalg.norm(vec1_np - vec2_np)
        except Exception as e:
            print(f"Error calculating distance: {e}")
            return float('inf')
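
    def dot_product_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Sketch (added note): OpenAI embeddings are normalized to unit
        length, so a plain dot product equals cosine similarity and skips
        the norm computations."""
        return float(np.dot(np.array(vec1), np.array(vec2)))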

    def find_most_similar(
        self,
        query_text: str,
        documents: List[str],
        top_k: int = 5,
        similarity_threshold: float = 0.7
    ) -> List[Dict[str, Any]]:
        """Find most similar documents to query"""

        query_embedding = self.client.get_embedding(query_text)
        doc_embeddings = self.client.get_batch_embeddings(documents, show_progress=False)

        similarities = []
        for i, doc_embedding in enumerate(doc_embeddings):
            sim = self.cosine_similarity(query_embedding, doc_embedding)
            similarities.append({
                "document": documents[i],
                "similarity": sim,
                "index": i
            })

        # Sort by similarity and filter by threshold
        similarities.sort(key=lambda x: x["similarity"], reverse=True)
        filtered_similarities = [
            sim for sim in similarities
            if sim["similarity"] >= similarity_threshold
        ]

        return filtered_similarities[:top_k]

    def create_similarity_matrix(
        self,
        texts: List[str]
    ) -> Tuple[np.ndarray, List[str]]:
        """Create similarity matrix for a list of texts"""
        embeddings = self.client.get_batch_embeddings(texts)
        similarity_matrix = cosine_similarity(embeddings)

        return similarity_matrix, texts

    def visualize_similarity_heatmap(
        self,
        texts: List[str],
        title: str = "Text Similarity Heatmap",
        save_path: Optional[str] = None
    ) -> None:
        """Create and save similarity heatmap"""
        similarity_matrix, labels = self.create_similarity_matrix(texts)

        # Truncate labels for better display
        short_labels = [label[:50] + "..." if len(label) > 50 else label for label in labels]

        plt.figure(figsize=(12, 10))
        sns.heatmap(
            similarity_matrix,
            xticklabels=short_labels,
            yticklabels=short_labels,
            annot=True,
            cmap="coolwarm",
            center=0,
            fmt=".2f"
        )
        plt.title(title)
        plt.xticks(rotation=45, ha="right")
        plt.yticks(rotation=0)
        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches="tight")
        plt.show()

# 3. Text Clustering
class TextClusterer:
    """Advanced text clustering using embeddings"""

    def __init__(self, embedding_client: EmbeddingClient):
        self.client = embedding_client

    def kmeans_clustering(
        self,
        texts: List[str],
        n_clusters: int,
        random_state: int = 42
    ) -> Dict[str, Any]:
        """Perform K-means clustering on texts"""
        embeddings = self.client.get_batch_embeddings(texts)
        embedding_array = np.array(embeddings)

        # Perform K-means clustering
        kmeans = KMeans(
            n_clusters=n_clusters,
            random_state=random_state,
            n_init=10
        )
        cluster_labels = kmeans.fit_predict(embedding_array)

        # Calculate silhouette score
        silhouette_avg = silhouette_score(embedding_array, cluster_labels)

        # Organize results
        clusters = {}
        for i, label in enumerate(cluster_labels):
            if label not in clusters:
                clusters[label] = []
            clusters[label].append({
                "text": texts[i],
                "index": i
            })

        return {
            "clusters": clusters,
            "cluster_labels": cluster_labels.tolist(),
            "cluster_centers": kmeans.cluster_centers_.tolist(),
            "silhouette_score": silhouette_avg,
            "n_clusters": n_clusters
        }

    def dbscan_clustering(
        self,
        texts: List[str],
        eps: float = 0.3,
        min_samples: int = 2
    ) -> Dict[str, Any]:
        """Perform DBSCAN clustering on texts"""
        embeddings = self.client.get_batch_embeddings(texts)
        embedding_array = np.array(embeddings)

        # Convert to cosine distance matrix
        cosine_dist = 1 - cosine_similarity(embedding_array)

        # Perform DBSCAN clustering
        dbscan = DBSCAN(
            eps=eps,
            min_samples=min_samples,
            metric="precomputed"
        )
        cluster_labels = dbscan.fit_predict(cosine_dist)

        # Calculate silhouette score, excluding noise points (label -1)
        non_noise_mask = cluster_labels != -1
        if len(set(cluster_labels[non_noise_mask])) > 1:
            silhouette_avg = silhouette_score(
                embedding_array[non_noise_mask],
                cluster_labels[non_noise_mask]
            )
        else:
            silhouette_avg = -1

        # Organize results
        clusters = {}
        noise_points = []

        for i, label in enumerate(cluster_labels):
            if label == -1:
                noise_points.append({
                    "text": texts[i],
                    "index": i
                })
            else:
                if label not in clusters:
                    clusters[label] = []
                clusters[label].append({
                    "text": texts[i],
                    "index": i
                })

        return {
            "clusters": clusters,
            "noise_points": noise_points,
            "cluster_labels": cluster_labels.tolist(),
            "silhouette_score": silhouette_avg,
            "n_clusters": len(clusters),
            "n_noise_points": len(noise_points)
        }

    def find_optimal_clusters(
        self,
        texts: List[str],
        max_clusters: int = 10
    ) -> Dict[str, Any]:
        """Find optimal number of clusters using elbow method"""
        embeddings = self.client.get_batch_embeddings(texts)
        embedding_array = np.array(embeddings)

        inertias = []
        silhouette_scores = []
        cluster_range = range(2, min(max_clusters + 1, len(texts)))

        for n_clusters in cluster_range:
            kmeans = KMeans(
                n_clusters=n_clusters,
                random_state=42,
                n_init=10
            )
            cluster_labels = kmeans.fit_predict(embedding_array)

            inertias.append(kmeans.inertia_)
            silhouette_scores.append(silhouette_score(embedding_array, cluster_labels))

        # Find optimal number based on silhouette score
        optimal_k = cluster_range[np.argmax(silhouette_scores)]

        return {
            "cluster_range": list(cluster_range),
            "inertias": inertias,
            "silhouette_scores": silhouette_scores,
            "optimal_clusters": optimal_k,
            "max_silhouette_score": max(silhouette_scores)
        }
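
    def plot_elbow_curve(
        self,
        optimal_result: Dict[str, Any],
        save_path: Optional[str] = None
    ) -> None:
        """Illustrative helper (not in the original): plot the inertia and
        silhouette curves returned by find_optimal_clusters."""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

        ax1.plot(optimal_result["cluster_range"], optimal_result["inertias"], "o-")
        ax1.set_xlabel("Number of clusters")
        ax1.set_ylabel("Inertia")
        ax1.set_title("Elbow Method")

        ax2.plot(optimal_result["cluster_range"], optimal_result["silhouette_scores"], "o-")
        ax2.set_xlabel("Number of clusters")
        ax2.set_ylabel("Silhouette score")
        ax2.set_title("Silhouette Analysis")

        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=150, bbox_inches="tight")
        plt.show()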

    def visualize_clusters(
        self,
        texts: List[str],
        cluster_result: Dict[str, Any],
        method: str = "tsne",
        save_path: Optional[str] = None
    ) -> None:
        """Visualize clusters using dimensionality reduction"""
        embeddings = self.client.get_batch_embeddings(texts)
        embedding_array = np.array(embeddings)

        # Apply dimensionality reduction
        if method == "tsne":
            reducer = TSNE(
                n_components=2,
                random_state=42,
                perplexity=min(30, len(texts) - 1)
            )
        else:  # PCA
            reducer = PCA(n_components=2, random_state=42)

        embeddings_2d = reducer.fit_transform(embedding_array)
        cluster_labels = cluster_result["cluster_labels"]

        # Create interactive plot (labels cast to str so clusters get a
        # discrete legend instead of a continuous color scale)
        fig = px.scatter(
            x=embeddings_2d[:, 0],
            y=embeddings_2d[:, 1],
            color=[str(label) for label in cluster_labels],
            hover_name=texts,
            title=f"Text Clusters Visualization ({method.upper()})",
            labels={
                "x": f"{method.upper()} 1",
                "y": f"{method.upper()} 2",
                "color": "Cluster"
            }
        )

        fig.update_traces(marker=dict(size=8))

        if save_path:
            fig.write_html(save_path)

        fig.show()

# 4. Semantic Search
class SemanticSearchEngine:
    """Advanced semantic search engine"""

    def __init__(self, embedding_client: EmbeddingClient):
        self.client = embedding_client
        self.documents = []
        self.embeddings = []

    def index_documents(self, documents: List[Dict[str, Any]]) -> None:
        """Index documents for semantic search"""
        self.documents = documents
        texts = [doc.get("text", "") for doc in documents]
        self.embeddings = self.client.get_batch_embeddings(texts)

    def search(
        self,
        query: str,
        top_k: int = 10,
        similarity_threshold: float = 0.5,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Search documents using semantic similarity"""
        if not self.documents or not self.embeddings:
            return []

        # Get query embedding
        query_embedding = self.client.get_embedding(query)
        query_array = np.array(query_embedding).reshape(1, -1)

        # Calculate similarities
        embedding_array = np.array(self.embeddings)
        similarities = cosine_similarity(query_array, embedding_array)[0]

        # Create search results
        results = []
        for i, similarity in enumerate(similarities):
            if similarity >= similarity_threshold:
                result = {
                    "document": self.documents[i],
                    "similarity": float(similarity),
                    "index": i
                }

                # Apply filters if provided
                if filters:
                    match = True
                    for key, value in filters.items():
                        if self.documents[i].get(key) != value:
                            match = False
                            break
                    if match:
                        results.append(result)
                else:
                    results.append(result)

        # Sort by similarity and return top_k
        results.sort(key=lambda x: x["similarity"], reverse=True)
        return results[:top_k]
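
    def save_index(self, path_prefix: str) -> None:
        """Illustrative helper (not in the original): persist the index so
        the corpus doesn't have to be re-embedded on every run."""
        np.save(f"{path_prefix}_embeddings.npy", np.array(self.embeddings))
        with open(f"{path_prefix}_documents.json", "w") as f:
            json.dump(self.documents, f)

    def load_index(self, path_prefix: str) -> None:
        """Restore an index written by save_index."""
        self.embeddings = np.load(f"{path_prefix}_embeddings.npy").tolist()
        with open(f"{path_prefix}_documents.json") as f:
            self.documents = json.load(f)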

    def hybrid_search(
        self,
        query: str,
        keyword_query: Optional[str] = None,
        top_k: int = 10,
        semantic_weight: float = 0.7,
        keyword_weight: float = 0.3
    ) -> List[Dict[str, Any]]:
        """Hybrid search combining semantic and keyword search"""
        # Semantic search
        semantic_results = self.search(query, top_k=top_k * 2)

        results = []
        for result in semantic_results:
            results.append({
                **result,
                "semantic_score": result["similarity"],
                "keyword_score": 0.0
            })

        # Keyword search if keyword query provided
        if keyword_query:
            # Prepare documents
            doc_texts = [doc.get("text", "") for doc in self.documents]

            # Create TF-IDF vectorizer
            vectorizer = TfidfVectorizer()
            tfidf_matrix = vectorizer.fit_transform(doc_texts)

            # Transform keyword query
            query_vector = vectorizer.transform([keyword_query])

            # Calculate keyword similarities
            keyword_similarities = cosine_similarity(query_vector, tfidf_matrix)[0]

            # Add keyword scores to results
            for i, score in enumerate(keyword_similarities):
                if score > 0:
                    existing_result = next(
                        (r for r in results if r["index"] == i),
                        None
                    )
                    if existing_result:
                        existing_result["keyword_score"] = float(score)
                    else:
                        results.append({
                            "document": self.documents[i],
                            "similarity": 0.0,
                            "semantic_score": 0.0,
                            "keyword_score": float(score),
                            "index": i
                        })

        # Combine scores
        for result in results:
            combined_score = (
                result["semantic_score"] * semantic_weight +
                result["keyword_score"] * keyword_weight
            )
            result["combined_score"] = combined_score

        # Sort by combined score and return top_k
        results.sort(key=lambda x: x["combined_score"], reverse=True)
        return results[:top_k]

# 5. Text Classification
class TextClassifier:
    """Zero-shot text classification using embeddings"""

    def __init__(self, embedding_client: EmbeddingClient):
        self.client = embedding_client

    def zero_shot_classify(
        self,
        text: str,
        labels: List[str],
        top_k: int = 3
    ) -> List[Dict[str, Any]]:
        """Classify text using zero-shot approach"""
        # Create label descriptions
        label_descriptions = [
            f"This text is about {label}."
            for label in labels
        ]

        # Get embeddings
        text_embedding = self.client.get_embedding(text)
        label_embeddings = self.client.get_batch_embeddings(
            label_descriptions, show_progress=False
        )

        # Calculate cosine similarity between the text and each label description
        text_vec = np.array(text_embedding)
        text_norm = np.linalg.norm(text_vec)

        similarities = []
        for i, label_embedding in enumerate(label_embeddings):
            label_vec = np.array(label_embedding)
            label_norm = np.linalg.norm(label_vec)

            if text_norm > 0 and label_norm > 0:
                sim_score = float(np.dot(text_vec, label_vec) / (text_norm * label_norm))
            else:
                sim_score = 0.0

            similarities.append({
                "label": labels[i],
                "similarity": sim_score
            })

        # Sort by similarity and return top_k
        similarities.sort(key=lambda x: x["similarity"], reverse=True)
        return similarities[:top_k]

    def classify_with_examples(
        self,
        text: str,
        examples: Dict[str, List[str]]
    ) -> Dict[str, Any]:
        """Classify text using few-shot examples"""
        label_similarities = {}

        # Embed the input text once, then compare it with each label's examples
        text_vec = np.array(self.client.get_embedding(text))
        text_norm = np.linalg.norm(text_vec)

        for label, example_texts in examples.items():
            example_embeddings = self.client.get_batch_embeddings(
                example_texts, show_progress=False
            )

            # Average cosine similarity between the text and the examples
            total_similarity = 0.0
            for example_embedding in example_embeddings:
                example_vec = np.array(example_embedding)
                example_norm = np.linalg.norm(example_vec)

                if text_norm > 0 and example_norm > 0:
                    sim_score = float(
                        np.dot(text_vec, example_vec) / (text_norm * example_norm)
                    )
                else:
                    sim_score = 0.0

                total_similarity += sim_score

            avg_similarity = total_similarity / len(example_texts)
            label_similarities[label] = avg_similarity

        # Find best matching label
        best_label = max(label_similarities, key=label_similarities.get)
        confidence = label_similarities[best_label]

        return {
            "predicted_label": best_label,
            "confidence": confidence,
            "all_similarities": label_similarities
        }

# 6. Advanced Applications
class EmbeddingApplications:
    """Advanced applications using text embeddings"""

    @staticmethod
    def find_anomalies(
        embedding_client: EmbeddingClient,
        texts: List[str],
        threshold_percentile: float = 95
    ) -> List[Dict[str, Any]]:
        """Find anomalous texts using embedding distances"""
        embeddings = embedding_client.get_batch_embeddings(texts)
        embedding_array = np.array(embeddings)

        # Calculate pairwise distances
        similarities = cosine_similarity(embedding_array)

        # Calculate average similarity for each text
        avg_similarities = []
        for i in range(len(texts)):
            # Remove self-similarity
            row_similarities = similarities[i, :i].tolist() + similarities[i, i+1:].tolist()
            avg_sim = np.mean(row_similarities) if row_similarities else 0
            avg_similarities.append(avg_sim)

        # Anomalies have unusually LOW average similarity, so the cutoff is
        # the (100 - threshold_percentile)th percentile: with the default of
        # 95, the bottom 5% of texts are flagged
        threshold = np.percentile(avg_similarities, 100 - threshold_percentile)

        # Identify anomalies
        anomalies = []
        for i, avg_sim in enumerate(avg_similarities):
            if avg_sim < threshold:
                anomalies.append({
                    "text": texts[i],
                    "index": i,
                    "avg_similarity": avg_sim,
                    "is_anomaly": True
                })

        return anomalies

    @staticmethod
    def generate_summaries(
        embedding_client: EmbeddingClient,
        long_text: str,
        num_sentences: int = 3
    ) -> List[str]:
        """Generate extractive summary using embeddings"""
        # Split text into sentences (naive split on periods; a proper
        # sentence tokenizer would be more robust)
        sentences = [s.strip() for s in long_text.split('.') if s.strip()]

        if len(sentences) <= num_sentences:
            return sentences

        # Get embeddings for all sentences
        sentence_embeddings = embedding_client.get_batch_embeddings(sentences)

        # Calculate sentence importance using centrality
        similarity_matrix = cosine_similarity(sentence_embeddings)
        centrality_scores = np.mean(similarity_matrix, axis=1)

        # Select top sentences
        top_indices = np.argsort(centrality_scores)[-num_sentences:]
        top_indices.sort()  # Maintain original order

        summary = [sentences[i] + '.' for i in top_indices]
        return summary

    @staticmethod
    def track_topic_evolution(
        embedding_client: EmbeddingClient,
        text_series: List[str],
        time_labels: List[str]
    ) -> Dict[str, Any]:
        """Track topic evolution over time"""
        embeddings = embedding_client.get_batch_embeddings(text_series)
        embedding_array = np.array(embeddings)

        # Calculate similarities between consecutive texts
        evolution_scores = []
        for i in range(1, len(embedding_array)):
            sim = cosine_similarity(
                embedding_array[i-1:i],
                embedding_array[i:i+1]
            )[0][0]
            evolution_scores.append({
                "from_time": time_labels[i-1],
                "to_time": time_labels[i],
                "similarity": float(sim),
                "change": 1 - float(sim)
            })

        return {
            "evolution_scores": evolution_scores,
            "average_change": np.mean([score["change"] for score in evolution_scores]),
            "max_change": max([score["change"] for score in evolution_scores]),
            "min_change": min([score["change"] for score in evolution_scores])
        }
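
# Example usage of track_topic_evolution (illustrative values):
#
#   result = EmbeddingApplications.track_topic_evolution(
#       client,
#       ["Q1 report on cloud revenue",
#        "Q2 report on cloud and AI revenue",
#        "Q3 report focused on AI products"],
#       ["Q1", "Q2", "Q3"]
#   )
#   print(f"Average topic drift: {result['average_change']:.3f}")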

# 7. Demonstration Functions
def demo_similarity_analysis():
    """Demonstrate similarity analysis"""
    print("=== Similarity Analysis Demo ===")

    config = EmbeddingConfig(api_key=os.getenv("OPENAI_API_KEY"))
    client = EmbeddingClient(config)
    analyzer = SimilarityAnalyzer(client)

    texts = [
        "The weather is beautiful today",
        "It's a lovely sunny day outside",
        "I love programming in Python",
        "Machine learning is fascinating",
        "AI will change the world",
        "Python is great for data science"
    ]

    # Find similar texts
    query = "I enjoy coding with Python"
    similar = analyzer.find_most_similar(query, texts, top_k=3)
    print(f"Most similar to '{query}':")
    for result in similar:
        print(f"- {result['document']} (similarity: {result['similarity']:.3f})")

    # Create similarity matrix
    analyzer.visualize_similarity_heatmap(
        texts,
        title="Text Similarity Heatmap",
        save_path="similarity_heatmap.png"
    )

def demo_text_clustering():
    """Demonstrate text clustering"""
    print("\n=== Text Clustering Demo ===")

    config = EmbeddingConfig(api_key=os.getenv("OPENAI_API_KEY"))
    client = EmbeddingClient(config)
    clusterer = TextClusterer(client)

    # Sample texts for clustering
    texts = [
        "Python is a popular programming language",
        "JavaScript is used for web development",
        "React is a frontend framework",
        "Django is a Python web framework",
        "Machine learning algorithms are complex",
        "Neural networks are deep learning models",
        "Data science involves statistics",
        "AI research is advancing rapidly",
        "Coffee is a popular beverage",
        "Tea has many health benefits",
        "Water is essential for life",
        "Fruits contain important vitamins"
    ]

    # Find optimal clusters
    optimal = clusterer.find_optimal_clusters(texts, max_clusters=6)
    print(f"Optimal number of clusters: {optimal['optimal_clusters']}")
    print(f"Max silhouette score: {optimal['max_silhouette_score']:.3f}")

    # Perform clustering
    kmeans_result = clusterer.kmeans_clustering(texts, optimal['optimal_clusters'])
    print(f"\nK-means clustering results:")
    for cluster_id, cluster_data in kmeans_result['clusters'].items():
        print(f"Cluster {cluster_id}: {len(cluster_data)} texts")
        for item in cluster_data:
            print(f"  - {item['text'][:50]}...")

    # Visualize clusters
    clusterer.visualize_clusters(
        texts,
        kmeans_result,
        method="tsne",
        save_path="cluster_visualization.html"
    )

def demo_semantic_search():
    """Demonstrate semantic search"""
    print("\n=== Semantic Search Demo ===")

    config = EmbeddingConfig(api_key=os.getenv("OPENAI_API_KEY"))
    client = EmbeddingClient(config)
    search_engine = SemanticSearchEngine(client)

    # Index documents
    documents = [
        {"text": "Python is a versatile programming language for web development", "category": "programming"},
        {"text": "React is a popular JavaScript library for building user interfaces", "category": "programming"},
        {"text": "Machine learning uses algorithms to find patterns in data", "category": "ai"},
        {"text": "Neural networks are a key component of deep learning", "category": "ai"},
        {"text": "Coffee beans are roasted to develop their flavor", "category": "food"},
        {"text": "Tea cultivation requires specific climate conditions", "category": "food"}
    ]

    search_engine.index_documents(documents)

    # Semantic search
    query = "AI and neural networks"
    results = search_engine.search(query, top_k=3)
    print(f"Semantic search for '{query}':")
    for result in results:
        print(f"- {result['document']['text'][:60]}... (similarity: {result['similarity']:.3f})")

    # Hybrid search
    hybrid_results = search_engine.hybrid_search(
        "programming languages",
        keyword_query="Python web",
        top_k=2
    )
    print(f"\nHybrid search results:")
    for result in hybrid_results:
        print(f"- {result['document']['text'][:60]}... (combined score: {result['combined_score']:.3f})")

def demo_advanced_applications():
    """Demonstrate advanced applications"""
    print("\n=== Advanced Applications Demo ===")

    config = EmbeddingConfig(api_key=os.getenv("OPENAI_API_KEY"))
    client = EmbeddingClient(config)

    # Anomaly detection
    texts = [
        "Normal business email about quarterly reports",
        "Standard project update with timelines",
        "Regular team meeting minutes",
        "URGENT: Your account will be suspended immediately",
        "Congratulations! You've won $1,000,000!!!",
        "Usual marketing campaign performance metrics"
    ]

    anomalies = EmbeddingApplications.find_anomalies(client, texts)
    print(f"Found {len(anomalies)} anomalies:")
    for anomaly in anomalies:
        print(f"- {anomaly['text'][:50]}... (avg similarity: {anomaly['avg_similarity']:.3f})")

    # Text summarization
    long_text = """
    Artificial intelligence has revolutionized many industries. Machine learning algorithms can now
    process vast amounts of data to identify patterns and make predictions. Deep learning, a subset
    of machine learning, uses neural networks with multiple layers to learn hierarchical representations.
    These technologies are being applied in healthcare, finance, transportation, and entertainment.
    As AI continues to evolve, it raises important questions about ethics, privacy, and the future
    of work. Researchers are working on developing more interpretable and fair AI systems.
    """

    summary = EmbeddingApplications.generate_summaries(client, long_text, num_sentences=3)
    print(f"\nGenerated summary:")
    for sentence in summary:
        print(f"- {sentence}")

# Main execution
if __name__ == "__main__":
    # Set environment variable
    os.environ.setdefault("OPENAI_API_KEY", "your-openai-api-key")

    try:
        demo_similarity_analysis()
        demo_text_clustering()
        demo_semantic_search()
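        demo_text_classification()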
        demo_advanced_applications()
    except Exception as e:
        print(f"Demo error: {e}")
        print("Make sure to set your OPENAI_API_KEY environment variable")