🎯 Recommended Collections
Balanced collections of code examples from various categories for you to explore
LangChain Examples
Examples of the LangChain AI application development framework for building LLM-powered applications
💻 LangChain Fundamentals and Basic Chains python
🟢 simple
⭐⭐
Core concepts and implementations of basic chains, including LLM chains, prompt templates, and simple sequential chains
⏱️ 30 min
🏷️ langchain, ai, chains, python
Prerequisites:
Python basics, OpenAI API key, LangChain installation
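This example targets the legacy LangChain 0.0.x API; a hedged setup sketch (the exact package set is an assumption):
# pip install "langchain<0.1" openai anthropic huggingface_hub tenacity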
# LangChain Fundamentals and Basic Chains
# Core concepts and implementations
import os
from typing import List, Dict, Any
from langchain.llms import OpenAI, Anthropic, HuggingFaceHub
from langchain.chat_models import ChatOpenAI, ChatAnthropic
from langchain.prompts import PromptTemplate, ChatPromptTemplate, FewShotPromptTemplate
from langchain.chains import LLMChain, SimpleSequentialChain, SequentialChain
from langchain.memory import ConversationBufferMemory, ConversationBufferWindowMemory
from langchain.schema import HumanMessage, AIMessage, SystemMessage
# 1. Initialize LLM Models
def initialize_models():
"""Initialize different LLM models for LangChain usage"""
# OpenAI models
openai_llm = OpenAI(
openai_api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-3.5-turbo-instruct",
temperature=0.7,
max_tokens=1000
)
openai_chat = ChatOpenAI(
openai_api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-3.5-turbo",
temperature=0.7
)
# Anthropic models
anthropic_llm = Anthropic(
anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-2.1",
temperature=0.7
)
anthropic_chat = ChatAnthropic(
anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-sonnet-20240229",
temperature=0.7
)
# HuggingFace Hub models
hf_llm = HuggingFaceHub(
repo_id="google/flan-t5-large",
huggingfacehub_api_token=os.getenv("HUGGINGFACEHUB_API_TOKEN"),
model_kwargs={"temperature": 0.7, "max_length": 512}
)
return {
'openai_llm': openai_llm,
'openai_chat': openai_chat,
'anthropic_llm': anthropic_llm,
'anthropic_chat': anthropic_chat,
'huggingface_llm': hf_llm
}
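# Usage sketch (assumes valid API keys are set; in this API version
# predict() accepts a plain string for both LLM and chat models):
#   models = initialize_models()
#   models['openai_chat'].predict("Say hello in one word.")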
# 2. Basic Prompt Templates
def create_prompt_templates():
"""Create and demonstrate various prompt templates"""
# Simple prompt template
simple_template = PromptTemplate(
input_variables=["product", "feature"],
template="Describe the {feature} of {product} in detail."
)
# Chat prompt template for conversation
chat_template = ChatPromptTemplate.from_messages([
("system", "You are a helpful AI assistant that specializes in {topic}."),
("human", "{question}"),
])
# Template with examples
example_template = FewShotPromptTemplate(
examples=[
{
"input": "2 + 2",
"output": "4"
},
{
"input": "5 + 3",
"output": "8"
}
],
example_prompt=PromptTemplate(
input_variables=["input", "output"],
template="Question: {input}\nAnswer: {output}"
),
prefix="Here are some examples:",
suffix="Question: {input}\nAnswer:",
input_variables=["input"]
)
# Template for story generation
story_template = PromptTemplate(
input_variables=["genre", "character", "setting"],
template="""Write a short {genre} story about {character} in {setting}.
The story should be engaging and creative."""
)
return {
'simple': simple_template,
'chat': chat_template,
'examples': example_template,
'story': story_template
}
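# Quick template-rendering check (illustrative; no API call needed):
#   templates = create_prompt_templates()
#   templates['simple'].format(product="Smart Home Assistant", feature="voice recognition")
#   -> "Describe the voice recognition of Smart Home Assistant in detail."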
# 3. Basic LLM Chains
def create_basic_chains(models, templates):
"""Create various types of basic chains"""
# Simple LLM Chain
simple_chain = LLMChain(
llm=models['openai_llm'],
prompt=templates['simple']
)
# Chain for story generation
story_chain = LLMChain(
llm=models['openai_llm'],
prompt=templates['story']
)
# Chain with memory
    # Memory key must match the {chat_history} variable in the prompt below
    memory = ConversationBufferMemory(memory_key="chat_history")
conversation_chain = LLMChain(
llm=models['openai_chat'],
prompt=PromptTemplate(
input_variables=["input", "chat_history"],
template="Current conversation:\n{chat_history}\nHuman: {input}\nAI:"
),
memory=memory
)
return {
'simple': simple_chain,
'story': story_chain,
'conversation': conversation_chain
}
# 4. Sequential Chains
def create_sequential_chains(models, templates):
"""Create sequential chains for multi-step processing"""
# Chain 1: Generate ideas
idea_prompt = PromptTemplate(
input_variables=["topic"],
template="Generate 3 creative ideas related to {topic}."
)
    idea_chain = LLMChain(llm=models['openai_llm'], prompt=idea_prompt, output_key="ideas")
# Chain 2: Expand on the best idea
expansion_prompt = PromptTemplate(
input_variables=["ideas"],
template="From these ideas:\n{ideas}\nExpand on the most promising one with details."
)
    expansion_chain = LLMChain(llm=models['openai_llm'], prompt=expansion_prompt, output_key="expanded_content")
# Simple Sequential Chain
sequential_chain = SimpleSequentialChain(
chains=[idea_chain, expansion_chain],
verbose=True
)
# More complex sequential chain
    # Each chain's input must be produced by an earlier chain in the SequentialChain
    title_prompt = PromptTemplate(
        input_variables=["expanded_content"],
        template="Generate a catchy title for: {expanded_content}"
    )
    title_chain = LLMChain(llm=models['openai_llm'], prompt=title_prompt, output_key="title")
    summary_prompt = PromptTemplate(
        input_variables=["expanded_content"],
        template="Create a one-sentence summary: {expanded_content}"
    )
    summary_chain = LLMChain(llm=models['openai_llm'], prompt=summary_prompt, output_key="summary")
# Complex Sequential Chain
complex_chain = SequentialChain(
chains=[idea_chain, expansion_chain, title_chain, summary_chain],
input_variables=["topic"],
output_variables=["ideas", "expanded_content", "title", "summary"],
verbose=True
)
return {
'simple': sequential_chain,
'complex': complex_chain
}
# 5. Memory Management
def demonstrate_memory_management(models):
"""Different types of memory for conversation context"""
# Buffer Memory (keeps all conversation)
buffer_memory = ConversationBufferMemory(
return_messages=True,
memory_key="chat_history"
)
# Window Memory (keeps last k messages)
window_memory = ConversationBufferWindowMemory(
k=5, # Keep last 5 exchanges
return_messages=True,
memory_key="chat_history"
)
# Chains with different memory types
buffer_chain = LLMChain(
llm=models['openai_chat'],
prompt=PromptTemplate(
input_variables=["input", "chat_history"],
template="Previous conversation:\n{chat_history}\nCurrent question: {input}\nAnswer:"
),
memory=buffer_memory
)
window_chain = LLMChain(
llm=models['openai_chat'],
prompt=PromptTemplate(
input_variables=["input", "chat_history"],
template="Recent conversation:\n{chat_history}\nCurrent question: {input}\nAnswer:"
),
memory=window_memory
)
return {
'buffer_chain': buffer_chain,
'window_chain': window_chain,
'buffer_memory': buffer_memory,
'window_memory': window_memory
}
# 6. Output Parsers
from langchain.output_parsers import (
StructuredOutputParser,
ResponseSchema,
CommaSeparatedListOutputParser,
DatetimeOutputParser
)
def create_output_parsers():
"""Create different output parsers for structured responses"""
# Structured output parser
response_schemas = [
ResponseSchema(
name="summary",
description="Brief summary of the input text"
),
ResponseSchema(
name="key_points",
description="List of key points from the text"
),
ResponseSchema(
name="sentiment",
description="Overall sentiment of the text (positive, negative, neutral)"
)
]
structured_parser = StructuredOutputParser.from_response_schemas(response_schemas)
# Comma-separated list parser
list_parser = CommaSeparatedListOutputParser()
# Datetime parser
datetime_parser = DatetimeOutputParser()
# Format instructions for parsers
structured_format_instructions = structured_parser.get_format_instructions()
list_format_instructions = list_parser.get_format_instructions()
datetime_format_instructions = datetime_parser.get_format_instructions()
return {
'structured': structured_parser,
'list': list_parser,
'datetime': datetime_parser,
'structured_instructions': structured_format_instructions,
'list_instructions': list_format_instructions,
'datetime_instructions': datetime_format_instructions
}
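# Parser usage sketch (no API call needed):
#   parsers = create_output_parsers()
#   parsers['list'].parse("solar, wind, hydro")  # -> ['solar', 'wind', 'hydro']
# The *_instructions strings are meant to be embedded in prompts so the model
# emits output in a format the matching parser can read back.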
# 7. Custom Chain Class
from langchain.chains.base import Chain
from langchain.base_language import BaseLanguageModel
from typing import Optional

class CustomAnalysisChain(Chain):
    """Custom chain for text analysis with multiple outputs"""
    llm: BaseLanguageModel

    # Chain subclasses declare their inputs/outputs via these properties
    @property
    def input_keys(self) -> List[str]:
        return ["text"]

    @property
    def output_keys(self) -> List[str]:
        return ["sentiment", "keywords", "summary"]
def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
text = inputs["text"]
# Prompt templates for different analyses
sentiment_prompt = f"Analyze the sentiment of this text (positive/negative/neutral): {text}"
keywords_prompt = f"Extract 5 key topics from this text: {text}"
summary_prompt = f"Summarize this text in one sentence: {text}"
        # Get responses (predict() accepts a plain string for LLM and chat models alike)
        sentiment = self.llm.predict(sentiment_prompt).strip()
        keywords = self.llm.predict(keywords_prompt).strip()
        summary = self.llm.predict(summary_prompt).strip()
return {
"sentiment": sentiment,
"keywords": keywords,
"summary": summary
}
# 8. Error Handling and Retry Logic
from langchain.callbacks import get_openai_callback
import time
from tenacity import retry, stop_after_attempt, wait_exponential
class RobustLLMChain:
"""Chain with built-in error handling and retry logic"""
def __init__(self, llm, max_retries=3):
self.llm = llm
self.max_retries = max_retries
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
    def generate_with_retry(self, prompt: str) -> str:
        """Generate response with exponential backoff retry"""
        return self.llm.predict(prompt)
def safe_generate(self, prompt: str, timeout: int = 30) -> Optional[str]:
"""Generate response with timeout and error handling"""
try:
with get_openai_callback() as cb:
response = self.generate_with_retry(prompt)
print(f"Tokens used: {cb.total_tokens}, Cost: ${cb.total_cost:.4f}")
return response
except Exception as e:
print(f"Error generating response: {e}")
return None
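# Usage sketch (assumes OPENAI_API_KEY is set; cost tracking only applies to
# OpenAI models wrapped by get_openai_callback):
#   robust = RobustLLMChain(OpenAI(temperature=0))
#   answer = robust.safe_generate("Name three renewable energy sources.")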
# 9. Demonstration Functions
def demo_basic_usage():
"""Demonstrate basic LangChain usage"""
print("=== LangChain Basic Usage Demo ===")
# Initialize models
models = initialize_models()
# Create templates
templates = create_prompt_templates()
# Create chains
chains = create_basic_chains(models, templates)
# Use simple chain
result = chains['simple'].run({
"product": "Smart Home Assistant",
"feature": "voice recognition"
})
print(f"Simple Chain Result: {result}")
# Use story chain
story_result = chains['story'].run({
"genre": "science fiction",
"character": "AI botanist",
"setting": "Mars colony"
})
print(f"Story Result: {story_result}")
def demo_sequential_chains():
"""Demonstrate sequential chains"""
print("\n=== Sequential Chains Demo ===")
models = initialize_models()
templates = create_prompt_templates()
chains = create_sequential_chains(models, templates)
# Use complex sequential chain
    # SequentialChain exposes multiple outputs, so call it directly instead of .run()
    result = chains['complex']({"topic": "sustainable urban transportation"})
print(f"Complex Chain Result: {result}")
def demo_memory_usage():
"""Demonstrate memory in conversations"""
print("\n=== Memory Usage Demo ===")
models = initialize_models()
memory_chains = demonstrate_memory_management(models)
# Test conversation with memory
conversation_chain = memory_chains['buffer_chain']
# First message
response1 = conversation_chain.run("Hi, I'm interested in renewable energy.")
print(f"AI Response 1: {response1}")
# Follow-up message
response2 = conversation_chain.run("What are the latest developments?")
print(f"AI Response 2: {response2}")
# Check memory
memory = memory_chains['buffer_memory']
print(f"Conversation History: {memory.chat_memory.messages}")
def demo_output_parsing():
"""Demonstrate output parsers"""
print("\n=== Output Parsing Demo ===")
models = initialize_models()
parsers = create_output_parsers()
# Use structured output parser
structured_parser = parsers['structured']
format_instructions = parsers['structured_instructions']
    prompt = PromptTemplate(
        input_variables=["text"],
        partial_variables={"format_instructions": format_instructions},
        template="Analyze the following text:\n{text}\n\n{format_instructions}"
    )
chain = LLMChain(
llm=models['openai_llm'],
prompt=prompt
)
    result = chain.run(
        "LangChain is an amazing framework that makes building AI applications much easier. It provides various components like chains, agents, and memory management. I love how it simplifies complex workflows."
    )
# Parse the result
parsed_result = structured_parser.parse(result)
print(f"Parsed Result: {parsed_result}")
def demo_custom_chain():
"""Demonstrate custom chain implementation"""
print("\n=== Custom Chain Demo ===")
models = initialize_models()
custom_chain = CustomAnalysisChain(llm=models['openai_llm'])
    # The chain has multiple output keys, so call it directly instead of .run()
    result = custom_chain({
        "text": "The weather today is absolutely beautiful! The sun is shining, birds are singing, and I feel energized. This perfect weather reminds me of childhood summers."
    })
print(f"Custom Chain Analysis: {result}")
# Main execution
if __name__ == "__main__":
# Set environment variables (in production, use .env file or environment)
os.environ.setdefault("OPENAI_API_KEY", "your-openai-api-key")
os.environ.setdefault("ANTHROPIC_API_KEY", "your-anthropic-api-key")
os.environ.setdefault("HUGGINGFACEHUB_API_TOKEN", "your-huggingface-token")
# Run demonstrations
try:
demo_basic_usage()
demo_sequential_chains()
demo_memory_usage()
demo_output_parsing()
demo_custom_chain()
except Exception as e:
print(f"Demo error: {e}")
print("Make sure to set your API keys in environment variables")
💻 LangChain Agents and Tools python
🔴 complex
⭐⭐⭐⭐
Advanced agent implementations with custom tools, web search, API integration, and multi-agent systems
⏱️ 45 min
🏷️ langchain, agents, tools, automation
Prerequisites:
Python, LangChain basics, API keys
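Assumed extra dependencies for this example (package list is a best guess):
# pip install "langchain<0.1" openai wikipedia numpy requests google-api-python-client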
# LangChain Agents and Tools Implementation
# Advanced agent systems with custom tools and capabilities
import os
import json
import requests
from typing import List, Dict, Any, Optional
from datetime import datetime
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.agents import Tool, AgentExecutor, initialize_agent, AgentType
from langchain.tools import BaseTool, WikipediaQueryRun
from langchain.utilities import GoogleSearchAPIWrapper, WikipediaAPIWrapper
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
from langchain.schema import AIMessage, HumanMessage
from pydantic import BaseModel, Field
import wikipedia
import numpy as np
# 1. Custom Tool Class Implementation
class WeatherTool(BaseTool):
"""Custom weather tool using OpenWeatherMap API"""
name = "weather"
description = "Get current weather information for any city"
def __init__(self):
super().__init__()
self.api_key = os.getenv("OPENWEATHER_API_KEY")
self.base_url = "https://api.openweathermap.org/data/2.5/weather"
def _run(self, city: str) -> str:
"""Execute the weather tool"""
try:
params = {
'q': city,
'appid': self.api_key,
'units': 'metric'
}
response = requests.get(self.base_url, params=params)
response.raise_for_status()
data = response.json()
weather_info = f"""
Weather in {data['name']}:
- Temperature: {data['main']['temp']}°C
- Feels like: {data['main']['feels_like']}°C
- Description: {data['weather'][0]['description']}
- Humidity: {data['main']['humidity']}%
- Wind speed: {data['wind']['speed']} m/s
"""
return weather_info.strip()
except Exception as e:
return f"Error getting weather for {city}: {str(e)}"
async def _arun(self, city: str) -> str:
"""Async version of the tool"""
return self._run(city)
class CalculatorTool(BaseTool):
"""Advanced calculator tool with multiple operations"""
name = "calculator"
description = "Perform mathematical calculations. Accepts expressions like '2 + 2', 'sqrt(16)', 'sin(30 degrees)'"
def _run(self, expression: str) -> str:
"""Execute calculator tool"""
try:
# Safe evaluation using numpy
# Remove potentially dangerous operations
allowed_functions = {
'sin': np.sin,
'cos': np.cos,
'tan': np.tan,
'sqrt': np.sqrt,
'log': np.log,
'exp': np.exp,
'abs': np.abs,
'round': np.round,
'floor': np.floor,
'ceil': np.ceil
}
# Convert degrees to radians for trig functions
expression = expression.replace('degrees', '* np.pi/180')
# Create safe namespace
safe_dict = {
'__builtins__': {},
'np': np,
**allowed_functions
}
# Evaluate expression
result = eval(expression, safe_dict)
return f"Result: {result}"
except Exception as e:
return f"Calculation error: {str(e)}\nSupported operations: +, -, *, /, **, sqrt(), sin(), cos(), tan(), log(), exp()"
async def _arun(self, expression: str) -> str:
return self._run(expression)
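# Note: even with a restricted namespace, eval() on user input carries injection
# risk; a safer drop-in for production is an expression library such as
# numexpr.evaluate(expression), or an ast-based parser.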
class FileAnalyzerTool(BaseTool):
"""Tool for analyzing text files"""
name = "file_analyzer"
description = "Analyze text files for word count, reading time, and sentiment"
def _run(self, file_path: str) -> str:
"""Execute file analyzer tool"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
word_count = len(content.split())
char_count = len(content)
line_count = len(content.split('\n'))
# Estimate reading time (average 200 words per minute)
reading_time = max(1, word_count // 200)
analysis = f"""
File Analysis for {file_path}:
- Word count: {word_count}
- Character count: {char_count}
- Line count: {line_count}
- Estimated reading time: {reading_time} minutes
- File size: {os.path.getsize(file_path)} bytes
"""
return analysis.strip()
except FileNotFoundError:
return f"File not found: {file_path}"
except Exception as e:
return f"Error analyzing file: {str(e)}"
async def _arun(self, file_path: str) -> str:
return self._run(file_path)
class EmailTool(BaseTool):
"""Tool for sending email notifications (demo implementation)"""
name = "email_sender"
description = "Send email notifications (demo - requires proper email configuration)"
def _run(self, recipient: str, subject: str, body: str) -> str:
"""Execute email tool (demo)"""
# This is a demo implementation
# In production, integrate with actual email service
email_info = f"""
Email Information:
- To: {recipient}
- Subject: {subject}
- Body: {body[:100]}{'...' if len(body) > 100 else ''}
- Status: Queued for delivery (demo mode)
- Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
return email_info.strip()
async def _arun(self, recipient: str, subject: str, body: str) -> str:
return self._run(recipient, subject, body)
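# Hedged sketch: agents pass a single string to tools by default, so a
# multi-argument tool like EmailTool needs an explicit args_schema
# (field names below are illustrative):
class EmailInput(BaseModel):
    recipient: str = Field(description="Recipient email address")
    subject: str = Field(description="Subject line")
    body: str = Field(description="Message body")
# EmailTool could then declare `args_schema = EmailInput` and be run by an agent
# type that supports structured input, e.g. STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION.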
# 2. Initialize Standard Tools
def initialize_standard_tools():
"""Initialize built-in LangChain tools"""
# Wikipedia tool
wikipedia_wrapper = WikipediaAPIWrapper()
wikipedia_tool = WikipediaQueryRun(api_wrapper=wikipedia_wrapper)
# Google Search tool (requires API key)
search_tool = None
if os.getenv("GOOGLE_API_KEY") and os.getenv("GOOGLE_CSE_ID"):
search = GoogleSearchAPIWrapper()
search_tool = Tool(
name="google_search",
description="Search Google for recent results",
func=search.run
)
return {
'wikipedia': wikipedia_tool,
'google_search': search_tool
}
# 3. Initialize Custom Tools
def initialize_custom_tools():
"""Initialize custom tools"""
return {
'weather': WeatherTool(),
'calculator': CalculatorTool(),
'file_analyzer': FileAnalyzerTool(),
'email_sender': EmailTool()
}
# 4. Create Different Types of Agents
def create_react_agent(llm, tools):
    """Create a ReAct (Reasoning and Acting) agent"""
    agent = initialize_agent(
        tools=tools,
        llm=llm,
        # ZERO_SHOT_REACT_DESCRIPTION works with arbitrary tools;
        # REACT_DOCSTORE expects dedicated docstore Search/Lookup tools
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
        max_iterations=5,
        early_stopping_method="generate"
    )
    return agent
def create_conversational_agent(llm, tools):
"""Create a conversational agent with memory"""
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
agent = initialize_agent(
tools=tools,
llm=llm,
agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
memory=memory,
verbose=True,
max_iterations=5
)
return agent
def create_structured_agent(llm, tools):
"""Create a structured chat agent"""
agent = initialize_agent(
tools=tools,
llm=llm,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
max_iterations=5
)
return agent
# 5. Multi-Agent System
class MultiAgentSystem:
"""System for coordinating multiple specialized agents"""
def __init__(self, llm):
self.llm = llm
self.agents = {}
self.initialize_agents()
    def initialize_agents(self):
        """Initialize specialized agents for different tasks"""
        standard_tools = initialize_standard_tools()
        custom_tools = initialize_custom_tools()
        # Research agent (skip tools whose API keys are missing)
        research_tools = [
            tool for tool in (standard_tools['wikipedia'], standard_tools['google_search'])
            if tool is not None
        ]
        self.agents['researcher'] = create_react_agent(self.llm, research_tools)
        # Analyst agent
        analyst_tools = [
            custom_tools['calculator'],
            custom_tools['file_analyzer']
        ]
        self.agents['analyst'] = create_react_agent(self.llm, analyst_tools)
        # Assistant agent
        assistant_tools = [
            custom_tools['weather'],
            custom_tools['email_sender']
        ]
        self.agents['assistant'] = create_conversational_agent(self.llm, assistant_tools)
def route_task(self, task: str) -> str:
"""Route task to appropriate agent based on content analysis"""
routing_prompt = PromptTemplate(
input_variables=["task"],
template="""Analyze this task and determine which agent should handle it:
Task: {task}
Available agents:
- researcher: Handles research, fact-finding, and information gathering
- analyst: Handles calculations, data analysis, and file processing
- assistant: Handles general assistance, weather, and communication
Respond with only the agent name (researcher, analyst, or assistant):"""
)
routing_chain = LLMChain(llm=self.llm, prompt=routing_prompt)
agent_choice = routing_chain.run(task=task).strip().lower()
# Validate agent choice
if agent_choice not in self.agents:
agent_choice = 'assistant' # Default fallback
# Execute task with chosen agent
agent = self.agents[agent_choice]
result = agent.run(task)
return f"Agent used: {agent_choice}\nResult: {result}"
# 6. Custom Agent with Planning
class PlanningAgent:
"""Agent that can break down complex tasks into subtasks"""
def __init__(self, llm, tools):
self.llm = llm
self.tools = tools
self.planning_chain = self._create_planning_chain()
def _create_planning_chain(self):
"""Create a chain for task planning"""
planning_prompt = PromptTemplate(
input_variables=["task", "available_tools"],
template="""Given the task "{task}" and these available tools: {available_tools}
Break down this task into a step-by-step plan. For each step, specify:
1. What needs to be done
2. Which tool to use
3. Expected outcome
Format as a numbered list."""
)
return LLMChain(llm=self.llm, prompt=planning_prompt)
def execute_task(self, task: str) -> str:
"""Execute task with planning phase"""
# Get available tools info
tools_info = "\n".join([f"- {tool.name}: {tool.description}" for tool in self.tools])
# Generate plan
plan = self.planning_chain.run(task=task, available_tools=tools_info)
print(f"Generated Plan:\n{plan}\n")
# Create agent to execute the plan
agent = initialize_agent(
tools=self.tools,
llm=self.llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# Execute the original task
result = agent.run(task)
return f"Plan:\n{plan}\n\nExecution Result:\n{result}"
# 7. Agent with Self-Correction
class SelfCorrectingAgent:
"""Agent that can review and correct its own responses"""
def __init__(self, llm, tools):
self.llm = llm
self.tools = tools
self.agent = initialize_agent(
tools=tools,
llm=llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
self.review_chain = self._create_review_chain()
def _create_review_chain(self):
"""Create a chain for reviewing responses"""
review_prompt = PromptTemplate(
input_variables=["task", "response"],
template="""Review this response for the given task:
Task: {task}
Response: {response}
Check for:
1. Accuracy and correctness
2. Completeness
3. Clarity and usefulness
4. Any potential improvements
Provide a brief review and suggest corrections if needed."""
)
return LLMChain(llm=self.llm, prompt=review_prompt)
def execute_with_correction(self, task: str, max_corrections: int = 2) -> str:
"""Execute task with self-correction capability"""
initial_response = self.agent.run(task)
current_response = initial_response
for attempt in range(max_corrections):
# Review the current response
review = self.review_chain.run(task=task, response=current_response)
print(f"Review (attempt {attempt + 1}): {review}")
# Check if corrections are needed
if "improvement" in review.lower() or "correction" in review.lower() or "error" in review.lower():
correction_prompt = f"""Based on this review: {review}
Please correct and improve this response for the task: {task}
Previous response: {current_response}
Provide the corrected response:"""
                # predict() accepts a plain string for both LLM and chat models
                corrected_response = self.llm.predict(correction_prompt)
                current_response = corrected_response.strip()
else:
break # No corrections needed
return current_response
# 8. Demonstration Functions
def demo_custom_tools():
"""Demonstrate custom tools"""
print("=== Custom Tools Demo ===")
tools = initialize_custom_tools()
# Test weather tool
weather_result = tools['weather']._run("New York")
print(f"Weather Tool Result: {weather_result}")
# Test calculator tool
calc_result = tools['calculator']._run("sqrt(144) + 10")
print(f"Calculator Tool Result: {calc_result}")
def demo_react_agent():
"""Demonstrate ReAct agent"""
print("\n=== ReAct Agent Demo ===")
llm = ChatOpenAI(temperature=0)
tools = initialize_standard_tools()
# Filter out None tools
available_tools = [tool for tool in tools.values() if tool is not None]
if not available_tools:
print("No tools available. Please set up API keys.")
return
agent = create_react_agent(llm, available_tools)
# Test with a research query
result = agent.run("What is the latest news about artificial intelligence?")
print(f"ReAct Agent Result: {result}")
def demo_conversational_agent():
"""Demonstrate conversational agent with memory"""
print("\n=== Conversational Agent Demo ===")
llm = ChatOpenAI(temperature=0.7)
tools = initialize_custom_tools()
agent = create_conversational_agent(llm, list(tools.values()))
# Test conversation
response1 = agent.run("Hi, can you help me?")
print(f"Agent Response 1: {response1}")
response2 = agent.run("What's the weather like in London?")
print(f"Agent Response 2: {response2}")
def demo_multi_agent_system():
"""Demonstrate multi-agent system"""
print("\n=== Multi-Agent System Demo ===")
llm = ChatOpenAI(temperature=0)
multi_agent = MultiAgentSystem(llm)
# Test different task types
tasks = [
"Research the history of renewable energy",
"Calculate the compound interest on $10,000 at 5% for 10 years",
"Send me weather information for Tokyo"
]
for task in tasks:
print(f"\nTask: {task}")
result = multi_agent.route_task(task)
print(f"Result: {result}")
def demo_planning_agent():
"""Demonstrate planning agent"""
print("\n=== Planning Agent Demo ===")
llm = ChatOpenAI(temperature=0)
tools = [
initialize_standard_tools()['wikipedia'],
initialize_custom_tools()['calculator']
]
planning_agent = PlanningAgent(llm, tools)
result = planning_agent.execute_task("Analyze the economic impact of renewable energy adoption")
print(f"Planning Agent Result: {result}")
def demo_self_correcting_agent():
"""Demonstrate self-correcting agent"""
print("\n=== Self-Correcting Agent Demo ===")
llm = ChatOpenAI(temperature=0)
tools = [initialize_custom_tools()['calculator']]
correcting_agent = SelfCorrectingAgent(llm, tools)
result = correcting_agent.execute_with_correction("Calculate the area of a circle with radius 5. Use the formula πr²")
print(f"Self-Correcting Agent Result: {result}")
# Main execution
if __name__ == "__main__":
# Set environment variables
os.environ.setdefault("OPENAI_API_KEY", "your-openai-api-key")
os.environ.setdefault("GOOGLE_API_KEY", "your-google-api-key")
os.environ.setdefault("GOOGLE_CSE_ID", "your-google-cse-id")
os.environ.setdefault("OPENWEATHER_API_KEY", "your-openweather-api-key")
try:
demo_custom_tools()
demo_react_agent()
demo_conversational_agent()
demo_multi_agent_system()
demo_planning_agent()
demo_self_correcting_agent()
except Exception as e:
print(f"Demo error: {e}")
print("Make sure to set your API keys in environment variables")
💻 Vector Stores and RAG with LangChain python
🔴 complex
⭐⭐⭐⭐⭐
Vector database integration for RAG (Retrieval-Augmented Generation) using Chroma, Pinecone, and custom vector stores
⏱️ 50 min
🏷️ langchain, vector-stores, rag, embeddings
Prerequisites:
Python, LangChain, Vector databases, OpenAI API
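Assumed optional dependencies for the vector-store backends used below (package list is a best guess):
# pip install "langchain<0.1" openai tiktoken chromadb pinecone-client faiss-cpu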
# LangChain Vector Stores and RAG Implementation
# Retrieval-Augmented Generation with various vector databases
import os
import json
import uuid
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
import numpy as np
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from langchain.vectorstores import Chroma, FAISS, Pinecone
from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
from langchain.docstore.document import Document
from langchain.chains import RetrievalQA, ConversationalRetrievalChain
from langchain.chains.question_answering import load_qa_chain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain.schema import BaseRetriever
import chromadb
import pinecone
# 1. Text Processing and Chunking
class TextProcessor:
"""Advanced text processing with multiple chunking strategies"""
def __init__(self):
self.chunkers = {
'recursive': RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", " ", ""]
),
'character': CharacterTextSplitter(
chunk_size=1000,
chunk_overlap=0,
separator="\n\n"
),
'semantic': RecursiveCharacterTextSplitter(
chunk_size=1500,
chunk_overlap=300,
length_function=len,
separators=["\n\n\n", "\n\n", "\n", " ", ""]
)
}
def process_document(self, text: str, source: str, chunking_strategy: str = 'recursive') -> List[Document]:
"""Process text into document chunks with metadata"""
chunker = self.chunkers[chunking_strategy]
chunks = chunker.create_documents([text])
# Add metadata to each chunk
for i, chunk in enumerate(chunks):
chunk.metadata.update({
'source': source,
'chunk_id': i,
'chunk_count': len(chunks),
'timestamp': datetime.now().isoformat(),
'chunking_strategy': chunking_strategy
})
return chunks
def process_file(self, file_path: str, chunking_strategy: str = 'recursive') -> List[Document]:
"""Process file into document chunks"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
return self.process_document(content, file_path, chunking_strategy)
except Exception as e:
print(f"Error processing file {file_path}: {e}")
return []
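# Chunking sketch: with chunk_size=1000 and chunk_overlap=200, consecutive
# chunks share roughly 200 characters of context. Illustrative usage:
#   processor = TextProcessor()
#   chunks = processor.process_document(long_text, source="demo.txt")
#   chunks[0].metadata  # includes source, chunk_id, chunk_count, timestamp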
# 2. Vector Store Implementations
class VectorStoreManager:
"""Manage multiple vector store implementations"""
def __init__(self, embeddings):
self.embeddings = embeddings
self.stores = {}
def create_chroma_store(self, collection_name: str, persist_directory: str = "./chroma_db") -> Chroma:
"""Create and configure Chroma vector store"""
client = chromadb.PersistentClient(path=persist_directory)
store = Chroma(
client=client,
collection_name=collection_name,
embedding_function=self.embeddings,
persist_directory=persist_directory
)
self.stores['chroma'] = store
return store
def create_faiss_store(self, documents: List[Document]) -> FAISS:
"""Create FAISS vector store from documents"""
store = FAISS.from_documents(documents, self.embeddings)
self.stores['faiss'] = store
return store
def create_pinecone_store(self, index_name: str, dimension: int = 1536) -> Pinecone:
"""Create Pinecone vector store"""
        # Initialize Pinecone (pinecone-client v2 API: init / list_indexes / create_index)
pinecone.init(
api_key=os.getenv("PINECONE_API_KEY"),
environment=os.getenv("PINECONE_ENVIRONMENT")
)
# Create index if it doesn't exist
if index_name not in pinecone.list_indexes():
pinecone.create_index(
name=index_name,
dimension=dimension,
metric="cosine"
)
store = Pinecone.from_existing_index(
index_name=index_name,
embedding=self.embeddings
)
self.stores['pinecone'] = store
return store
def get_store(self, store_type: str):
"""Get a specific vector store"""
return self.stores.get(store_type)
# 3. Custom Retriever Implementation
class HybridRetriever(BaseRetriever):
"""Hybrid retriever combining multiple retrieval strategies"""
    # Note: the BaseRetriever interface changed across LangChain versions; this
    # sketch assumes the early ABC-style interface (plain __init__ plus
    # get_relevant_documents / aget_relevant_documents)
    def __init__(self, vector_store, keyword_search_weight: float = 0.3):
        super().__init__()
        self.vector_store = vector_store
        self.keyword_search_weight = keyword_search_weight
    def get_relevant_documents(self, query: str, k: int = 4) -> List[Document]:
        """Retrieve documents using a hybrid approach"""
        # Vector search
        vector_docs = self.vector_store.similarity_search(query, k=k)
        # Keyword-based search (simplified)
        keyword_docs = self._keyword_search(query, k=k)
        # Combine and rank results
        combined_docs = self._combine_results(vector_docs, keyword_docs, k)
        return combined_docs

    async def aget_relevant_documents(self, query: str) -> List[Document]:
        """Async variant required by the ABC interface; delegates to the sync path"""
        return self.get_relevant_documents(query)
def _keyword_search(self, query: str, k: int = 4) -> List[Document]:
"""Simple keyword-based search implementation"""
# This is a simplified implementation
# In production, use BM25 or other advanced algorithms
query_words = set(query.lower().split())
scored_docs = []
        # FAISS keeps documents in an InMemoryDocstore; its mapping is the private _dict
        for doc in self.vector_store.docstore._dict.values():
if hasattr(doc, 'page_content'):
doc_words = set(doc.page_content.lower().split())
intersection = query_words.intersection(doc_words)
score = len(intersection) / len(query_words)
if score > 0:
scored_docs.append((doc, score))
# Sort by score and return top k
scored_docs.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, _ in scored_docs[:k]]
    def _combine_results(self, vector_docs: List[Document], keyword_docs: List[Document], k: int) -> List[Document]:
        """Combine vector and keyword search results"""
        # Documents are not hashable, so key scores by page_content instead
        scored_docs: Dict[str, Tuple[Document, float]] = {}
        # Score vector documents (higher score for better rank)
        for i, doc in enumerate(vector_docs):
            score = 1.0 - (i / len(vector_docs))
            scored_docs[doc.page_content] = (doc, score * (1 - self.keyword_search_weight))
        # Score keyword documents
        for i, doc in enumerate(keyword_docs):
            score = (1.0 - (i / len(keyword_docs))) * self.keyword_search_weight
            if doc.page_content in scored_docs:
                prev_doc, prev_score = scored_docs[doc.page_content]
                scored_docs[doc.page_content] = (prev_doc, prev_score + score)
            else:
                scored_docs[doc.page_content] = (doc, score)
        # Sort by score and return top k
        ranked = sorted(scored_docs.values(), key=lambda pair: pair[1], reverse=True)
        return [doc for doc, _ in ranked[:k]]
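# Hedged sketch of a stronger keyword scorer for _keyword_search, using the
# optional rank_bm25 package (pip install rank_bm25); it replaces the naive
# word-overlap score with BM25 ranking:
def bm25_rank(documents: List[Document], query: str, k: int = 4) -> List[Document]:
    from rank_bm25 import BM25Okapi
    corpus = [doc.page_content.lower().split() for doc in documents]
    bm25 = BM25Okapi(corpus)
    scores = bm25.get_scores(query.lower().split())
    ranked = sorted(zip(documents, scores), key=lambda pair: pair[1], reverse=True)
    return [doc for doc, _ in ranked[:k]]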
# 4. RAG Chain Implementations
class RAGChainManager:
"""Manage different RAG chain configurations"""
def __init__(self, llm, vector_store):
self.llm = llm
self.vector_store = vector_store
self.chains = {}
def create_basic_rag(self, prompt_template: Optional[str] = None) -> RetrievalQA:
"""Create basic RAG chain"""
if not prompt_template:
prompt_template = """Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know. Don't try to make up an answer.
Context: {context}
Question: {question}
Helpful Answer:"""
prompt = PromptTemplate(
template=prompt_template,
input_variables=["context", "question"]
)
chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=self.vector_store.as_retriever(),
chain_type_kwargs={"prompt": prompt},
return_source_documents=True
)
self.chains['basic'] = chain
return chain
def create_conversational_rag(self) -> ConversationalRetrievalChain:
"""Create conversational RAG chain with memory"""
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True,
output_key="answer"
)
prompt = ChatPromptTemplate.from_messages([
("system", """You are a helpful AI assistant. Use the following context to answer questions.
If the context doesn't contain the answer, say you don't know based on the provided information.
Context: {context}"""),
("human", "{question}"),
])
chain = ConversationalRetrievalChain.from_llm(
llm=self.llm,
retriever=self.vector_store.as_retriever(),
memory=memory,
combine_docs_chain_kwargs={"prompt": prompt},
return_source_documents=True
)
self.chains['conversational'] = chain
return chain
def create_map_reduce_rag(self) -> RetrievalQA:
"""Create RAG chain with map-reduce document processing"""
question_prompt = PromptTemplate(
template="""For the following piece of context, answer any questions based on this context.
If the context doesn't contain the answer, say that you don't know.
Context: {context}
Question: {question}
Answer:""",
input_variables=["context", "question"]
)
combine_prompt = PromptTemplate(
template="""Given the following extracted parts of a long document and a question,
create a final answer. If you don't know the answer, just say that you don't know.
SUMMARIES: {summaries}
Question: {question}
Final Answer:""",
input_variables=["summaries", "question"]
)
chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="map_reduce",
retriever=self.vector_store.as_retriever(),
chain_type_kwargs={
"question_prompt": question_prompt,
"combine_prompt": combine_prompt
},
return_source_documents=True
)
self.chains['map_reduce'] = chain
return chain
# 5. Document Indexing and Management
class DocumentIndexer:
"""Manage document indexing and updates"""
def __init__(self, vector_store_manager, text_processor):
self.vector_store_manager = vector_store_manager
self.text_processor = text_processor
self.indexed_documents = {}
def index_document(self, content: str, doc_id: str, metadata: Dict[str, Any] = None) -> bool:
"""Index a single document"""
try:
# Process document into chunks
chunks = self.text_processor.process_document(content, doc_id)
# Add additional metadata
for chunk in chunks:
if metadata:
chunk.metadata.update(metadata)
chunk.metadata['doc_id'] = doc_id
# Add to vector store
store = self.vector_store_manager.get_store('chroma')
if store:
store.add_documents(chunks)
# Track indexed document
self.indexed_documents[doc_id] = {
'chunk_count': len(chunks),
'indexed_at': datetime.now().isoformat(),
'metadata': metadata
}
return True
except Exception as e:
print(f"Error indexing document {doc_id}: {e}")
return False
def index_file(self, file_path: str, metadata: Dict[str, Any] = None) -> bool:
"""Index a file"""
try:
chunks = self.text_processor.process_file(file_path)
# Add file-specific metadata
file_metadata = {
'file_path': file_path,
'file_name': os.path.basename(file_path),
'file_size': os.path.getsize(file_path),
'file_type': os.path.splitext(file_path)[1]
}
if metadata:
file_metadata.update(metadata)
for chunk in chunks:
chunk.metadata.update(file_metadata)
# Add to vector store
store = self.vector_store_manager.get_store('chroma')
if store:
store.add_documents(chunks)
doc_id = os.path.basename(file_path)
self.indexed_documents[doc_id] = {
'chunk_count': len(chunks),
'indexed_at': datetime.now().isoformat(),
'metadata': file_metadata
}
return True
except Exception as e:
print(f"Error indexing file {file_path}: {e}")
return False
def delete_document(self, doc_id: str) -> bool:
"""Delete a document from the index"""
try:
store = self.vector_store_manager.get_store('chroma')
if store:
                # Delete by metadata filter (pass-through of `where` to the underlying
                # Chroma collection varies across LangChain versions)
                store.delete(where={"doc_id": doc_id})
if doc_id in self.indexed_documents:
del self.indexed_documents[doc_id]
return True
except Exception as e:
print(f"Error deleting document {doc_id}: {e}")
return False
def update_document(self, doc_id: str, new_content: str, metadata: Dict[str, Any] = None) -> bool:
"""Update an existing document"""
# Delete old version
self.delete_document(doc_id)
# Index new version
return self.index_document(new_content, doc_id, metadata)
def get_indexed_docs_info(self) -> Dict[str, Any]:
"""Get information about indexed documents"""
return self.indexed_documents
# 6. Advanced RAG Features
class AdvancedRAG:
"""Advanced RAG features including query expansion, contextual compression"""
def __init__(self, vector_store, llm, embeddings):
self.vector_store = vector_store
self.llm = llm
self.embeddings = embeddings
def query_expansion(self, original_query: str, num_expansions: int = 3) -> List[str]:
"""Expand query with related terms"""
        expansion_prompt = PromptTemplate(
            input_variables=["query", "num_expansions"],
            template="""Generate {num_expansions} alternative ways to phrase this search query:
Query: {query}
Alternative queries (one per line):"""
        )
chain = LLMChain(llm=self.llm, prompt=expansion_prompt)
expanded = chain.run(query=original_query, num_expansions=num_expansions)
expanded_queries = [original_query] + [q.strip() for q in expanded.split('\n') if q.strip()]
return expanded_queries[:num_expansions + 1]
def multi_query_search(self, query: str, num_queries: int = 3) -> List[Document]:
"""Search using multiple query formulations"""
expanded_queries = self.query_expansion(query, num_queries)
all_docs = []
for expanded_query in expanded_queries:
docs = self.vector_store.similarity_search(expanded_query, k=2)
all_docs.extend(docs)
# Remove duplicates and re-rank
unique_docs = []
seen_content = set()
for doc in all_docs:
content_hash = hash(doc.page_content)
if content_hash not in seen_content:
seen_content.add(content_hash)
unique_docs.append(doc)
return unique_docs[:5] # Return top 5 unique documents
    def contextual_compression(self, query: str) -> List[Document]:
        """Retrieve and compress documents so they focus on query-relevant context"""
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
# Create compression chain
compressor = LLMChainExtractor.from_llm(self.llm)
# Create base retriever
base_retriever = self.vector_store.as_retriever()
# Create compression retriever
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever
)
# Get compressed documents
compressed_docs = compression_retriever.get_relevant_documents(query)
return compressed_docs
    def create_mmr_rag_chain(self) -> RetrievalQA:
        """Create an MMR (Maximal Marginal Relevance) RAG chain"""
        from langchain.retrievers import ContextualCompressionRetriever
        from langchain.retrievers.document_compressors import EmbeddingsFilter
# Create retriever with MMR
retriever = self.vector_store.as_retriever(
search_type="mmr",
search_kwargs={'k': 5, 'fetch_k': 20}
)
# Add compression to focus on relevant content
compressor = EmbeddingsFilter(embeddings=self.embeddings, similarity_threshold=0.76)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=retriever
)
chain = RetrievalQA.from_chain_type(
llm=self.llm,
retriever=compression_retriever,
return_source_documents=True
)
return chain
# 7. Demonstration Functions
def demo_vector_store_creation():
"""Demonstrate vector store creation and management"""
print("=== Vector Store Creation Demo ===")
embeddings = OpenAIEmbeddings()
vector_manager = VectorStoreManager(embeddings)
# Create sample documents
sample_docs = [
Document(page_content="Python is a versatile programming language used for web development, data science, and automation.",
metadata={"category": "programming", "language": "python"}),
Document(page_content="Machine learning algorithms can learn patterns from data to make predictions.",
metadata={"category": "AI", "topic": "ML"}),
Document(page_content="React is a JavaScript library for building user interfaces.",
metadata={"category": "web", "language": "javascript"}),
]
# Create FAISS store
faiss_store = vector_manager.create_faiss_store(sample_docs)
print(f"Created FAISS store with {len(sample_docs)} documents")
# Test similarity search
results = faiss_store.similarity_search("programming", k=2)
print(f"Similarity search results for 'programming':")
for doc in results:
print(f"- {doc.page_content[:50]}... (Category: {doc.metadata.get('category')})")
def demo_rag_chains():
"""Demonstrate different RAG chain implementations"""
print("\n=== RAG Chains Demo ===")
# Initialize components
embeddings = OpenAIEmbeddings()
llm = ChatOpenAI(temperature=0)
# Create vector store
sample_docs = [
Document(page_content="Artificial Intelligence (AI) is the simulation of human intelligence in machines."),
Document(page_content="Machine Learning is a subset of AI that enables systems to learn and improve from experience."),
Document(page_content="Deep Learning uses neural networks with multiple layers to process data."),
Document(page_content="Natural Language Processing allows computers to understand and generate human language."),
]
vector_manager = VectorStoreManager(embeddings)
faiss_store = vector_manager.create_faiss_store(sample_docs)
# Create RAG chain manager
rag_manager = RAGChainManager(llm, faiss_store)
# Test basic RAG
basic_chain = rag_manager.create_basic_rag()
result = basic_chain("What is the relationship between AI and Machine Learning?")
print(f"Basic RAG Result: {result['result']}")
# Test conversational RAG
conversational_chain = rag_manager.create_conversational_rag()
result2 = conversational_chain("Tell me more about Deep Learning")
print(f"Conversational RAG Result: {result2['answer']}")
def demo_document_indexing():
"""Demonstrate document indexing and management"""
print("\n=== Document Indexing Demo ===")
embeddings = OpenAIEmbeddings()
vector_manager = VectorStoreManager(embeddings)
vector_manager.create_chroma_store("demo_collection")
text_processor = TextProcessor()
indexer = DocumentIndexer(vector_manager, text_processor)
# Index sample documents
documents = {
"ai_basics": """
Artificial Intelligence (AI) refers to the simulation of human intelligence in machines
that are programmed to think and mimic human actions. The potential benefits of AI are
extensive and include increased efficiency, accuracy, and the ability to process
vast amounts of data quickly.
""",
"python_ml": """
Python has become the go-to programming language for machine learning and data science.
Libraries like TensorFlow, PyTorch, and scikit-learn make it easy to implement complex
ML algorithms. Python's simplicity and readability make it ideal for rapid prototyping.
""",
"web_development": """
Modern web development involves frontend frameworks like React, Vue, and Angular,
combined with backend technologies like Node.js, Python, or Java. Responsive design
and performance optimization are crucial for user experience.
"""
}
# Index documents
for doc_id, content in documents.items():
success = indexer.index_document(content, doc_id, {
"category": "technology",
"indexed_by": "demo_script"
})
print(f"Indexed {doc_id}: {'✓' if success else '✗'}")
# Get indexing info
info = indexer.get_indexed_docs_info()
print(f"\nIndexed Documents Info: {json.dumps(info, indent=2)}")
def demo_advanced_rag():
"""Demonstrate advanced RAG features"""
print("\n=== Advanced RAG Features Demo ===")
embeddings = OpenAIEmbeddings()
llm = ChatOpenAI(temperature=0)
# Create sample documents
docs = [
Document(page_content="The Renaissance was a period of cultural rebirth in Europe from the 14th to 17th centuries.", metadata={"period": "renaissance"}),
Document(page_content="The Industrial Revolution transformed society with steam power and manufacturing.", metadata={"period": "industrial"}),
Document(page_content="The Digital Revolution brought computers and internet technology to the masses.", metadata={"period": "digital"}),
]
vector_manager = VectorStoreManager(embeddings)
faiss_store = vector_manager.create_faiss_store(docs)
# Create advanced RAG
advanced_rag = AdvancedRAG(faiss_store, llm, embeddings)
# Test query expansion
print("Query Expansion Test:")
original_query = "European historical periods"
expanded_queries = advanced_rag.query_expansion(original_query)
print(f"Original: {original_query}")
print(f"Expanded: {expanded_queries}")
# Test multi-query search
print("\nMulti-Query Search Test:")
multi_results = advanced_rag.multi_query_search("technological changes in society")
for i, doc in enumerate(multi_results):
print(f"{i+1}. {doc.page_content[:80]}...")
# Test MMR chain
print("\nMMR Chain Test:")
    mmr_chain = advanced_rag.create_mmr_rag_chain()
mmr_result = mmr_chain("How did different revolutions impact society?")
print(f"MMR Result: {mmr_result['result']}")
# Main execution
if __name__ == "__main__":
# Set environment variables
os.environ.setdefault("OPENAI_API_KEY", "your-openai-api-key")
os.environ.setdefault("PINECONE_API_KEY", "your-pinecone-api-key")
os.environ.setdefault("PINECONE_ENVIRONMENT", "your-pinecone-environment")
try:
demo_vector_store_creation()
demo_rag_chains()
demo_document_indexing()
demo_advanced_rag()
except Exception as e:
print(f"Demo error: {e}")
print("Make sure to set your API keys in environment variables")