🎯 Recommended Collections
Balanced sample collections from various categories for you to explore
OpenAI API Examples
Comprehensive OpenAI API examples covering GPT models, DALL-E image generation, Whisper audio processing, and function calling
💻 OpenAI GPT API Integration python
🟡 intermediate
⭐⭐⭐
Complete GPT API usage with streaming responses, advanced prompting, and cost estimation
⏱️ 40 min
🏷️ openai, gpt, chat, streaming
Prerequisites:
Python, OpenAI API key, HTTP requests
# OpenAI GPT API Integration
# Comprehensive examples with chat completions, streaming, and advanced features
import os
import json
import time
import asyncio
from typing import List, Dict, Any, Optional, Generator
from dataclasses import dataclass
from openai import OpenAI, AsyncOpenAI
import tiktoken
from enum import Enum
# 1. Configuration and Initialization
@dataclass
class OpenAIConfig:
"""Configuration for OpenAI API"""
api_key: str
organization: Optional[str] = None
model: str = "gpt-3.5-turbo"
temperature: float = 0.7
max_tokens: int = 1000
top_p: float = 1.0
frequency_penalty: float = 0.0
presence_penalty: float = 0.0
class OpenAIClient:
"""Enhanced OpenAI client with advanced features"""
def __init__(self, config: OpenAIConfig):
self.config = config
self.client = OpenAI(
api_key=config.api_key,
organization=config.organization
)
self.async_client = AsyncOpenAI(
api_key=config.api_key,
organization=config.organization
)
        try:
            self.encoding = tiktoken.encoding_for_model(config.model)
        except KeyError:
            # Fall back to the encoding used by current chat models
            self.encoding = tiktoken.get_encoding("cl100k_base")
def count_tokens(self, text: str) -> int:
"""Count tokens in text"""
return len(self.encoding.encode(text))
def estimate_cost(self, messages: List[Dict[str, str]]) -> float:
"""Estimate API call cost in USD"""
total_tokens = 0
# Count tokens in messages
for message in messages:
total_tokens += self.count_tokens(message.get('content', ''))
total_tokens += self.count_tokens(message.get('role', ''))
# Pricing (adjust based on current OpenAI pricing)
# GPT-3.5-turbo: $0.002 per 1K tokens
# GPT-4: $0.03 per 1K tokens (input) + $0.06 per 1K tokens (output)
if 'gpt-4' in self.config.model.lower():
input_cost = (total_tokens * 0.03) / 1000
output_cost = (self.config.max_tokens * 0.06) / 1000
else:
input_cost = (total_tokens * 0.002) / 1000
output_cost = (self.config.max_tokens * 0.002) / 1000
return input_cost + output_cost
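# Example: estimating worst-case cost before sending a request (a sketch; the per-token
# rates above are illustrative and should be checked against current OpenAI pricing):
#
# config = OpenAIConfig(api_key=os.getenv("OPENAI_API_KEY"))
# client = OpenAIClient(config)
# cost = client.estimate_cost([{"role": "user", "content": "Hello!"}])
# print(f"Estimated upper bound (assumes full max_tokens output): ${cost:.6f}")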
# 2. Basic Chat Completions
def basic_chat_completion(client: OpenAIClient) -> Dict[str, Any]:
"""Basic chat completion example"""
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain quantum computing in simple terms."}
]
try:
response = client.client.chat.completions.create(
model=client.config.model,
messages=messages,
temperature=client.config.temperature,
max_tokens=client.config.max_tokens
)
return {
"response": response.choices[0].message.content,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
},
"finish_reason": response.choices[0].finish_reason
}
except Exception as e:
return {"error": str(e)}
# 3. Streaming Responses
def stream_chat_completion(client: OpenAIClient, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
"""Streaming chat completion"""
try:
stream = client.client.chat.completions.create(
model=client.config.model,
messages=messages,
temperature=client.config.temperature,
max_tokens=client.config.max_tokens,
stream=True
)
for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except Exception as e:
yield f"Error: {str(e)}"
# 4. Advanced Prompting Strategies
class PromptingStrategies:
"""Advanced prompting techniques for better results"""
@staticmethod
def few_shot_prompting(client: OpenAIClient, examples: List[Dict[str, str]], query: str) -> str:
"""Few-shot prompting with examples"""
messages = [
{"role": "system", "content": "You are a helpful assistant that follows patterns from examples."}
]
for example in examples:
messages.append({"role": "user", "content": example["input"]})
messages.append({"role": "assistant", "content": example["output"]})
messages.append({"role": "user", "content": query})
response = client.client.chat.completions.create(
model=client.config.model,
messages=messages,
temperature=0.3, # Lower temperature for more consistent results
max_tokens=500
)
return response.choices[0].message.content
@staticmethod
def chain_of_thought_prompting(client: OpenAIClient, problem: str) -> str:
"""Chain of thought prompting for complex reasoning"""
prompt = f"""Solve this step by step, showing your reasoning process:
Problem: {problem}
Step 1: Understand the problem
Step 2: Break down into smaller steps
Step 3: Solve each step
Step 4: Combine results
Step 5: Final answer
Solution:"""
response = client.client.chat.completions.create(
model=client.config.model,
messages=[{"role": "user", "content": prompt}],
temperature=0.1,
max_tokens=1000
)
return response.choices[0].message.content
@staticmethod
def structured_output_prompting(client: OpenAIClient, prompt: str, output_schema: Dict) -> Dict:
"""Generate structured output using prompting"""
schema_prompt = f"""
{prompt}
Please respond in the following JSON format:
{json.dumps(output_schema, indent=2)}
Response:"""
response = client.client.chat.completions.create(
model=client.config.model,
messages=[{"role": "user", "content": schema_prompt}],
temperature=0.1,
max_tokens=1000
)
try:
return json.loads(response.choices[0].message.content)
except json.JSONDecodeError:
return {"raw_response": response.choices[0].message.content, "error": "Failed to parse JSON"}
# 5. Conversation Management
class ConversationManager:
"""Manage conversations with context and memory"""
def __init__(self, client: OpenAIClient, max_history: int = 10):
self.client = client
self.max_history = max_history
self.conversations = {}
self.persona_instructions = {
"professional": "You are a professional, formal assistant who provides expert advice.",
"casual": "You are a friendly, casual assistant who speaks in a relaxed manner.",
"technical": "You are a technical assistant who provides detailed technical explanations.",
"creative": "You are a creative assistant who thinks outside the box and provides innovative ideas."
}
def create_conversation(self, conversation_id: str, persona: str = "professional") -> None:
"""Create a new conversation with specified persona"""
if conversation_id not in self.conversations:
self.conversations[conversation_id] = {
"messages": [
{"role": "system", "content": self.persona_instructions.get(persona, self.persona_instructions["professional"])}
],
"persona": persona,
"created_at": time.time()
}
def add_message(self, conversation_id: str, role: str, content: str) -> None:
"""Add message to conversation"""
if conversation_id not in self.conversations:
self.create_conversation(conversation_id)
conversation = self.conversations[conversation_id]
conversation["messages"].append({"role": role, "content": content})
# Limit conversation history
if len(conversation["messages"]) > self.max_history * 2 + 1: # +1 for system message
# Keep system message and last max_history*2 messages
system_msg = conversation["messages"][0]
recent_msgs = conversation["messages"][-(self.max_history * 2):]
conversation["messages"] = [system_msg] + recent_msgs
def get_response(self, conversation_id: str, user_message: str) -> Dict[str, Any]:
"""Get response from AI in conversation context"""
self.add_message(conversation_id, "user", user_message)
messages = self.conversations[conversation_id]["messages"]
try:
response = self.client.client.chat.completions.create(
model=self.client.config.model,
messages=messages,
temperature=self.client.config.temperature,
max_tokens=self.client.config.max_tokens
)
ai_response = response.choices[0].message.content
self.add_message(conversation_id, "assistant", ai_response)
return {
"response": ai_response,
"conversation_id": conversation_id,
"usage": response.usage._asdict() if response.usage else None
}
except Exception as e:
return {"error": str(e), "conversation_id": conversation_id}
def get_conversation_summary(self, conversation_id: str) -> str:
"""Get summary of conversation"""
if conversation_id not in self.conversations:
return "Conversation not found"
messages = self.conversations[conversation_id]["messages"][1:] # Exclude system message
conversation_text = "\n".join([f"{msg['role']}: {msg['content']}" for msg in messages[-10:]]) # Last 10 messages
summary_prompt = f"""Summarize this conversation in 2-3 sentences:
{conversation_text}
Summary:"""
response = self.client.client.chat.completions.create(
model=self.client.config.model,
messages=[{"role": "user", "content": summary_prompt}],
temperature=0.3,
max_tokens=150
)
return response.choices[0].message.content
# 6. Function Calling
class FunctionCalling:
"""Advanced function calling implementation"""
def __init__(self, client: OpenAIClient):
self.client = client
self.functions = {
"get_weather": self.get_weather,
"calculate_age": self.calculate_age,
"search_web": self.search_web,
"send_email": self.send_email
}
self.function_schemas = {
"get_weather": {
"name": "get_weather",
"description": "Get current weather information for a city",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "The city name"},
"units": {"type": "string", "enum": ["metric", "imperial"], "default": "metric"}
},
"required": ["city"]
}
},
"calculate_age": {
"name": "calculate_age",
"description": "Calculate age from birth date",
"parameters": {
"type": "object",
"properties": {
"birth_date": {"type": "string", "description": "Birth date in YYYY-MM-DD format"},
"current_date": {"type": "string", "description": "Current date in YYYY-MM-DD format (optional)"}
},
"required": ["birth_date"]
}
},
"search_web": {
"name": "search_web",
"description": "Search the web for information",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"max_results": {"type": "integer", "default": 5}
},
"required": ["query"]
}
},
"send_email": {
"name": "send_email",
"description": "Send an email",
"parameters": {
"type": "object",
"properties": {
"to": {"type": "string", "description": "Recipient email"},
"subject": {"type": "string", "description": "Email subject"},
"body": {"type": "string", "description": "Email body"}
},
"required": ["to", "subject", "body"]
}
}
}
def get_weather(self, city: str, units: str = "metric") -> Dict[str, Any]:
"""Simulate weather API call"""
# In production, integrate with real weather API
weather_data = {
"city": city,
"temperature": 22 if units == "metric" else 72,
"units": units,
"condition": "Partly cloudy",
"humidity": 65
}
return weather_data
def calculate_age(self, birth_date: str, current_date: Optional[str] = None) -> Dict[str, Any]:
"""Calculate age from birth date"""
from datetime import datetime
birth = datetime.strptime(birth_date, "%Y-%m-%d")
current = datetime.strptime(current_date, "%Y-%m-%d") if current_date else datetime.now()
age = current.year - birth.year - ((current.month, current.day) < (birth.month, birth.day))
return {
"birth_date": birth_date,
"current_date": current_date if current_date else datetime.now().strftime("%Y-%m-%d"),
"age": age,
"age_in_months": age * 12
}
def search_web(self, query: str, max_results: int = 5) -> Dict[str, Any]:
"""Simulate web search"""
# In production, integrate with real search API
return {
"query": query,
"results": [
{
"title": f"Result for {query}",
"url": f"https://example.com/{query.replace(' ', '-')}",
"snippet": f"This is a search result for {query}"
}
] * min(max_results, 3)
}
def send_email(self, to: str, subject: str, body: str) -> Dict[str, Any]:
"""Simulate sending email"""
return {
"to": to,
"subject": subject,
"body_length": len(body),
"status": "queued",
"sent_at": time.strftime("%Y-%m-%d %H:%M:%S")
}
def process_with_functions(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""Process messages with function calling"""
try:
response = self.client.client.chat.completions.create(
model=self.client.config.model,
messages=messages,
functions=list(self.function_schemas.values()),
function_call="auto",
temperature=0.1
)
message = response.choices[0].message
if message.function_call:
function_name = message.function_call.name
function_args = json.loads(message.function_call.arguments)
if function_name in self.functions:
function_result = self.functions[function_name](**function_args)
# Continue conversation with function result
messages.append({
"role": "assistant",
"content": None,
"function_call": {
"name": function_name,
"arguments": json.dumps(function_args)
}
})
messages.append({
"role": "function",
"name": function_name,
"content": json.dumps(function_result)
})
# Get final response
final_response = self.client.client.chat.completions.create(
model=self.client.config.model,
messages=messages,
temperature=0.1
)
return {
"function_name": function_name,
"function_args": function_args,
"function_result": function_result,
"response": final_response.choices[0].message.content
}
else:
return {"error": f"Unknown function: {function_name}"}
else:
return {
"response": message.content,
"function_called": False
}
except Exception as e:
return {"error": str(e)}
# 7. Batch Processing and Optimization
class BatchProcessor:
"""Efficient batch processing for multiple requests"""
def __init__(self, client: OpenAIClient):
self.client = client
self.max_concurrent_requests = 10
async def process_batch_async(self, prompts: List[str]) -> List[Dict[str, Any]]:
"""Process multiple prompts concurrently"""
semaphore = asyncio.Semaphore(self.max_concurrent_requests)
async def process_single_prompt(prompt: str, index: int) -> Dict[str, Any]:
async with semaphore:
try:
response = await self.client.async_client.chat.completions.create(
model=self.client.config.model,
messages=[{"role": "user", "content": prompt}],
temperature=self.client.config.temperature,
max_tokens=self.client.config.max_tokens
)
return {
"index": index,
"prompt": prompt,
"response": response.choices[0].message.content,
"usage": response.usage._asdict() if response.usage else None
}
except Exception as e:
return {
"index": index,
"prompt": prompt,
"error": str(e)
}
tasks = [process_single_prompt(prompt, i) for i, prompt in enumerate(prompts)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Normalize raised exceptions into error records, then sort by original index
        normalized = [r if isinstance(r, dict) else {"index": -1, "error": str(r)} for r in results]
        return sorted(normalized, key=lambda x: x.get("index", 0))
    def rate_limited_request(self, messages: List[Dict[str, str]], requests_per_minute: int = 60) -> Dict[str, Any]:
        """Make rate-limited API requests (limiter state persists across calls)"""
        # Initialize the shared rate-limit window on first use
        if not hasattr(self, "_window_start"):
            self._window_start = time.time()
            self._request_count = 0
        # If the per-minute budget is spent, sleep until the window resets
        if self._request_count >= requests_per_minute:
            elapsed_time = time.time() - self._window_start
            if elapsed_time < 60:
                time.sleep(60 - elapsed_time)
            self._window_start = time.time()
            self._request_count = 0
        try:
            response = self.client.client.chat.completions.create(
                model=self.client.config.model,
                messages=messages,
                temperature=self.client.config.temperature,
                max_tokens=self.client.config.max_tokens
            )
            self._request_count += 1
            return {
                "response": response.choices[0].message.content,
                "usage": response.usage.model_dump() if response.usage else None
            }
        except Exception as e:
            return {"error": str(e)}
# 8. Error Handling and Retry Logic
import random
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
class RobustOpenAIClient:
"""OpenAI client with robust error handling and retry logic"""
def __init__(self, client: OpenAIClient):
self.client = client
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type((ConnectionError, TimeoutError))
)
def robust_completion(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""Robust chat completion with retry logic"""
try:
response = self.client.client.chat.completions.create(
model=self.client.config.model,
messages=messages,
temperature=self.client.config.temperature,
max_tokens=self.client.config.max_tokens,
timeout=30 # 30 second timeout
)
return {
"response": response.choices[0].message.content,
"usage": response.usage._asdict() if response.usage else None,
"finish_reason": response.choices[0].finish_reason
}
except Exception as e:
print(f"Attempt failed: {str(e)}")
raise
def exponential_backoff_request(self, messages: List[Dict[str, str]], max_retries: int = 5) -> Dict[str, Any]:
"""Request with exponential backoff"""
for attempt in range(max_retries):
try:
return self.client.client.chat.completions.create(
model=self.client.config.model,
messages=messages,
temperature=self.client.config.temperature,
max_tokens=self.client.config.max_tokens
)
except Exception as e:
if attempt == max_retries - 1:
return {"error": f"Max retries exceeded: {str(e)}"}
wait_time = (2 ** attempt) + random.uniform(0, 1)
time.sleep(wait_time)
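# The random jitter added to each wait spreads out retries from concurrent clients,
# avoiding the "thundering herd" effect where all of them retry at the same instant.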
# 9. Usage Analytics and Monitoring
class OpenAIAnalytics:
"""Track and analyze OpenAI API usage"""
def __init__(self):
self.usage_data = []
self.error_data = []
def track_usage(self, request_data: Dict[str, Any], response_data: Dict[str, Any], start_time: float) -> None:
"""Track API usage"""
usage_record = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"request_data": request_data,
"response_data": response_data,
"duration": time.time() - start_time,
"success": "error" not in response_data
}
self.usage_data.append(usage_record)
def track_error(self, error_data: Dict[str, Any]) -> None:
"""Track API errors"""
error_record = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"error": error_data
}
self.error_data.append(error_record)
def get_usage_stats(self) -> Dict[str, Any]:
"""Get usage statistics"""
if not self.usage_data:
return {"message": "No usage data available"}
total_requests = len(self.usage_data)
successful_requests = sum(1 for record in self.usage_data if record["success"])
total_duration = sum(record["duration"] for record in self.usage_data)
return {
"total_requests": total_requests,
"successful_requests": successful_requests,
"success_rate": successful_requests / total_requests * 100,
"average_response_time": total_duration / total_requests,
"total_errors": len(self.error_data),
"usage_by_hour": self._group_usage_by_hour()
}
def _group_usage_by_hour(self) -> Dict[str, int]:
"""Group usage by hour"""
hourly_usage = {}
for record in self.usage_data:
hour = record["timestamp"].split(":")[0]
hourly_usage[hour] = hourly_usage.get(hour, 0) + 1
return hourly_usage
def export_data(self, filename: str) -> None:
"""Export usage and error data to file"""
export_data = {
"usage_data": self.usage_data,
"error_data": self.error_data,
"statistics": self.get_usage_stats()
}
with open(filename, 'w') as f:
json.dump(export_data, f, indent=2)
# 10. Demonstration Functions
def demo_basic_usage():
"""Demonstrate basic OpenAI API usage"""
print("=== Basic OpenAI API Usage Demo ===")
config = OpenAIConfig(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-3.5-turbo",
temperature=0.7
)
client = OpenAIClient(config)
# Basic chat completion
result = basic_chat_completion(client)
print(f"Chat Completion Result: {result}")
# Token counting and cost estimation
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain machine learning in simple terms."}
]
token_count = client.count_tokens(messages[1]["content"])
estimated_cost = client.estimate_cost(messages)
print(f"Token count: {token_count}")
print(f"Estimated cost: ${estimated_cost:.6f}")
def demo_streaming():
"""Demonstrate streaming responses"""
print("\n=== Streaming Response Demo ===")
config = OpenAIConfig(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-3.5-turbo"
)
client = OpenAIClient(config)
messages = [{"role": "user", "content": "Write a short poem about programming"}]
print("Streaming response:")
for chunk in stream_chat_completion(client, messages):
print(chunk, end='', flush=True)
print()
def demo_prompting_strategies():
"""Demonstrate advanced prompting strategies"""
print("\n=== Prompting Strategies Demo ===")
config = OpenAIConfig(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-3.5-turbo",
temperature=0.3
)
client = OpenAIClient(config)
strategies = PromptingStrategies()
# Few-shot prompting
examples = [
{"input": "2 + 2", "output": "4"},
{"input": "5 * 3", "output": "15"},
{"input": "10 / 2", "output": "5"}
]
result = strategies.few_shot_prompting(client, examples, "7 + 8")
print(f"Few-shot result: {result}")
# Chain of thought
problem = "If a train travels 300 km in 3 hours, what is its average speed?"
cot_result = strategies.chain_of_thought_prompting(client, problem)
print(f"Chain of thought result: {cot_result}")
def demo_function_calling():
"""Demonstrate function calling"""
print("\n=== Function Calling Demo ===")
config = OpenAIConfig(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-3.5-turbo"
)
client = OpenAIClient(config)
function_calling = FunctionCalling(client)
messages = [
{"role": "user", "content": "What's the weather like in Tokyo and calculate my age if I was born in 1990-05-15"}
]
result = function_calling.process_with_functions(messages)
print(f"Function calling result: {json.dumps(result, indent=2)}")
def demo_conversation_management():
"""Demonstrate conversation management"""
print("\n=== Conversation Management Demo ===")
config = OpenAIConfig(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-3.5-turbo"
)
client = OpenAIClient(config)
conversation_manager = ConversationManager(client)
# Create conversation
conv_id = "demo_conv"
conversation_manager.create_conversation(conv_id, persona="casual")
# Simulate conversation
messages = [
"Hi! Can you help me learn Python?",
"What should I start with?",
"Thanks! Can you recommend any good resources?"
]
for msg in messages:
response = conversation_manager.get_response(conv_id, msg)
print(f"User: {msg}")
print(f"Assistant: {response['response'][:100]}...")
# Get summary
summary = conversation_manager.get_conversation_summary(conv_id)
print(f"\nConversation Summary: {summary}")
# Main execution
if __name__ == "__main__":
# Set environment variable
os.environ.setdefault("OPENAI_API_KEY", "your-openai-api-key")
try:
demo_basic_usage()
demo_streaming()
demo_prompting_strategies()
demo_function_calling()
demo_conversation_management()
except Exception as e:
print(f"Demo error: {e}")
print("Make sure to set your OPENAI_API_KEY environment variable")
💻 OpenAI DALL-E and Whisper API python
🔴 complex
⭐⭐⭐⭐
Image generation with DALL-E and audio processing with Whisper, including batch operations
⏱️ 45 min
🏷️ openai, dall-e, whisper, image, audio
Prerequisites:
Python, OpenAI API key, PIL/Pillow, requests
# OpenAI DALL-E and Whisper API Integration
# Image generation and audio processing with advanced features
import os
import json
import time
import requests
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass
from PIL import Image, ImageDraw, ImageFont
import io
import base64
from pathlib import Path
import asyncio
from openai import OpenAI
import soundfile as sf
import numpy as np
from datetime import datetime
# 1. Configuration and Client Setup
@dataclass
class DalleConfig:
"""Configuration for DALL-E API"""
api_key: str
organization: Optional[str] = None
model: str = "dall-e-3"
size: str = "1024x1024" # 1024x1024, 1792x1024, or 1024x1792
quality: str = "standard" # standard or hd
style: str = "vivid" # vivid or natural
max_retries: int = 3
timeout: int = 60
@dataclass
class WhisperConfig:
"""Configuration for Whisper API"""
api_key: str
organization: Optional[str] = None
model: str = "whisper-1"
response_format: str = "json" # json, text, srt, verbose_json, or vtt
language: Optional[str] = None
temperature: float = 0.0
timeout: int = 120
class OpenAIMediaClient:
"""Combined client for DALL-E and Whisper APIs"""
def __init__(self, dalle_config: DalleConfig, whisper_config: WhisperConfig):
self.dalle_config = dalle_config
self.whisper_config = whisper_config
self.client = OpenAI(
api_key=dalle_config.api_key,
organization=dalle_config.organization
)
self.supported_image_sizes = {
"dall-e-2": ["256x256", "512x512", "1024x1024"],
"dall-e-3": ["1024x1024", "1792x1024", "1024x1792"]
}
def validate_dalle_params(self, prompt: str, size: Optional[str] = None) -> Tuple[str, str]:
"""Validate DALL-E parameters"""
if not prompt or len(prompt.strip()) == 0:
raise ValueError("Prompt cannot be empty")
if len(prompt) > 4000: # DALL-E has a 4000 character limit
raise ValueError("Prompt too long (max 4000 characters)")
actual_size = size or self.dalle_config.size
if actual_size not in self.supported_image_sizes.get(self.dalle_config.model, []):
raise ValueError(f"Invalid size {actual_size} for model {self.dalle_config.model}")
return prompt.strip(), actual_size
# 2. DALL-E Image Generation
class DALLEGenerator:
"""Advanced DALL-E image generation with features"""
def __init__(self, client: OpenAIMediaClient):
self.client = client
self.generation_history = []
def generate_image(
self,
prompt: str,
size: Optional[str] = None,
quality: Optional[str] = None,
style: Optional[str] = None,
n: int = 1,
save_path: Optional[str] = None
) -> Dict[str, Any]:
"""Generate images using DALL-E"""
prompt, actual_size = self.client.validate_dalle_params(prompt, size)
generation_params = {
"model": self.client.dalle_config.model,
"prompt": prompt,
"size": actual_size,
"n": min(n, 10) # DALL-E supports max 10 images
}
# Add quality parameter for DALL-E 3
if self.client.dalle_config.model == "dall-e-3":
generation_params["quality"] = quality or self.client.dalle_config.quality
generation_params["style"] = style or self.client.dalle_config.style
try:
start_time = time.time()
            response = self.client.client.images.generate(**generation_params)
duration = time.time() - start_time
# Process response
images = []
for i, image_data in enumerate(response.data):
image_info = {
"url": image_data.url,
"revised_prompt": image_data.revised_prompt if hasattr(image_data, 'revised_prompt') else None,
"index": i,
"size": actual_size,
"quality": generation_params.get("quality"),
"style": generation_params.get("style")
}
# Download and save image if path provided
if save_path:
image_path = self._download_image(
image_data.url,
f"{save_path}_image_{i}_{int(time.time())}.png"
)
image_info["local_path"] = image_path
images.append(image_info)
# Track generation
generation_record = {
"timestamp": datetime.now().isoformat(),
"prompt": prompt,
"params": generation_params,
"duration": duration,
"image_count": len(images),
"success": True
}
self.generation_history.append(generation_record)
return {
"images": images,
"prompt": prompt,
"params": generation_params,
"duration": duration,
"success": True
}
except Exception as e:
error_record = {
"timestamp": datetime.now().isoformat(),
"prompt": prompt,
"error": str(e),
"success": False
}
self.generation_history.append(error_record)
return {
"prompt": prompt,
"error": str(e),
"success": False
}
def _download_image(self, url: str, filename: str) -> str:
"""Download image from URL"""
response = requests.get(url)
response.raise_for_status()
# Ensure directory exists
Path(filename).parent.mkdir(parents=True, exist_ok=True)
with open(filename, 'wb') as f:
f.write(response.content)
return filename
def generate_variations(
self,
image_path: str,
size: Optional[str] = None,
n: int = 1,
save_path: Optional[str] = None
) -> Dict[str, Any]:
"""Generate variations from an existing image"""
if not os.path.exists(image_path):
raise ValueError(f"Image file not found: {image_path}")
try:
with open(image_path, 'rb') as image_file:
                response = self.client.client.images.create_variation(
                    image=image_file,
                    size=size or self.client.dalle_config.size,
                    n=min(n, 10)
                )
# Process variations
variations = []
for i, image_data in enumerate(response.data):
variation_info = {
"url": image_data.url,
"index": i,
"source_image": image_path
}
if save_path:
variation_path = self._download_image(
image_data.url,
f"{save_path}_variation_{i}_{int(time.time())}.png"
)
variation_info["local_path"] = variation_path
variations.append(variation_info)
return {
"variations": variations,
"source_image": image_path,
"success": True
}
except Exception as e:
return {
"source_image": image_path,
"error": str(e),
"success": False
}
    def edit_image(
        self,
        image_path: str,
        prompt: str,
        mask_path: Optional[str] = None,
        size: Optional[str] = None,
        n: int = 1,
        save_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Edit an image, optionally constrained by a mask"""
        if not os.path.exists(image_path):
            raise ValueError(f"Image file not found: {image_path}")
        if mask_path and not os.path.exists(mask_path):
            mask_path = None  # Ignore a mask path that doesn't exist
        try:
            edit_params = {
                "prompt": prompt,
                "size": size or self.client.dalle_config.size,
                "n": min(n, 10)
            }
            with open(image_path, 'rb') as image_file:
                edit_params["image"] = image_file
                if mask_path:
                    # Both files must stay open for the duration of the API call
                    with open(mask_path, 'rb') as mask_file:
                        edit_params["mask"] = mask_file
                        response = self.client.client.images.edit(**edit_params)
                else:
                    response = self.client.client.images.edit(**edit_params)
# Process edited images
edits = []
for i, image_data in enumerate(response.data):
edit_info = {
"url": image_data.url,
"index": i,
"prompt": prompt,
"source_image": image_path,
"mask_used": mask_path is not None
}
if save_path:
edit_path = self._download_image(
image_data.url,
f"{save_path}_edit_{i}_{int(time.time())}.png"
)
edit_info["local_path"] = edit_path
edits.append(edit_info)
return {
"edits": edits,
"prompt": prompt,
"source_image": image_path,
"mask_used": mask_path is not None,
"success": True
}
except Exception as e:
return {
"prompt": prompt,
"source_image": image_path,
"error": str(e),
"success": False
}
def create_image_grid(
self,
prompts: List[str],
grid_size: Tuple[int, int] = (2, 2),
save_path: Optional[str] = None
) -> Dict[str, Any]:
"""Generate and combine multiple images into a grid"""
if len(prompts) != grid_size[0] * grid_size[1]:
raise ValueError(f"Number of prompts ({len(prompts)}) doesn't match grid size ({grid_size[0]}x{grid_size[1]})")
try:
# Generate individual images
generated_images = []
for i, prompt in enumerate(prompts):
result = self.generate_image(prompt, n=1)
if result["success"] and result["images"]:
generated_images.append(result["images"][0]["url"])
else:
raise ValueError(f"Failed to generate image for prompt {i+1}")
# Download and combine images
image_objects = []
for url in generated_images:
response = requests.get(url)
img = Image.open(io.BytesIO(response.content))
image_objects.append(img)
# Create grid
width, height = image_objects[0].size
grid_image = Image.new('RGB', (width * grid_size[1], height * grid_size[0]))
draw = ImageDraw.Draw(grid_image)
for i, img in enumerate(image_objects):
row = i // grid_size[1]
col = i % grid_size[1]
grid_image.paste(img, (col * width, row * height))
# Add border
draw.rectangle(
[col * width, row * height, (col + 1) * width, (row + 1) * height],
outline='black',
width=2
)
# Save grid
if save_path:
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
grid_image.save(save_path)
local_path = save_path
else:
local_path = f"grid_{int(time.time())}.png"
grid_image.save(local_path)
return {
"grid_path": local_path,
"prompts": prompts,
"grid_size": grid_size,
"success": True
}
except Exception as e:
return {
"prompts": prompts,
"error": str(e),
"success": False
}
# 3. Whisper Audio Processing
class WhisperProcessor:
"""Advanced Whisper audio processing with features"""
def __init__(self, client: OpenAIMediaClient):
self.client = client
self.transcription_history = []
self.supported_formats = ["mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm"]
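        # Note: the Whisper endpoint also enforces an upload size limit (25 MB at the
        # time of writing); longer recordings must be split first. A minimal splitting
        # sketch using pydub (an assumption -- pydub is not a dependency of this sample):
        #
        # from pydub import AudioSegment
        # audio = AudioSegment.from_file("long_recording.mp3")
        # chunk_ms = 10 * 60 * 1000  # 10-minute chunks
        # for i in range(0, len(audio), chunk_ms):
        #     audio[i:i + chunk_ms].export(f"chunk_{i // chunk_ms}.mp3", format="mp3")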
def transcribe_audio(
self,
audio_path: str,
language: Optional[str] = None,
response_format: Optional[str] = None,
temperature: Optional[float] = None,
save_transcript: Optional[str] = None
) -> Dict[str, Any]:
"""Transcribe audio file using Whisper"""
if not os.path.exists(audio_path):
raise ValueError(f"Audio file not found: {audio_path}")
# Check file format
file_ext = Path(audio_path).suffix.lower().lstrip('.')
if file_ext not in self.supported_formats:
raise ValueError(f"Unsupported audio format: {file_ext}")
try:
start_time = time.time()
with open(audio_path, 'rb') as audio_file:
transcription_params = {
"model": self.client.whisper_config.model,
"file": audio_file,
"response_format": response_format or self.client.whisper_config.response_format
}
if language:
transcription_params["language"] = language
if temperature is not None:
transcription_params["temperature"] = temperature
response = self.client.client.audio.transcriptions.create(**transcription_params)
duration = time.time() - start_time
            # Process response based on the effective response format
            effective_format = transcription_params["response_format"]
            if effective_format == "json":
                result = {
                    "text": response.text,
                    "language": getattr(response, 'language', None),
                    "duration": getattr(response, 'duration', None)
                }
            elif effective_format == "verbose_json":
                result = {
                    "text": response.text,
                    "language": response.language,
                    "duration": response.duration,
                    "words": getattr(response, 'words', None),  # word timestamps are not always present
                    "segments": response.segments
                }
            else:  # text, srt, vtt return a plain string
                result = {"text": response}
# Add metadata
            result.update({
                "audio_path": audio_path,
                "params": {k: v for k, v in transcription_params.items() if k != "file"},  # drop the file handle
                "processing_time": duration,
                "timestamp": datetime.now().isoformat()
            })
# Save transcript if path provided
if save_transcript:
self._save_transcript(result["text"], save_transcript)
result["transcript_path"] = save_transcript
# Track transcription
transcription_record = {
"timestamp": datetime.now().isoformat(),
"audio_path": audio_path,
"duration": duration,
"success": True,
"language": result.get("language")
}
self.transcription_history.append(transcription_record)
result["success"] = True
return result
except Exception as e:
error_record = {
"timestamp": datetime.now().isoformat(),
"audio_path": audio_path,
"error": str(e),
"success": False
}
self.transcription_history.append(error_record)
return {
"audio_path": audio_path,
"error": str(e),
"success": False
}
def translate_audio(
self,
audio_path: str,
response_format: Optional[str] = None,
temperature: Optional[float] = None,
save_transcript: Optional[str] = None
) -> Dict[str, Any]:
"""Translate audio to English using Whisper"""
if not os.path.exists(audio_path):
raise ValueError(f"Audio file not found: {audio_path}")
try:
start_time = time.time()
with open(audio_path, 'rb') as audio_file:
translation_params = {
"model": self.client.whisper_config.model,
"file": audio_file,
"response_format": response_format or self.client.whisper_config.response_format
}
if temperature is not None:
translation_params["temperature"] = temperature
response = self.client.client.audio.translations.create(**translation_params)
duration = time.time() - start_time
            # Process response based on the effective response format
            effective_format = translation_params["response_format"]
            if effective_format == "json":
                result = {
                    "text": response.text,
                    "duration": getattr(response, 'duration', None)
                }
            else:  # text, srt, vtt return a plain string
                result = {"text": response}
# Add metadata
            result.update({
                "audio_path": audio_path,
                "operation": "translation",
                "params": {k: v for k, v in translation_params.items() if k != "file"},  # drop the file handle
                "processing_time": duration,
                "timestamp": datetime.now().isoformat()
            })
# Save translation if path provided
if save_transcript:
self._save_transcript(result["text"], save_transcript)
result["translation_path"] = save_transcript
result["success"] = True
return result
except Exception as e:
return {
"audio_path": audio_path,
"operation": "translation",
"error": str(e),
"success": False
}
def _save_transcript(self, text: str, file_path: str) -> None:
"""Save transcript to file"""
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(text)
def batch_transcribe(
self,
audio_files: List[str],
language: Optional[str] = None,
max_concurrent: int = 3
) -> List[Dict[str, Any]]:
"""Transcribe multiple audio files concurrently"""
semaphore = asyncio.Semaphore(max_concurrent)
async def transcribe_single(audio_path: str) -> Dict[str, Any]:
async with semaphore:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
self.transcribe_audio,
audio_path,
language
)
async def run_batch():
tasks = [transcribe_single(audio_path) for audio_path in audio_files]
return await asyncio.gather(*tasks, return_exceptions=True)
return asyncio.run(run_batch())
def process_with_timestamps(
self,
audio_path: str,
language: Optional[str] = None
) -> Dict[str, Any]:
"""Process audio with detailed timestamp information"""
result = self.transcribe_audio(
audio_path,
language=language,
response_format="verbose_json"
)
if result["success"] and "segments" in result:
# Process segments for better analysis
segments = result["segments"]
            word_count = len(result["words"]) if result.get("words") else 0
            avg_words_per_second = word_count / result["duration"] if result.get("duration") else 0
result["analysis"] = {
"segment_count": len(segments),
"word_count": word_count,
"average_words_per_second": avg_words_per_second,
"first_segment_time": segments[0]["start"] if segments else 0,
"last_segment_time": segments[-1]["end"] if segments else 0
}
return result
# 4. Advanced Image Processing
class ImageProcessor:
"""Advanced image processing utilities"""
@staticmethod
def create_mask_from_image(
image_path: str,
mask_areas: List[Tuple[int, int, int, int]], # [(x1, y1, x2, y2), ...]
output_path: str,
mask_color: Tuple[int, int, int] = (255, 255, 255)
) -> str:
"""Create a mask image from specified areas"""
img = Image.open(image_path)
mask = Image.new('RGB', img.size, (0, 0, 0))
draw = ImageDraw.Draw(mask)
for x1, y1, x2, y2 in mask_areas:
draw.rectangle([x1, y1, x2, y2], fill=mask_color)
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
mask.save(output_path)
return output_path
@staticmethod
def resize_image_with_aspect_ratio(
image_path: str,
target_size: Tuple[int, int],
output_path: str
) -> str:
"""Resize image maintaining aspect ratio"""
img = Image.open(image_path)
target_width, target_height = target_size
# Calculate new size maintaining aspect ratio
original_width, original_height = img.size
aspect_ratio = original_width / original_height
if aspect_ratio > target_width / target_height:
new_width = target_width
new_height = int(target_width / aspect_ratio)
else:
new_height = target_height
new_width = int(target_height * aspect_ratio)
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
resized_img.save(output_path)
return output_path
@staticmethod
def add_watermark(
image_path: str,
watermark_text: str,
output_path: str,
position: str = "bottom-right"
) -> str:
"""Add text watermark to image"""
img = Image.open(image_path)
draw = ImageDraw.Draw(img)
        # Try to load a TrueType font; fall back to PIL's built-in default
        try:
            font = ImageFont.truetype("arial.ttf", 20)
        except OSError:
            font = ImageFont.load_default()
# Calculate text size and position
bbox = draw.textbbox((0, 0), watermark_text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
img_width, img_height = img.size
margin = 20
if position == "bottom-right":
x, y = img_width - text_width - margin, img_height - text_height - margin
elif position == "bottom-left":
x, y = margin, img_height - text_height - margin
elif position == "top-right":
x, y = img_width - text_width - margin, margin
else: # top-left
x, y = margin, margin
# Add semi-transparent watermark
overlay = Image.new('RGBA', img.size, (0, 0, 0, 0))
overlay_draw = ImageDraw.Draw(overlay)
overlay_draw.text((x, y), watermark_text, font=font, fill=(255, 255, 255, 128))
# Combine with original image
watermarked = Image.alpha_composite(img.convert('RGBA'), overlay)
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
watermarked.save(output_path)
return output_path
# 5. Content Moderation and Safety
class ContentModerator:
"""Content moderation for generated images and transcriptions"""
@staticmethod
def check_prompt_safety(prompt: str) -> Dict[str, Any]:
"""Check if prompt contains potentially unsafe content"""
# This is a basic implementation
# In production, use proper content moderation APIs
unsafe_keywords = [
"violent", "harmful", "illegal", "adult", "nude", "naked",
"blood", "weapon", "gun", "knife", "death", "kill"
]
prompt_lower = prompt.lower()
found_keywords = [word for word in unsafe_keywords if word in prompt_lower]
return {
"safe": len(found_keywords) == 0,
"flagged_keywords": found_keywords,
"confidence": max(0, 1 - len(found_keywords) * 0.2)
}
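    # In production, prefer OpenAI's hosted moderation endpoint over a keyword list --
    # a minimal sketch, assuming an OpenAI client instance named `client`:
    #
    # result = client.moderations.create(input=prompt)
    # flagged = result.results[0].flagged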
@staticmethod
def moderate_transcription(transcription: str) -> Dict[str, Any]:
"""Moderate transcription content"""
# Basic implementation
personal_info_patterns = [
r'\b\d{3}-\d{2}-\d{4}\b', # SSN
r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', # Credit card
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' # Email
]
import re
flagged_items = []
for pattern in personal_info_patterns:
matches = re.findall(pattern, transcription)
flagged_items.extend(matches)
return {
"contains_personal_info": len(flagged_items) > 0,
"flagged_items": flagged_items,
"recommendation": "review" if flagged_items else "safe"
}
# 6. Usage Analytics
class MediaAnalytics:
"""Track and analyze media generation and processing"""
def __init__(self):
self.generation_data = []
self.transcription_data = []
self.error_data = []
def track_generation(self, data: Dict[str, Any]) -> None:
"""Track image generation data"""
self.generation_data.append(data)
def track_transcription(self, data: Dict[str, Any]) -> None:
"""Track transcription data"""
self.transcription_data.append(data)
def track_error(self, data: Dict[str, Any]) -> None:
"""Track error data"""
self.error_data.append(data)
def get_statistics(self) -> Dict[str, Any]:
"""Get comprehensive statistics"""
return {
"image_generation": {
"total_requests": len(self.generation_data),
"successful_requests": sum(1 for d in self.generation_data if d.get("success", False)),
"total_images": sum(d.get("image_count", 1) for d in self.generation_data),
"average_duration": sum(d.get("duration", 0) for d in self.generation_data) / max(len(self.generation_data), 1),
"popular_sizes": self._get_popular_sizes(),
"popular_styles": self._get_popular_styles()
},
"transcription": {
"total_requests": len(self.transcription_data),
"successful_requests": sum(1 for d in self.transcription_data if d.get("success", False)),
"total_duration": sum(d.get("processing_time", 0) for d in self.transcription_data),
"languages": self._get_language_distribution()
},
"errors": {
"total_errors": len(self.error_data),
"error_types": self._get_error_types()
}
}
def _get_popular_sizes(self) -> Dict[str, int]:
"""Get most popular image sizes"""
sizes = {}
for d in self.generation_data:
size = d.get("params", {}).get("size", "unknown")
sizes[size] = sizes.get(size, 0) + 1
return sizes
def _get_popular_styles(self) -> Dict[str, int]:
"""Get most popular image styles"""
styles = {}
for d in self.generation_data:
style = d.get("params", {}).get("style", "unknown")
styles[style] = styles.get(style, 0) + 1
return styles
def _get_language_distribution(self) -> Dict[str, int]:
"""Get language distribution in transcriptions"""
languages = {}
for d in self.transcription_data:
lang = d.get("language", "unknown")
languages[lang] = languages.get(lang, 0) + 1
return languages
def _get_error_types(self) -> Dict[str, int]:
"""Get error type distribution"""
error_types = {}
for d in self.error_data:
error = d.get("error", "unknown error")
error_types[error] = error_types.get(error, 0) + 1
return error_types
# 7. Demonstration Functions
def demo_dalle_generation():
"""Demonstrate DALL-E image generation"""
print("=== DALL-E Image Generation Demo ===")
dalle_config = DalleConfig(
api_key=os.getenv("OPENAI_API_KEY"),
model="dall-e-3",
size="1024x1024",
quality="standard",
style="vivid"
)
whisper_config = WhisperConfig(api_key=os.getenv("OPENAI_API_KEY"))
client = OpenAIMediaClient(dalle_config, whisper_config)
dalle_gen = DALLEGenerator(client)
# Basic generation
result = dalle_gen.generate_image(
"A futuristic city with flying cars and neon lights, cyberpunk style",
save_path="./generated_images/cyberpunk_city"
)
print(f"Generation result: {result['success']}")
if result['success']:
print(f"Generated {len(result['images'])} images")
for img in result['images']:
print(f"- Image {img['index']}: {img.get('local_path', img['url'])}")
def demo_whisper_transcription():
"""Demonstrate Whisper transcription"""
print("\n=== Whisper Transcription Demo ===")
dalle_config = DalleConfig(api_key=os.getenv("OPENAI_API_KEY"))
whisper_config = WhisperConfig(api_key=os.getenv("OPENAI_API_KEY"))
client = OpenAIMediaClient(dalle_config, whisper_config)
whisper_proc = WhisperProcessor(client)
# Note: This requires an actual audio file
audio_path = "sample_audio.mp3" # Replace with actual audio file path
if os.path.exists(audio_path):
result = whisper_proc.transcribe_audio(
audio_path,
language="en",
save_transcript="./transcriptions/sample_transcript.txt"
)
print(f"Transcription result: {result['success']}")
if result['success']:
print(f"Transcribed text: {result['text'][:200]}...")
else:
print(f"Audio file not found: {audio_path}")
def demo_advanced_features():
"""Demonstrate advanced features"""
print("\n=== Advanced Features Demo ===")
dalle_config = DalleConfig(api_key=os.getenv("OPENAI_API_KEY"))
whisper_config = WhisperConfig(api_key=os.getenv("OPENAI_API_KEY"))
client = OpenAIMediaClient(dalle_config, whisper_config)
dalle_gen = DALLEGenerator(client)
moderator = ContentModerator()
# Test content moderation
test_prompt = "A beautiful sunset over mountains"
safety_check = moderator.check_prompt_safety(test_prompt)
print(f"Prompt safety check: {safety_check}")
# Generate image grid
prompts = [
"A serene lake at dawn",
"A busy city street at night",
"A cozy cabin in the woods",
"A tropical beach paradise"
]
grid_result = dalle_gen.create_image_grid(
prompts,
grid_size=(2, 2),
save_path="./generated_images/scenery_grid.png"
)
print(f"Image grid result: {grid_result['success']}")
# Main execution
if __name__ == "__main__":
# Set environment variable
os.environ.setdefault("OPENAI_API_KEY", "your-openai-api-key")
try:
demo_dalle_generation()
demo_whisper_transcription()
demo_advanced_features()
except Exception as e:
print(f"Demo error: {e}")
print("Make sure to set your OPENAI_API_KEY environment variable")
💻 OpenAI Embeddings and Text Similarity python
🔴 complex
⭐⭐⭐⭐⭐
Advanced text-embedding analysis with similarity search, clustering, and semantic applications
⏱️ 50 min
🏷️ openai, embeddings, similarity, clustering
Prerequisites:
Python, OpenAI API, scikit-learn, numpy, matplotlib
# OpenAI Embeddings and Text Similarity
# Advanced embedding analysis with similarity search, clustering, and applications
import os
import json
import time
import math
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from sklearn.cluster import KMeans, DBSCAN
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import seaborn as sns
from openai import OpenAI
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import silhouette_score
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# 1. Configuration and Client Setup
@dataclass
class EmbeddingConfig:
"""Configuration for OpenAI embeddings"""
api_key: str
organization: Optional[str] = None
model: str = "text-embedding-ada-002"
max_tokens: int = 8192
batch_size: int = 100
dimension: int = 1536 # OpenAI embedding dimension
class EmbeddingClient:
"""Enhanced OpenAI embeddings client with advanced features"""
def __init__(self, config: EmbeddingConfig):
self.config = config
self.client = OpenAI(
api_key=config.api_key,
organization=config.organization
)
self.embedding_cache = {} # Simple cache for embeddings
def get_embedding(self, text: str, model: Optional[str] = None) -> List[float]:
"""Get embedding for a single text"""
        # Check cache first (keyed on a 100-char text prefix -- cheap, but collision-prone
        # for texts sharing a long common prefix; hash the full text in production)
        cache_key = f"{text[:100]}_{model or self.config.model}"
if cache_key in self.embedding_cache:
return self.embedding_cache[cache_key]
try:
response = self.client.embeddings.create(
model=model or self.config.model,
input=text.replace("\n", " ") # Remove newlines
)
embedding = response.data[0].embedding
# Cache result
self.embedding_cache[cache_key] = embedding
return embedding
except Exception as e:
print(f"Error getting embedding: {e}")
return [0.0] * self.config.dimension
def get_batch_embeddings(
self,
texts: List[str],
model: Optional[str] = None,
show_progress: bool = True
) -> List[List[float]]:
"""Get embeddings for multiple texts efficiently"""
embeddings = []
batch_size = self.config.batch_size
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
if show_progress:
print(f"Processing batch {i//batch_size + 1}/{(len(texts) + batch_size - 1)//batch_size}")
try:
response = self.client.embeddings.create(
model=model or self.config.model,
input=[text.replace("\n", " ") for text in batch]
)
batch_embeddings = [item.embedding for item in response.data]
embeddings.extend(batch_embeddings)
# Small delay to avoid rate limits
time.sleep(0.1)
except Exception as e:
print(f"Error processing batch {i}: {e}")
# Add zero embeddings for failed batch
embeddings.extend([[0.0] * self.config.dimension] * len(batch))
return embeddings
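# Note: "text-embedding-ada-002" returns 1536-dimensional vectors. Newer embedding
# models ("text-embedding-3-small", "text-embedding-3-large") can be passed here as
# well; if you switch models, update EmbeddingConfig.dimension to match the vector
# length the chosen model actually returns.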
# 2. Similarity Analysis
class SimilarityAnalyzer:
"""Advanced similarity analysis using embeddings"""
def __init__(self, embedding_client: EmbeddingClient):
self.client = embedding_client
def cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
"""Calculate cosine similarity between two vectors"""
try:
vec1_np = np.array(vec1)
vec2_np = np.array(vec2)
dot_product = np.dot(vec1_np, vec2_np)
norm1 = np.linalg.norm(vec1_np)
norm2 = np.linalg.norm(vec2_np)
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
except Exception as e:
print(f"Error calculating similarity: {e}")
return 0.0
def euclidean_distance(self, vec1: List[float], vec2: List[float]) -> float:
"""Calculate Euclidean distance between two vectors"""
try:
vec1_np = np.array(vec1)
vec2_np = np.array(vec2)
return np.linalg.norm(vec1_np - vec2_np)
except Exception as e:
print(f"Error calculating distance: {e}")
return float('inf')
def find_most_similar(
self,
query_text: str,
documents: List[str],
top_k: int = 5,
similarity_threshold: float = 0.7
) -> List[Dict[str, Any]]:
"""Find most similar documents to query"""
query_embedding = self.client.get_embedding(query_text)
doc_embeddings = self.client.get_batch_embeddings(documents, show_progress=False)
similarities = []
for i, doc_embedding in enumerate(doc_embeddings):
sim = self.cosine_similarity(query_embedding, doc_embedding)
similarities.append({
"document": documents[i],
"similarity": sim,
"index": i
})
# Sort by similarity and filter by threshold
similarities.sort(key=lambda x: x["similarity"], reverse=True)
filtered_similarities = [
sim for sim in similarities
if sim["similarity"] >= similarity_threshold
]
return filtered_similarities[:top_k]
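    # Example (a sketch; assumes a configured EmbeddingClient named `embedding_client`):
    #
    # analyzer = SimilarityAnalyzer(embedding_client)
    # docs = ["Cats are small felines.", "Python is a programming language.", "Dogs bark."]
    # for match in analyzer.find_most_similar("Which language should I learn?", docs, top_k=2):
    #     print(f"{match['similarity']:.3f}  {match['document']}")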
def create_similarity_matrix(
self,
texts: List[str]
) -> Tuple[np.ndarray, List[str]]:
"""Create similarity matrix for a list of texts"""
embeddings = self.client.get_batch_embeddings(texts)
similarity_matrix = cosine_similarity(embeddings)
return similarity_matrix, texts
def visualize_similarity_heatmap(
self,
texts: List[str],
title: str = "Text Similarity Heatmap",
save_path: Optional[str] = None
) -> None:
"""Create and save similarity heatmap"""
similarity_matrix, labels = self.create_similarity_matrix(texts)
# Truncate labels for better display
short_labels = [label[:50] + "..." if len(label) > 50 else label for label in labels]
plt.figure(figsize=(12, 10))
sns.heatmap(
similarity_matrix,
xticklabels=short_labels,
yticklabels=short_labels,
annot=True,
cmap="coolwarm",
center=0,
fmt=".2f"
)
plt.title(title)
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches="tight")
plt.show()
# 3. Text Clustering
class TextClusterer:
"""Advanced text clustering using embeddings"""
def __init__(self, embedding_client: EmbeddingClient):
self.client = embedding_client
def kmeans_clustering(
self,
texts: List[str],
n_clusters: int,
random_state: int = 42
) -> Dict[str, Any]:
"""Perform K-means clustering on texts"""
embeddings = self.client.get_batch_embeddings(texts)
embedding_array = np.array(embeddings)
# Perform K-means clustering
kmeans = KMeans(
n_clusters=n_clusters,
random_state=random_state,
n_init=10
)
cluster_labels = kmeans.fit_predict(embedding_array)
# Calculate silhouette score
silhouette_avg = silhouette_score(embedding_array, cluster_labels)
# Organize results
clusters = {}
for i, label in enumerate(cluster_labels):
if label not in clusters:
clusters[label] = []
clusters[label].append({
"text": texts[i],
"index": i
})
return {
"clusters": clusters,
"cluster_labels": cluster_labels.tolist(),
"cluster_centers": kmeans.cluster_centers_.tolist(),
"silhouette_score": silhouette_avg,
"n_clusters": n_clusters
}
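    # Example (a sketch; assumes a configured EmbeddingClient named `embedding_client`):
    #
    # clusterer = TextClusterer(embedding_client)
    # result = clusterer.kmeans_clustering(["soccer news", "tennis scores", "stock tips"], n_clusters=2)
    # print(result["silhouette_score"], {k: len(v) for k, v in result["clusters"].items()})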
def dbscan_clustering(
self,
texts: List[str],
eps: float = 0.3,
min_samples: int = 2
) -> Dict[str, Any]:
"""Perform DBSCAN clustering on texts"""
embeddings = self.client.get_batch_embeddings(texts)
embedding_array = np.array(embeddings)
# Convert to cosine distance matrix
cosine_dist = 1 - cosine_similarity(embedding_array)
# Perform DBSCAN clustering
dbscan = DBSCAN(
eps=eps,
min_samples=min_samples,
metric="precomputed"
)
cluster_labels = dbscan.fit_predict(cosine_dist)
# Calculate silhouette score (excluding noise points)
if len(set(cluster_labels)) > 1 and -1 not in cluster_labels:
silhouette_avg = silhouette_score(embedding_array, cluster_labels)
else:
silhouette_avg = -1
# Organize results
clusters = {}
noise_points = []
for i, label in enumerate(cluster_labels):
if label == -1:
noise_points.append({
"text": texts[i],
"index": i
})
else:
if label not in clusters:
clusters[label] = []
clusters[label].append({
"text": texts[i],
"index": i
})
return {
"clusters": clusters,
"noise_points": noise_points,
"cluster_labels": cluster_labels.tolist(),
"silhouette_score": silhouette_avg,
"n_clusters": len(clusters),
"n_noise_points": len(noise_points)
}
def find_optimal_clusters(
self,
texts: List[str],
max_clusters: int = 10
) -> Dict[str, Any]:
"""Find optimal number of clusters using elbow method"""
embeddings = self.client.get_batch_embeddings(texts)
embedding_array = np.array(embeddings)
inertias = []
silhouette_scores = []
cluster_range = range(2, min(max_clusters + 1, len(texts)))
for n_clusters in cluster_range:
kmeans = KMeans(
n_clusters=n_clusters,
random_state=42,
n_init=10
)
cluster_labels = kmeans.fit_predict(embedding_array)
inertias.append(kmeans.inertia_)
silhouette_scores.append(silhouette_score(embedding_array, cluster_labels))
# Find optimal number based on silhouette score
optimal_k = cluster_range[np.argmax(silhouette_scores)]
return {
"cluster_range": list(cluster_range),
"inertias": inertias,
"silhouette_scores": silhouette_scores,
"optimal_clusters": optimal_k,
"max_silhouette_score": max(silhouette_scores)
}
def visualize_clusters(
self,
texts: List[str],
cluster_result: Dict[str, Any],
method: str = "tsne",
save_path: Optional[str] = None
) -> None:
"""Visualize clusters using dimensionality reduction"""
embeddings = self.client.get_batch_embeddings(texts)
embedding_array = np.array(embeddings)
# Apply dimensionality reduction
if method == "tsne":
reducer = TSNE(
n_components=2,
random_state=42,
perplexity=min(30, len(texts) - 1)
)
else: # PCA
reducer = PCA(n_components=2, random_state=42)
embeddings_2d = reducer.fit_transform(embedding_array)
cluster_labels = cluster_result["cluster_labels"]
# Create interactive plot
fig = px.scatter(
x=embeddings_2d[:, 0],
y=embeddings_2d[:, 1],
color=cluster_labels,
hover_data=[texts],
title=f"Text Clusters Visualization ({method.upper()})",
labels={
"x": f"{method.upper()} 1",
"y": f"{method.upper()} 2",
"color": "Cluster"
}
)
fig.update_traces(
text=[f"Cluster {label}" for label in cluster_labels],
marker=dict(size=8)
)
if save_path:
fig.write_html(save_path)
fig.show()
# 4. Semantic Search
class SemanticSearchEngine:
"""Advanced semantic search engine"""
def __init__(self, embedding_client: EmbeddingClient):
self.client = embedding_client
self.documents = []
self.embeddings = []
def index_documents(self, documents: List[Dict[str, Any]]) -> None:
"""Index documents for semantic search"""
self.documents = documents
texts = [doc.get("text", "") for doc in documents]
self.embeddings = self.client.get_batch_embeddings(texts)
def search(
self,
query: str,
top_k: int = 10,
similarity_threshold: float = 0.5,
filters: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
"""Search documents using semantic similarity"""
if not self.documents or not self.embeddings:
return []
# Get query embedding
query_embedding = self.client.get_embedding(query)
query_array = np.array(query_embedding).reshape(1, -1)
# Calculate similarities
embedding_array = np.array(self.embeddings)
similarities = cosine_similarity(query_array, embedding_array)[0]
# Create search results
results = []
for i, similarity in enumerate(similarities):
    if similarity < similarity_threshold:
        continue
    # Skip documents that fail any metadata filter
    if filters and any(
        self.documents[i].get(key) != value
        for key, value in filters.items()
    ):
        continue
    results.append({
        "document": self.documents[i],
        "similarity": float(similarity),
        "index": i
    })
# Sort by similarity and return top_k
results.sort(key=lambda x: x["similarity"], reverse=True)
return results[:top_k]
def hybrid_search(
self,
query: str,
keyword_query: Optional[str] = None,
top_k: int = 10,
semantic_weight: float = 0.7,
keyword_weight: float = 0.3
) -> List[Dict[str, Any]]:
"""Hybrid search combining semantic and keyword search"""
# Semantic search
semantic_results = self.search(query, top_k=top_k * 2)
results = []
for result in semantic_results:
results.append({
**result,
"semantic_score": result["similarity"],
"keyword_score": 0.0
})
# Keyword search if keyword query provided
if keyword_query:
from sklearn.feature_extraction.text import TfidfVectorizer
# Prepare documents
doc_texts = [doc.get("text", "") for doc in self.documents]
# Create TF-IDF vectorizer
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(doc_texts)
# Transform keyword query
query_vector = vectorizer.transform([keyword_query])
# Calculate keyword similarities
keyword_similarities = cosine_similarity(query_vector, tfidf_matrix)[0]
# Add keyword scores to results
for i, score in enumerate(keyword_similarities):
if score > 0:
existing_result = next(
(r for r in results if r["index"] == i),
None
)
if existing_result:
existing_result["keyword_score"] = float(score)
else:
results.append({
"document": self.documents[i],
"similarity": 0.0,
"semantic_score": 0.0,
"keyword_score": float(score),
"index": i
})
# Combine scores
for result in results:
combined_score = (
result["semantic_score"] * semantic_weight +
result["keyword_score"] * keyword_weight
)
result["combined_score"] = combined_score
# Sort by combined score and return top_k
results.sort(key=lambda x: x["combined_score"], reverse=True)
return results[:top_k]
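# Note on hybrid_search: raw cosine similarities and TF-IDF scores live on
# different scales, so one signal can dominate the weighted sum. A minimal,
# optional normalization sketch (an assumption, not wired into the engine):
def min_max_normalize(scores: List[float]) -> List[float]:
    """Rescale scores to [0, 1]; returns zeros when all scores are equal."""
    lo, hi = min(scores), max(scores)
    if hi == lo:
        return [0.0 for _ in scores]
    return [(s - lo) / (hi - lo) for s in scores]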
# 5. Text Classification
class TextClassifier:
"""Zero-shot text classification using embeddings"""
def __init__(self, embedding_client: EmbeddingClient):
self.client = embedding_client
def zero_shot_classify(
self,
text: str,
labels: List[str],
top_k: int = 3
) -> List[Dict[str, Any]]:
"""Classify text using zero-shot approach"""
# Create label descriptions
label_descriptions = [
f"This text is about {label}."
for label in labels
]
# Get embeddings
text_embedding = self.client.get_embedding(text)
label_embeddings = self.client.get_batch_embeddings(label_descriptions)
# Score each label by cosine similarity between the text embedding
# and the label-description embedding (the previous cache lookups here
# were dead code and never contributed to the score)
text_vec = np.array(text_embedding)
similarities = []
for i, label_embedding in enumerate(label_embeddings):
    label_vec = np.array(label_embedding)
    norm_product = np.linalg.norm(text_vec) * np.linalg.norm(label_vec)
    sim_score = float(np.dot(text_vec, label_vec) / norm_product) if norm_product > 0 else 0.0
    similarities.append({
        "label": labels[i],
        "similarity": sim_score
    })
# Sort by similarity and return top_k
similarities.sort(key=lambda x: x["similarity"], reverse=True)
return similarities[:top_k]
def classify_with_examples(
self,
text: str,
examples: Dict[str, List[str]]
) -> Dict[str, Any]:
"""Classify text using few-shot examples"""
# Embed the input text once, then compare against each label's examples
# (the previous version fed cache lookups into the similarity math)
text_vec = np.array(self.client.get_embedding(text))
label_similarities = {}
for label, example_texts in examples.items():
    # Average cosine similarity between the text and this label's examples
    example_embeddings = self.client.get_batch_embeddings(example_texts)
    total_similarity = 0.0
    for example_embedding in example_embeddings:
        example_vec = np.array(example_embedding)
        norm_product = np.linalg.norm(text_vec) * np.linalg.norm(example_vec)
        total_similarity += (
            float(np.dot(text_vec, example_vec) / norm_product)
            if norm_product > 0 else 0.0
        )
    label_similarities[label] = total_similarity / len(example_texts)
# Find best matching label
best_label = max(label_similarities, key=label_similarities.get)
confidence = label_similarities[best_label]
return {
"predicted_label": best_label,
"confidence": confidence,
"all_similarities": label_similarities
}
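# A minimal usage sketch for TextClassifier; the labels and sample sentence
# are illustrative assumptions.
def demo_zero_shot_sketch():
    config = EmbeddingConfig(api_key=os.getenv("OPENAI_API_KEY"))
    classifier = TextClassifier(EmbeddingClient(config))
    ranked = classifier.zero_shot_classify(
        "The model overfits after ten epochs",
        labels=["machine learning", "cooking", "travel"],
        top_k=2
    )
    for entry in ranked:
        print(f"{entry['label']}: {entry['similarity']:.3f}")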
# 6. Advanced Applications
class EmbeddingApplications:
"""Advanced applications using text embeddings"""
@staticmethod
def find_anomalies(
embedding_client: EmbeddingClient,
texts: List[str],
threshold_percentile: float = 95
) -> List[Dict[str, Any]]:
"""Find anomalous texts using embedding distances"""
embeddings = embedding_client.get_batch_embeddings(texts)
embedding_array = np.array(embeddings)
# Calculate pairwise distances
similarities = cosine_similarity(embedding_array)
# Calculate average similarity for each text
avg_similarities = []
for i in range(len(texts)):
# Remove self-similarity
row_similarities = similarities[i, :i].tolist() + similarities[i, i+1:].tolist()
avg_sim = np.mean(row_similarities) if row_similarities else 0
avg_similarities.append(avg_sim)
# Flag texts in the bottom (100 - threshold_percentile)% of average similarity;
# using the percentile directly would mark nearly all texts as anomalies
threshold = np.percentile(avg_similarities, 100 - threshold_percentile)
# Identify anomalies
anomalies = []
for i, avg_sim in enumerate(avg_similarities):
if avg_sim < threshold:
anomalies.append({
"text": texts[i],
"index": i,
"avg_similarity": avg_sim,
"is_anomaly": True
})
return anomalies
@staticmethod
def generate_summaries(
embedding_client: EmbeddingClient,
long_text: str,
num_sentences: int = 3
) -> List[str]:
"""Generate extractive summary using embeddings"""
# Split text into sentences (naive period split; a proper sentence
# tokenizer would handle abbreviations and decimals more robustly)
sentences = [s.strip() for s in long_text.split('.') if s.strip()]
if len(sentences) <= num_sentences:
return sentences
# Get embeddings for all sentences
sentence_embeddings = embedding_client.get_batch_embeddings(sentences)
# Calculate sentence importance using centrality
similarity_matrix = cosine_similarity(sentence_embeddings)
centrality_scores = np.mean(similarity_matrix, axis=1)
# Select top sentences
top_indices = np.argsort(centrality_scores)[-num_sentences:]
top_indices.sort() # Maintain original order
summary = [sentences[i] + '.' for i in top_indices]
return summary
@staticmethod
def track_topic_evolution(
embedding_client: EmbeddingClient,
text_series: List[str],
time_labels: List[str]
) -> Dict[str, Any]:
"""Track topic evolution over time"""
embeddings = embedding_client.get_batch_embeddings(text_series)
embedding_array = np.array(embeddings)
# Calculate similarities between consecutive texts
evolution_scores = []
for i in range(1, len(embedding_array)):
sim = cosine_similarity(
embedding_array[i-1:i],
embedding_array[i:i+1]
)[0][0]
evolution_scores.append({
"from_time": time_labels[i-1],
"to_time": time_labels[i],
"similarity": float(sim),
"change": 1 - float(sim)
})
return {
"evolution_scores": evolution_scores,
"average_change": np.mean([score["change"] for score in evolution_scores]),
"max_change": max([score["change"] for score in evolution_scores]),
"min_change": min([score["change"] for score in evolution_scores])
}
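# A minimal usage sketch for track_topic_evolution; the snippets and quarter
# labels are illustrative assumptions.
def demo_topic_evolution_sketch():
    config = EmbeddingConfig(api_key=os.getenv("OPENAI_API_KEY"))
    client = EmbeddingClient(config)
    evolution = EmbeddingApplications.track_topic_evolution(
        client,
        ["Sales grew steadily in Q1", "Sales accelerated in Q2", "Q3 focused on hiring"],
        ["Q1", "Q2", "Q3"]
    )
    print(f"average change: {evolution['average_change']:.3f}")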
# 7. Demonstration Functions
def demo_similarity_analysis():
"""Demonstrate similarity analysis"""
print("=== Similarity Analysis Demo ===")
config = EmbeddingConfig(api_key=os.getenv("OPENAI_API_KEY"))
client = EmbeddingClient(config)
analyzer = SimilarityAnalyzer(client)
texts = [
"The weather is beautiful today",
"It's a lovely sunny day outside",
"I love programming in Python",
"Machine learning is fascinating",
"AI will change the world",
"Python is great for data science"
]
# Find similar texts
query = "I enjoy coding with Python"
similar = analyzer.find_most_similar(query, texts, top_k=3)
print(f"Most similar to '{query}':")
for result in similar:
print(f"- {result['document']} (similarity: {result['similarity']:.3f})")
# Create similarity matrix
analyzer.visualize_similarity_heatmap(
texts,
title="Text Similarity Heatmap",
save_path="similarity_heatmap.png"
)
def demo_text_clustering():
"""Demonstrate text clustering"""
print("\n=== Text Clustering Demo ===")
config = EmbeddingConfig(api_key=os.getenv("OPENAI_API_KEY"))
client = EmbeddingClient(config)
clusterer = TextClusterer(client)
# Sample texts for clustering
texts = [
"Python is a popular programming language",
"JavaScript is used for web development",
"React is a frontend framework",
"Django is a Python web framework",
"Machine learning algorithms are complex",
"Neural networks are deep learning models",
"Data science involves statistics",
"AI research is advancing rapidly",
"Coffee is a popular beverage",
"Tea has many health benefits",
"Water is essential for life",
"Fruits contain important vitamins"
]
# Find optimal clusters
optimal = clusterer.find_optimal_clusters(texts, max_clusters=6)
print(f"Optimal number of clusters: {optimal['optimal_clusters']}")
print(f"Max silhouette score: {optimal['max_silhouette_score']:.3f}")
# Perform clustering
kmeans_result = clusterer.kmeans_clustering(texts, optimal['optimal_clusters'])
print(f"\nK-means clustering results:")
for cluster_id, cluster_data in kmeans_result['clusters'].items():
print(f"Cluster {cluster_id}: {len(cluster_data)} texts")
for item in cluster_data:
print(f" - {item['text'][:50]}...")
# Visualize clusters
clusterer.visualize_clusters(
texts,
kmeans_result,
method="tsne",
save_path="cluster_visualization.html"
)
def demo_semantic_search():
"""Demonstrate semantic search"""
print("\n=== Semantic Search Demo ===")
config = EmbeddingConfig(api_key=os.getenv("OPENAI_API_KEY"))
client = EmbeddingClient(config)
search_engine = SemanticSearchEngine(client)
# Index documents
documents = [
{"text": "Python is a versatile programming language for web development", "category": "programming"},
{"text": "React is a popular JavaScript library for building user interfaces", "category": "programming"},
{"text": "Machine learning uses algorithms to find patterns in data", "category": "ai"},
{"text": "Neural networks are a key component of deep learning", "category": "ai"},
{"text": "Coffee beans are roasted to develop their flavor", "category": "food"},
{"text": "Tea cultivation requires specific climate conditions", "category": "food"}
]
search_engine.index_documents(documents)
# Semantic search
query = "AI and neural networks"
results = search_engine.search(query, top_k=3)
print(f"Semantic search for '{query}':")
for result in results:
print(f"- {result['document']['text'][:60]}... (similarity: {result['similarity']:.3f})")
# Hybrid search
hybrid_results = search_engine.hybrid_search(
"programming languages",
keyword_query="Python web",
top_k=2
)
print(f"\nHybrid search results:")
for result in hybrid_results:
print(f"- {result['document']['text'][:60]}... (combined score: {result['combined_score']:.3f})")
def demo_advanced_applications():
"""Demonstrate advanced applications"""
print("\n=== Advanced Applications Demo ===")
config = EmbeddingConfig(api_key=os.getenv("OPENAI_API_KEY"))
client = EmbeddingClient(config)
# Anomaly detection
texts = [
"Normal business email about quarterly reports",
"Standard project update with timelines",
"Regular team meeting minutes",
"URGENT: Your account will be suspended immediately",
"Congratulations! You've won $1,000,000!!!",
"Usual marketing campaign performance metrics"
]
anomalies = EmbeddingApplications.find_anomalies(client, texts)
print(f"Found {len(anomalies)} anomalies:")
for anomaly in anomalies:
print(f"- {anomaly['text'][:50]}... (avg similarity: {anomaly['avg_similarity']:.3f})")
# Text summarization
long_text = """
Artificial intelligence has revolutionized many industries. Machine learning algorithms can now
process vast amounts of data to identify patterns and make predictions. Deep learning, a subset
of machine learning, uses neural networks with multiple layers to learn hierarchical representations.
These technologies are being applied in healthcare, finance, transportation, and entertainment.
As AI continues to evolve, it raises important questions about ethics, privacy, and the future
of work. Researchers are working on developing more interpretable and fair AI systems.
"""
summary = EmbeddingApplications.generate_summaries(client, long_text, num_sentences=3)
print(f"\nGenerated summary:")
for sentence in summary:
print(f"- {sentence}")
# Main execution
if __name__ == "__main__":
# Set environment variable
os.environ.setdefault("OPENAI_API_KEY", "your-openai-api-key")
try:
demo_similarity_analysis()
demo_text_clustering()
demo_semantic_search()
demo_advanced_applications()
except Exception as e:
print(f"Demo error: {e}")
print("Make sure to set your OPENAI_API_KEY environment variable")