🎯 Recommended Collections
Balanced sample collections from various categories for you to explore
Vector Database Examples
Comprehensive vector database examples covering Chroma, Pinecone, Weaviate, FAISS, and custom vector solutions
💻 Chroma Vector Database python
🟡 intermediate
⭐⭐⭐⭐
Complete Chroma vector database implementation with collections, metadata filtering, similarity search, and production features
⏱️ 40 min
🏷️ chroma, vector-database, embeddings, search
Prerequisites:
Python, ChromaDB, sentence-transformers, numpy
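Before the full listing, a minimal quick-start sketch: it shows the core Chroma flow (persistent client, collection with a cosine index, add, filtered query) using only the chromadb package and its built-in default embedding function; the collection, document, and metadata names are illustrative.
import chromadb
# Local, persistent client; data is stored under the given path
client = chromadb.PersistentClient(path="./chroma_quickstart")
# "hnsw:space" selects the distance metric for this collection
collection = client.get_or_create_collection(
    name="quickstart",
    metadata={"hnsw:space": "cosine"}
)
# Without explicit embeddings, Chroma falls back to its default embedding function
collection.add(
    ids=["doc-1", "doc-2"],
    documents=[
        "Chroma stores documents together with their embeddings.",
        "Metadata fields make it possible to filter search results."
    ],
    metadatas=[{"topic": "storage"}, {"topic": "filtering"}]
)
# Semantic query restricted by a metadata filter
results = collection.query(
    query_texts=["How do I filter results?"],
    n_results=1,
    where={"topic": "filtering"}
)
print(results["documents"][0])
The full implementation below wraps these operations in a configurable class and supplies its own sentence-transformers embeddings instead of relying on Chroma's default embedding function.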
# Chroma Vector Database Implementation
# Complete semantic search and document retrieval system
import os
import json
import time
import uuid
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, asdict
from datetime import datetime
import hashlib
import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions
import numpy as np
from sentence_transformers import SentenceTransformer
import logging
from pathlib import Path
import pickle
import threading
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 1. Configuration and Setup
@dataclass
class ChromaConfig:
"""Configuration for Chroma vector database"""
persist_directory: str = "./chroma_db"
collection_name: str = "default"
embedding_model: str = "all-MiniLM-L6-v2"
distance_metric: str = "cosine" # valid "hnsw:space" values: "cosine", "l2", "ip"
max_results: int = 10
batch_size: int = 100
auto_persist: bool = True
enable_gpu: bool = False
class EmbeddingManager:
"""Advanced embedding management with multiple models"""
def __init__(self, model_name: str = "all-MiniLM-L6-v2", device: str = "cpu"):
self.model_name = model_name
self.device = device
self.model = None
self.embedding_cache = {}
self.cache_hits = 0
self.cache_misses = 0
self._load_model()
def _load_model(self):
"""Load the embedding model"""
try:
self.model = SentenceTransformer(self.model_name, device=self.device)
logger.info(f"Loaded embedding model: {self.model_name}")
except Exception as e:
logger.error(f"Failed to load model {self.model_name}: {e}")
raise
def get_embedding(self, text: str, use_cache: bool = True) -> List[float]:
"""Get embedding for a single text"""
if not text or not text.strip():
return [0.0] * 384 # Default dimension for MiniLM
# Check cache first
cache_key = hashlib.md5(text.encode()).hexdigest()
if use_cache and cache_key in self.embedding_cache:
self.cache_hits += 1
return self.embedding_cache[cache_key]
self.cache_misses += 1
try:
embedding = self.model.encode(text, convert_to_numpy=True)
embedding_list = embedding.tolist()
# Cache the result
if use_cache:
self.embedding_cache[cache_key] = embedding_list
# Limit cache size
if len(self.embedding_cache) > 10000:
# Remove oldest 1000 entries
keys_to_remove = list(self.embedding_cache.keys())[:1000]
for key in keys_to_remove:
del self.embedding_cache[key]
return embedding_list
except Exception as e:
logger.error(f"Error generating embedding: {e}")
return [0.0] * 384
def get_batch_embeddings(self, texts: List[str], show_progress: bool = False) -> List[List[float]]:
"""Get embeddings for multiple texts efficiently"""
if not texts:
return []
# Check cache for each text
uncached_texts = []
uncached_indices = []
cached_embeddings = [[] for _ in texts]
for i, text in enumerate(texts):
cache_key = hashlib.md5(text.encode()).hexdigest()
if cache_key in self.embedding_cache:
cached_embeddings[i] = self.embedding_cache[cache_key]
self.cache_hits += 1
else:
uncached_texts.append(text)
uncached_indices.append(i)
self.cache_misses += 1
# Generate embeddings for uncached texts
if uncached_texts:
try:
if show_progress:
logger.info(f"Generating embeddings for {len(uncached_texts)} uncached texts")
new_embeddings = self.model.encode(
uncached_texts,
convert_to_numpy=True,
show_progress_bar=show_progress
)
for i, embedding in enumerate(new_embeddings):
embedding_list = embedding.tolist()
original_index = uncached_indices[i]
cached_embeddings[original_index] = embedding_list
# Cache the result
cache_key = hashlib.md5(uncached_texts[i].encode()).hexdigest()
self.embedding_cache[cache_key] = embedding_list
except Exception as e:
logger.error(f"Error generating batch embeddings: {e}")
# Fill with zeros
for i, original_index in enumerate(uncached_indices):
cached_embeddings[original_index] = [0.0] * 384
return cached_embeddings
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache performance statistics"""
total_requests = self.cache_hits + self.cache_misses
hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0
return {
"cache_hits": self.cache_hits,
"cache_misses": self.cache_misses,
"hit_rate_percent": round(hit_rate, 2),
"cache_size": len(self.embedding_cache)
}
# 2. Enhanced Chroma Client
class ChromaVectorDB:
"""Enhanced Chroma vector database with advanced features"""
def __init__(self, config: ChromaConfig):
self.config = config
self.client = None
self.collection = None
self.embedding_manager = EmbeddingManager(
config.embedding_model,
"cuda" if config.enable_gpu else "cpu"
)
self.document_stats = {
"total_documents": 0,
"total_embeddings": 0,
"last_updated": None
}
self._initialize_client()
self._load_or_create_collection()
def _initialize_client(self):
"""Initialize Chroma client"""
try:
# Create persist directory if it doesn't exist
Path(self.config.persist_directory).mkdir(parents=True, exist_ok=True)
self.client = chromadb.PersistentClient(
path=self.config.persist_directory,
settings=Settings(
anonymized_telemetry=False,
allow_reset=False
)
)
logger.info(f"Chroma client initialized at {self.config.persist_directory}")
except Exception as e:
logger.error(f"Failed to initialize Chroma client: {e}")
raise
def _load_or_create_collection(self):
"""Load existing collection or create new one"""
try:
# Try to get existing collection
self.collection = self.client.get_collection(
name=self.config.collection_name
)
logger.info(f"Loaded existing collection: {self.config.collection_name}")
# Update stats
self._update_document_stats()
except Exception:
# Create a new collection; we supply embeddings explicitly, so no embedding function is attached
self.collection = self.client.create_collection(
name=self.config.collection_name,
embedding_function=None, # embeddings are passed in on add/query
metadata={
"hnsw:space": self.config.distance_metric, # applies the configured distance metric to the HNSW index
"created_at": datetime.now().isoformat(),
"embedding_model": self.config.embedding_model
}
)
logger.info(f"Created new collection: {self.config.collection_name}")
def add_documents(
self,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None,
batch_size: Optional[int] = None
) -> Dict[str, Any]:
"""Add documents to the collection with advanced features"""
if not documents:
return {"success": False, "error": "No documents provided"}
batch_size = batch_size or self.config.batch_size
start_time = time.time()
try:
# Generate IDs if not provided
if ids is None:
ids = [str(uuid.uuid4()) for _ in documents]
# Prepare metadata
if metadatas is None:
metadatas = [{} for _ in documents]
elif len(metadatas) != len(documents):
# Pad metadata
metadatas = metadatas + [{} for _ in range(len(documents) - len(metadatas))]
# Add timestamps and other metadata
current_time = datetime.now().isoformat()
for i in range(len(metadatas)):
metadatas[i].update({
"created_at": current_time,
"length": len(documents[i]),
"word_count": len(documents[i].split()),
"char_count": len(documents[i])
})
# Process in batches
all_ids = []
total_batches = (len(documents) + batch_size - 1) // batch_size
for batch_idx in range(total_batches):
start_idx = batch_idx * batch_size
end_idx = min(start_idx + batch_size, len(documents))
batch_docs = documents[start_idx:end_idx]
batch_metas = metadatas[start_idx:end_idx]
batch_ids = ids[start_idx:end_idx]
# Generate embeddings
embeddings = self.embedding_manager.get_batch_embeddings(
batch_docs,
show_progress=(batch_idx == 0 and len(documents) > batch_size)
)
# Add to Chroma
self.collection.add(
embeddings=embeddings,
documents=batch_docs,
metadatas=batch_metas,
ids=batch_ids
)
all_ids.extend(batch_ids)
logger.info(f"Processed batch {batch_idx + 1}/{total_batches}")
# Small delay to prevent overwhelming
if batch_idx < total_batches - 1:
time.sleep(0.1)
# Persist if enabled
if self.config.auto_persist:
self.persist()
# Update stats
self._update_document_stats()
processing_time = time.time() - start_time
return {
"success": True,
"ids": all_ids,
"documents_added": len(documents),
"processing_time": processing_time,
"avg_time_per_document": processing_time / len(documents)
}
except Exception as e:
logger.error(f"Error adding documents: {e}")
return {
"success": False,
"error": str(e),
"documents_attempted": len(documents)
}
def search(
self,
query: str,
n_results: Optional[int] = None,
where: Optional[Dict[str, Any]] = None,
where_document: Optional[Dict[str, Any]] = None,
include: List[str] = None,
include_distances: bool = True
) -> Dict[str, Any]:
"""Advanced search with filtering and multiple result formats"""
start_time = time.time()
n_results = n_results or self.config.max_results
if include is None:
include = ["metadatas", "documents", "distances"] if include_distances else ["metadatas", "documents"]
try:
# Get query embedding
query_embedding = self.embedding_manager.get_embedding(query)
# Perform search
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
where=where,
where_document=where_document,
include=include
)
# Process results
processed_results = {
"query": query,
"n_results": n_results,
"processing_time": time.time() - start_time,
"total_found": len(results["ids"][0]) if "ids" in results else 0
}
# Add requested result types
if "documents" in results:
processed_results["documents"] = results["documents"][0]
if "metadatas" in results:
processed_results["metadatas"] = results["metadatas"][0]
if "ids" in results:
processed_results["ids"] = results["ids"][0]
if "distances" in results:
processed_results["distances"] = results["distances"][0]
# Convert distances to similarities (cosine similarity = 1 - cosine distance)
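# NOTE: this conversion is only meaningful when the collection uses cosine distance ("hnsw:space": "cosine"); for "l2" or "ip", report raw distances instead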
similarities = [1 - dist for dist in results["distances"][0]]
processed_results["similarities"] = similarities
return processed_results
except Exception as e:
logger.error(f"Error during search: {e}")
return {
"query": query,
"error": str(e),
"processing_time": time.time() - start_time
}
def get_similar_documents(
self,
document_id: str,
n_results: int = 5
) -> Dict[str, Any]:
"""Find documents similar to a given document"""
try:
# Get the document
doc_result = self.collection.get(ids=[document_id])
if not doc_result["ids"]:
return {"error": f"Document with ID {document_id} not found"}
# Use the document content as query
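# NOTE: the source document itself will usually be returned as the top hit; callers may want to request n_results + 1 and drop it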
document_text = doc_result["documents"][0]
return self.search(document_text, n_results=n_results)
except Exception as e:
logger.error(f"Error finding similar documents: {e}")
return {"error": str(e)}
def update_document(
self,
document_id: str,
new_text: Optional[str] = None,
new_metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Update an existing document"""
try:
# Get current document
current_result = self.collection.get(ids=[document_id])
if not current_result["ids"]:
return {"error": f"Document with ID {document_id} not found"}
# Prepare updates
updated_text = new_text or current_result["documents"][0]
updated_metadata = current_result["metadatas"][0].copy()
if new_metadata:
updated_metadata.update(new_metadata)
# Add updated timestamp
updated_metadata["updated_at"] = datetime.now().isoformat()
updated_metadata["length"] = len(updated_text)
updated_metadata["word_count"] = len(updated_text.split())
# Delete old document
self.collection.delete(ids=[document_id])
# Add updated document
new_embedding = self.embedding_manager.get_embedding(updated_text)
self.collection.add(
embeddings=[new_embedding],
documents=[updated_text],
metadatas=[updated_metadata],
ids=[document_id]
)
if self.config.auto_persist:
self.persist()
self._update_document_stats()
return {
"success": True,
"document_id": document_id,
"updated_at": updated_metadata["updated_at"]
}
except Exception as e:
logger.error(f"Error updating document: {e}")
return {"error": str(e)}
def delete_documents(
self,
ids: Optional[List[str]] = None,
where: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Delete documents by ID or metadata filter"""
try:
if ids:
# Delete by IDs
deleted_count = len(ids)
self.collection.delete(ids=ids)
elif where:
# Get matching IDs first
matching_docs = self.collection.get(where=where)
if matching_docs["ids"]:
self.collection.delete(ids=matching_docs["ids"])
deleted_count = len(matching_docs["ids"])
else:
deleted_count = 0
else:
return {"error": "Either ids or where must be provided"}
if self.config.auto_persist:
self.persist()
self._update_document_stats()
return {
"success": True,
"deleted_count": deleted_count
}
except Exception as e:
logger.error(f"Error deleting documents: {e}")
return {"error": str(e)}
def get_collection_stats(self) -> Dict[str, Any]:
"""Get comprehensive collection statistics"""
try:
# Basic stats
count = self.collection.count()
# Sample some documents to analyze
sample_docs = self.collection.get(limit=min(100, count))
# Calculate statistics
doc_lengths = [len(doc) for doc in sample_docs.get("documents", [])]
word_counts = [len(doc.split()) for doc in sample_docs.get("documents", [])]
# Metadata analysis
all_metadata = sample_docs.get("metadatas", [])
metadata_keys = set()
for meta in all_metadata:
metadata_keys.update(meta.keys())
# Embedding cache stats
cache_stats = self.embedding_manager.get_cache_stats()
stats = {
"collection_name": self.config.collection_name,
"total_documents": count,
"embedding_model": self.config.embedding_model,
"distance_metric": self.config.distance_metric,
"sample_stats": {
"avg_doc_length": np.mean(doc_lengths) if doc_lengths else 0,
"avg_word_count": np.mean(word_counts) if word_counts else 0,
"min_doc_length": min(doc_lengths) if doc_lengths else 0,
"max_doc_length": max(doc_lengths) if doc_lengths else 0
},
"metadata_keys": list(metadata_keys),
"embedding_cache": cache_stats,
"document_stats": self.document_stats
}
return stats
except Exception as e:
logger.error(f"Error getting collection stats: {e}")
return {"error": str(e)}
def _update_document_stats(self):
"""Update internal document statistics"""
try:
self.document_stats["total_documents"] = self.collection.count()
self.document_stats["total_embeddings"] = self.document_stats["total_documents"]
self.document_stats["last_updated"] = datetime.now().isoformat()
except Exception as e:
logger.error(f"Error updating stats: {e}")
def persist(self):
"""Persist the collection to disk"""
try:
# Chroma automatically persists with PersistentClient
# This is more of a consistency check
logger.info("Collection persisted to disk")
except Exception as e:
logger.error(f"Error persisting collection: {e}")
def reset(self) -> Dict[str, Any]:
"""Reset the entire collection (use with caution!)"""
try:
# Delete the collection
self.client.delete_collection(name=self.config.collection_name)
# Recreate it
self._load_or_create_collection()
# Reset stats
self.document_stats = {
"total_documents": 0,
"total_embeddings": 0,
"last_updated": None
}
return {"success": True, "message": "Collection reset successfully"}
except Exception as e:
logger.error(f"Error resetting collection: {e}")
return {"error": str(e)}
# 3. Advanced Query Operations
class AdvancedQueryOperations:
"""Advanced query operations and semantic search techniques"""
def __init__(self, vector_db: ChromaVectorDB):
self.vector_db = vector_db
def hybrid_search(
self,
query: str,
semantic_weight: float = 0.7,
keyword_weight: float = 0.3,
n_results: int = 10,
keywords: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Hybrid search combining semantic and keyword matching"""
try:
# Semantic search
semantic_results = self.vector_db.search(query, n_results=n_results * 2)
if not semantic_results.get("documents"):
return {"error": "No semantic search results found"}
# Process semantic results
processed_results = []
for i, doc in enumerate(semantic_results["documents"]):
score = semantic_results["similarities"][i] if "similarities" in semantic_results else 1.0
# Calculate keyword score
keyword_score = 0.0
if keywords:
doc_lower = doc.lower()
keyword_matches = sum(1 for kw in keywords if kw.lower() in doc_lower)
keyword_score = keyword_matches / len(keywords)
# Combine scores
combined_score = (semantic_weight * score + keyword_weight * keyword_score)
processed_results.append({
"document": doc,
"metadata": semantic_results["metadatas"][i],
"id": semantic_results["ids"][i],
"semantic_score": score,
"keyword_score": keyword_score,
"combined_score": combined_score
})
# Sort by combined score
processed_results.sort(key=lambda x: x["combined_score"], reverse=True)
return {
"query": query,
"results": processed_results[:n_results],
"search_weights": {
"semantic": semantic_weight,
"keyword": keyword_weight
}
}
except Exception as e:
logger.error(f"Error in hybrid search: {e}")
return {"error": str(e)}
def multi_query_search(
self,
queries: List[str],
strategy: str = "average", # average, best, union
n_results: int = 10
) -> Dict[str, Any]:
"""Search with multiple queries and combine results"""
try:
all_results = {}
# Perform individual searches
for query in queries:
results = self.vector_db.search(query, n_results=n_results)
if "documents" in results:
all_results[query] = results
# Combine results based on strategy
if strategy == "average":
return self._average_combine_results(all_results, n_results)
elif strategy == "best":
return self._best_combine_results(all_results, n_results)
elif strategy == "union":
return self._union_combine_results(all_results, n_results)
else:
return {"error": f"Unknown strategy: {strategy}"}
except Exception as e:
logger.error(f"Error in multi-query search: {e}")
return {"error": str(e)}
def _average_combine_results(self, all_results: Dict, n_results: int) -> Dict[str, Any]:
"""Combine results by averaging similarity scores"""
doc_scores = {}
doc_data = {}
# Collect all scores
for query, results in all_results.items():
if "documents" not in results:
continue
for i, doc in enumerate(results["documents"]):
doc_id = results["ids"][i]
similarity = results["similarities"][i] if "similarities" in results else 1.0
if doc_id not in doc_scores:
doc_scores[doc_id] = []
doc_data[doc_id] = {
"document": doc,
"metadata": results["metadatas"][i],
"id": doc_id
}
doc_scores[doc_id].append(similarity)
# Calculate average scores
final_results = []
for doc_id, scores in doc_scores.items():
avg_score = sum(scores) / len(scores)
final_results.append({
**doc_data[doc_id],
"average_score": avg_score,
"score_count": len(scores)
})
# Sort and return top results
final_results.sort(key=lambda x: x["average_score"], reverse=True)
return {
"queries": list(all_results.keys()),
"strategy": "average",
"results": final_results[:n_results]
}
def _best_combine_results(self, all_results: Dict, n_results: int) -> Dict[str, Any]:
"""Combine results by taking best score per document"""
doc_best_score = {}
doc_data = {}
for query, results in all_results.items():
if "documents" not in results:
continue
for i, doc in enumerate(results["documents"]):
doc_id = results["ids"][i]
similarity = results["similarities"][i] if "similarities" in results else 1.0
if doc_id not in doc_best_score or similarity > doc_best_score[doc_id]:
doc_best_score[doc_id] = similarity
doc_data[doc_id] = {
"document": doc,
"metadata": results["metadatas"][i],
"id": doc_id,
"best_query": query,
"best_score": similarity
}
# Sort and return top results
final_results = []
for doc_id, score in doc_best_score.items():
final_results.append({
**doc_data[doc_id],
"best_score": score
})
final_results.sort(key=lambda x: x["best_score"], reverse=True)
return {
"queries": list(all_results.keys()),
"strategy": "best",
"results": final_results[:n_results]
}
def _union_combine_results(self, all_results: Dict, n_results: int) -> Dict[str, Any]:
"""Combine all results without deduplication"""
all_docs = []
for query, results in all_results.items():
if "documents" not in results:
continue
for i, doc in enumerate(results["documents"]):
similarity = results["similarities"][i] if "similarities" in results else 1.0
all_docs.append({
"document": doc,
"metadata": results["metadatas"][i],
"id": results["ids"][i],
"source_query": query,
"similarity": similarity
})
# Sort and return top results
all_docs.sort(key=lambda x: x["similarity"], reverse=True)
return {
"queries": list(all_results.keys()),
"strategy": "union",
"results": all_docs[:n_results]
}
# 4. Document Management and Analytics
class DocumentManager:
"""Advanced document management and analytics"""
def __init__(self, vector_db: ChromaVectorDB):
self.vector_db = vector_db
self.analytics = DocumentAnalytics(vector_db)
def batch_import_from_directory(
self,
directory_path: str,
file_types: List[str] = None,
recursive: bool = True,
batch_size: int = 100
) -> Dict[str, Any]:
"""Batch import documents from directory"""
if file_types is None:
file_types = [".txt", ".md", ".py", ".js", ".html", ".css"]
try:
from pathlib import Path
directory = Path(directory_path)
if not directory.exists():
return {"error": f"Directory not found: {directory_path}"}
# Find all files
files_found = []
if recursive:
for file_type in file_types:
files_found.extend(directory.rglob(f"*{file_type}"))
else:
for file_type in file_types:
files_found.extend(directory.glob(f"*{file_type}"))
if not files_found:
return {"message": "No files found matching the criteria"}
# Process files
documents = []
metadatas = []
for file_path in files_found:
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
if content.strip():
documents.append(content)
metadatas.append({
"file_path": str(file_path),
"file_name": file_path.name,
"file_type": file_path.suffix,
"file_size": file_path.stat().st_size,
"imported_at": datetime.now().isoformat()
})
except Exception as e:
logger.warning(f"Could not read file {file_path}: {e}")
if not documents:
return {"message": "No readable content found in files"}
# Add to vector database
import_result = self.vector_db.add_documents(
documents,
metadatas,
batch_size=batch_size
)
import_result.update({
"files_scanned": len(files_found),
"files_imported": len(documents),
"directory": directory_path
})
return import_result
except Exception as e:
logger.error(f"Error in batch import: {e}")
return {"error": str(e)}
def export_collection(
self,
output_path: str,
include_embeddings: bool = False
) -> Dict[str, Any]:
"""Export entire collection to file"""
try:
# Get all documents
all_docs = self.vector_db.collection.get()
export_data = {
"metadata": {
"collection_name": self.vector_db.config.collection_name,
"exported_at": datetime.now().isoformat(),
"total_documents": len(all_docs["ids"]),
"include_embeddings": include_embeddings
},
"documents": []
}
# Process each document
for i in range(len(all_docs["ids"])):
doc_data = {
"id": all_docs["ids"][i],
"content": all_docs["documents"][i],
"metadata": all_docs["metadatas"][i]
}
if include_embeddings:
# Get embedding for this document
embedding = self.vector_db.embedding_manager.get_embedding(
all_docs["documents"][i],
use_cache=True
)
doc_data["embedding"] = embedding
export_data["documents"].append(doc_data)
# Save to file
output_file = Path(output_path)
output_file.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
return {
"success": True,
"output_path": str(output_file),
"documents_exported": len(export_data["documents"]),
"file_size_mb": output_file.stat().st_size / (1024 * 1024)
}
except Exception as e:
logger.error(f"Error exporting collection: {e}")
return {"error": str(e)}
class DocumentAnalytics:
"""Analytics and insights for document collections"""
def __init__(self, vector_db: ChromaVectorDB):
self.vector_db = vector_db
def generate_insights(self) -> Dict[str, Any]:
"""Generate comprehensive analytics insights"""
try:
# Get collection stats
stats = self.vector_db.get_collection_stats()
# Get sample documents for analysis
sample_size = min(1000, stats.get("total_documents", 0))
sample_docs = self.vector_db.collection.get(limit=sample_size)
insights = {
"collection_overview": stats,
"content_analysis": self._analyze_content(sample_docs),
"metadata_analysis": self._analyze_metadata(sample_docs),
"performance_metrics": self._analyze_performance(),
"recommendations": self._generate_recommendations(stats)
}
return insights
except Exception as e:
logger.error(f"Error generating insights: {e}")
return {"error": str(e)}
def _analyze_content(self, sample_docs: Dict) -> Dict[str, Any]:
"""Analyze document content"""
try:
documents = sample_docs.get("documents", [])
if not documents:
return {"message": "No documents to analyze"}
# Content statistics
lengths = [len(doc) for doc in documents]
word_counts = [len(doc.split()) for doc in documents]
# Most common words (simple analysis)
all_words = []
for doc in documents:
all_words.extend(doc.lower().split())
word_freq = {}
for word in all_words:
if len(word) > 3: # Ignore very short words
word_freq[word] = word_freq.get(word, 0) + 1
top_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:20]
return {
"document_count": len(documents),
"content_stats": {
"avg_length": sum(lengths) / len(lengths),
"median_length": sorted(lengths)[len(lengths) // 2],
"min_length": min(lengths),
"max_length": max(lengths),
"total_characters": sum(lengths)
},
"word_stats": {
"avg_words_per_doc": sum(word_counts) / len(word_counts),
"median_words_per_doc": sorted(word_counts)[len(word_counts) // 2],
"min_words_per_doc": min(word_counts),
"max_words_per_doc": max(word_counts),
"total_words": sum(word_counts)
},
"top_words": top_words
}
except Exception as e:
logger.error(f"Error analyzing content: {e}")
return {"error": str(e)}
def _analyze_metadata(self, sample_docs: Dict) -> Dict[str, Any]:
"""Analyze document metadata"""
try:
metadatas = sample_docs.get("metadatas", [])
if not metadatas:
return {"message": "No metadata to analyze"}
# Analyze metadata fields
all_keys = set()
for meta in metadatas:
all_keys.update(meta.keys())
# Analyze value types and patterns
field_analysis = {}
for key in all_keys:
values = [meta.get(key) for meta in metadatas if key in meta]
non_null_values = [v for v in values if v is not None]
field_analysis[key] = {
"present_in_percent": (len(non_null_values) / len(metadatas)) * 100,
"unique_values": len(set(str(v) for v in non_null_values)),
"sample_values": list(set(str(v) for v in non_null_values[:5]))
}
# Try to determine field type
if all(isinstance(v, str) for v in non_null_values if v is not None):
field_analysis[key]["type"] = "string"
elif all(isinstance(v, (int, float)) for v in non_null_values if v is not None):
field_analysis[key]["type"] = "numeric"
elif all(isinstance(v, bool) for v in non_null_values if v is not None):
field_analysis[key]["type"] = "boolean"
elif all(isinstance(v, list) for v in non_null_values if v is not None):
field_analysis[key]["type"] = "list"
else:
field_analysis[key]["type"] = "mixed"
return {
"total_fields": len(all_keys),
"field_names": list(all_keys),
"field_analysis": field_analysis
}
except Exception as e:
logger.error(f"Error analyzing metadata: {e}")
return {"error": str(e)}
def _analyze_performance(self) -> Dict[str, Any]:
"""Analyze performance metrics"""
try:
cache_stats = self.vector_db.embedding_manager.get_cache_stats()
return {
"embedding_cache": cache_stats,
"collection_size_mb": self._estimate_collection_size()
}
except Exception as e:
logger.error(f"Error analyzing performance: {e}")
return {"error": str(e)}
def _estimate_collection_size(self) -> float:
"""Estimate collection size in MB"""
try:
# This is a rough estimation
doc_count = self.vector_db.collection.count()
# Assume average document size of 2KB + metadata + embedding (384 * 4 bytes for float32)
estimated_size_bytes = doc_count * (2048 + 1536)
return estimated_size_bytes / (1024 * 1024) # Convert to MB
except Exception as e:
logger.error(f"Error estimating collection size: {e}")
return 0.0
def _generate_recommendations(self, stats: Dict) -> List[str]:
"""Generate optimization recommendations"""
recommendations = []
doc_count = stats.get("total_documents", 0)
if doc_count > 100000:
recommendations.append("Consider sharding the collection for better performance")
if doc_count > 10000:
recommendations.append("Implement pagination for large result sets")
cache_hit_rate = stats.get("embedding_cache", {}).get("hit_rate_percent", 0)
if cache_hit_rate < 50:
recommendations.append("Consider implementing query result caching for frequently searched content")
# Check for missing metadata
if "sample_stats" in stats:
avg_doc_length = stats["sample_stats"].get("avg_doc_length", 0)
if avg_doc_length > 5000:
recommendations.append("Consider text chunking for very long documents to improve search precision")
if not recommendations:
recommendations.append("Collection appears to be well-optimized")
return recommendations
# 5. Demonstration Functions
def demo_basic_chroma_usage():
"""Demonstrate basic Chroma usage"""
print("=== Basic Chroma Usage Demo ===")
# Configuration
config = ChromaConfig(
persist_directory="./demo_chroma_db",
collection_name="demo_collection",
embedding_model="all-MiniLM-L6-v2"
)
# Initialize vector database
vector_db = ChromaVectorDB(config)
# Sample documents
documents = [
"Python is a high-level programming language known for its simplicity and readability.",
"Machine learning algorithms can automatically learn from data without being explicitly programmed.",
"React is a JavaScript library for building user interfaces, particularly web applications.",
"Artificial intelligence aims to create machines that can perform tasks requiring human intelligence.",
"Data science combines statistics, mathematics, and computer science to extract insights from data."
]
# Add documents
result = vector_db.add_documents(documents)
print(f"Added {result['documents_added']} documents successfully")
# Search
search_result = vector_db.search("programming languages", n_results=3)
print(f"\nSearch results for 'programming languages':")
for i, doc in enumerate(search_result.get("documents", [])):
similarity = search_result.get("similarities", [])[i]
print(f"{i+1}. {doc[:100]}... (similarity: {similarity:.3f})")
# Get collection stats
stats = vector_db.get_collection_stats()
print(f"\nCollection stats:")
print(f"- Total documents: {stats['total_documents']}")
print(f"- Embedding model: {stats['embedding_model']}")
def demo_advanced_search():
"""Demonstrate advanced search operations"""
print("\n=== Advanced Search Demo ===")
config = ChromaConfig(
persist_directory="./demo_chroma_db",
collection_name="advanced_demo"
)
vector_db = ChromaVectorDB(config)
advanced_ops = AdvancedQueryOperations(vector_db)
# Add sample documents with metadata
documents = [
"Machine learning is a subset of artificial intelligence that enables systems to learn and improve from experience.",
"Deep learning uses neural networks with multiple layers to progressively extract higher-level features from raw input.",
"Natural language processing allows computers to understand, interpret, and generate human language.",
"Computer vision enables machines to interpret and make decisions based on visual data from the world.",
"Reinforcement learning trains agents to make sequences of decisions in an environment to maximize reward."
]
metadatas = [
{"category": "ML", "difficulty": "intermediate", "tags": ["AI", "learning"]},
{"category": "DL", "difficulty": "advanced", "tags": ["AI", "neural"]},
{"category": "NLP", "difficulty": "intermediate", "tags": ["AI", "language"]},
{"category": "CV", "difficulty": "advanced", "tags": ["AI", "vision"]},
{"category": "RL", "difficulty": "advanced", "tags": ["AI", "decision"]}
]
vector_db.add_documents(documents, metadatas)
# Hybrid search
hybrid_result = advanced_ops.hybrid_search(
"artificial intelligence",
keywords=["learning", "systems"],
n_results=5
)
print("Hybrid search results:")
for result in hybrid_result["results"]:
print(f"- Combined score: {result['combined_score']:.3f}")
print(f" Semantic: {result['semantic_score']:.3f}, Keyword: {result['keyword_score']:.3f}")
print(f" {result['document'][:100]}...\n")
# Multi-query search
multi_result = advanced_ops.multi_query_search(
["machine learning", "neural networks", "computer algorithms"],
strategy="average"
)
print("Multi-query search results (average strategy):")
for result in multi_result["results"][:3]:
print(f"- Score: {result['average_score']:.3f}")
print(f" {result['document'][:100]}...\n")
def demo_document_management():
"""Demonstrate document management features"""
print("\n=== Document Management Demo ===")
config = ChromaConfig(
persist_directory="./demo_chroma_db",
collection_name="management_demo"
)
vector_db = ChromaVectorDB(config)
doc_manager = DocumentManager(vector_db)
# Add some documents
documents = [
"Initial document about web development.",
"Another document about database systems."
]
result = vector_db.add_documents(documents)
doc_ids = result["ids"]
print(f"Added documents with IDs: {doc_ids}")
# Update first document
update_result = vector_db.update_document(
doc_ids[0],
new_text="Updated document about modern web development frameworks including React, Vue, and Angular.",
new_metadata={"category": "web", "updated": True}
)
print(f"Document update result: {update_result}")
# Search similar documents
similar_result = vector_db.get_similar_documents(doc_ids[0])
print(f"Similar documents to updated doc:")
for doc in similar_result.get("documents", [])[:3]:
print(f"- {doc[:100]}...")
# Get analytics
analytics = DocumentAnalytics(vector_db)
insights = analytics.generate_insights()
print(f"\nAnalytics insights:")
print(f"- Total documents: {insights['collection_overview']['total_documents']}")
print(f"- Recommendations: {insights['recommendations']}")
def demo_batch_import():
"""Demonstrate batch import functionality"""
print("\n=== Batch Import Demo ===")
config = ChromaConfig(
persist_directory="./demo_chroma_db",
collection_name="batch_import_demo"
)
vector_db = ChromaVectorDB(config)
doc_manager = DocumentManager(vector_db)
# Create sample directory with files
import tempfile
import os
with tempfile.TemporaryDirectory() as temp_dir:
# Create sample files
sample_files = {
"python_basics.txt": "Python is a versatile programming language used for web development, data science, AI, and automation.",
"javascript_intro.md": "JavaScript is the language of the web, enabling interactive and dynamic web applications.",
"data_science.py": "Data science combines statistics, programming, and domain knowledge to extract insights from data."
}
for filename, content in sample_files.items():
file_path = os.path.join(temp_dir, filename)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
# Import from directory
import_result = doc_manager.batch_import_from_directory(
temp_dir,
file_types=[".txt", ".md", ".py"]
)
print(f"Batch import result:")
print(f"- Files scanned: {import_result['files_scanned']}")
print(f"- Files imported: {import_result['files_imported']}")
print(f"- Documents added: {import_result.get('documents_added', 0)}")
# Search the imported content
search_result = vector_db.search("programming language", n_results=3)
print(f"\nSearch results:")
for doc in search_result.get("documents", []):
print(f"- {doc}")
# Main execution
if __name__ == "__main__":
try:
demo_basic_chroma_usage()
demo_advanced_search()
demo_document_management()
demo_batch_import()
except Exception as e:
print(f"Demo error: {e}")
logger.exception("Demo failed")
💻 Pinecone and Weaviate Vector Databases python
🔴 complex
⭐⭐⭐⭐⭐
Cloud-native vector database implementations for semantic search and AI applications at production scale
⏱️ 50 min
🏷️ pinecone, weaviate, vector-database, cloud
Prerequisites:
Python, Pinecone API, Weaviate, sentence-transformers
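Before the full listing, a minimal quick-start sketch of the Pinecone flow (init, create index, upsert, query); it targets the legacy pinecone-client v2 API that the listing below also uses, and the API key, environment, index name, and vectors are placeholders.
import pinecone
# Legacy pinecone-client (v2) initialization; newer releases expose a Pinecone client class instead
pinecone.init(api_key="YOUR_API_KEY", environment="us-west1-gcp-free")
# Create a small cosine index if it does not exist yet; dimension must match your embedding model
if "quickstart" not in pinecone.list_indexes():
    pinecone.create_index("quickstart", dimension=384, metric="cosine")
index = pinecone.Index("quickstart")
# Upsert one vector as (id, values, metadata); real values would come from an embedding model
index.upsert(vectors=[("doc-1", [0.1] * 384, {"topic": "demo"})])
# Query by vector and return metadata with the matches
response = index.query(vector=[0.1] * 384, top_k=1, include_metadata=True)
print(response.matches)
The Weaviate half of the listing follows the same pattern, supplying locally computed vectors to the weaviate-client v3 batch and query APIs.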
# Pinecone and Weaviate Vector Database Implementation
# Production-ready cloud-native vector databases for semantic search
import os
import json
import time
import uuid
import hashlib
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import numpy as np
import asyncio
import aiohttp
import requests
from sentence_transformers import SentenceTransformer
import logging
from pathlib import Path
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor
import backoff
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 1. Configuration Classes
@dataclass
class PineconeConfig:
"""Configuration for Pinecone vector database"""
api_key: str
environment: str = "us-west1-gcp-free"
project_name: str = "default"
index_name: str = "semantic-search"
dimension: int = 384 # must match the embedding model; the default EmbeddingManager model (all-MiniLM-L6-v2) produces 384-dim vectors
metric: str = "cosine" # cosine, euclidean, dotproduct
pods: int = 1
replicas: int = 1
pod_type: str = "p1.x1"
metadata_config: Optional[Dict[str, Any]] = None
@dataclass
class WeaviateConfig:
"""Configuration for Weaviate vector database"""
url: str = "http://localhost:8080"
api_key: Optional[str] = None
batch_size: int = 100
timeout: int = 30
grpc_port: int = 50051
class_name: str = "Document"
vectorizer: str = "none" # We'll provide our own embeddings
class EmbeddingManager:
"""Thread-safe embedding management with caching and batch processing"""
def __init__(self, model_name: str = "all-MiniLM-L6-v2", device: str = "cpu"):
self.model_name = model_name
self.device = device
self.model = None
self.embedding_cache = {}
self.cache_lock = threading.Lock()
self.cache_hits = 0
self.cache_misses = 0
self.batch_size = 32
self._load_model()
def _load_model(self):
"""Load the embedding model"""
try:
self.model = SentenceTransformer(self.model_name, device=self.device)
logger.info(f"Loaded embedding model: {self.model_name}")
except Exception as e:
logger.error(f"Failed to load model {self.model_name}: {e}")
raise
def get_embedding(self, text: str, use_cache: bool = True) -> List[float]:
"""Get embedding for a single text (thread-safe)"""
if not text or not text.strip():
return [0.0] * self.model.get_sentence_embedding_dimension()
# Check cache
if use_cache:
cache_key = hashlib.md5(text.encode()).hexdigest()
with self.cache_lock:
if cache_key in self.embedding_cache:
self.cache_hits += 1
return self.embedding_cache[cache_key]
self.cache_misses += 1
try:
embedding = self.model.encode(text, convert_to_numpy=True)
embedding_list = embedding.tolist()
# Cache result
if use_cache:
with self.cache_lock:
cache_key = hashlib.md5(text.encode()).hexdigest()
self.embedding_cache[cache_key] = embedding_list
# Limit cache size
if len(self.embedding_cache) > 10000:
keys_to_remove = list(self.embedding_cache.keys())[:1000]
for key in keys_to_remove:
del self.embedding_cache[key]
return embedding_list
except Exception as e:
logger.error(f"Error generating embedding: {e}")
dim = self.model.get_sentence_embedding_dimension()
return [0.0] * dim
def get_batch_embeddings(self, texts: List[str], show_progress: bool = False) -> List[List[float]]:
"""Get embeddings for multiple texts efficiently"""
if not texts:
return []
# Check cache for each text
uncached_texts = []
uncached_indices = []
cached_embeddings = [[] for _ in texts]
with self.cache_lock:
for i, text in enumerate(texts):
cache_key = hashlib.md5(text.encode()).hexdigest()
if cache_key in self.embedding_cache:
cached_embeddings[i] = self.embedding_cache[cache_key]
self.cache_hits += 1
else:
uncached_texts.append(text)
uncached_indices.append(i)
self.cache_misses += 1
# Generate embeddings for uncached texts in batches
if uncached_texts:
try:
all_embeddings = []
# Process in smaller batches for memory efficiency
for i in range(0, len(uncached_texts), self.batch_size):
batch_texts = uncached_texts[i:i + self.batch_size]
if show_progress:
logger.info(f"Processing batch {i//self.batch_size + 1}/{(len(uncached_texts) + self.batch_size - 1)//self.batch_size}")
batch_embeddings = self.model.encode(
batch_texts,
convert_to_numpy=True,
show_progress_bar=False
)
for embedding in batch_embeddings:
all_embeddings.append(embedding.tolist())
# Small delay to prevent overwhelming
time.sleep(0.01)
# Place embeddings back in correct positions
for i, embedding in enumerate(all_embeddings):
original_index = uncached_indices[i]
cached_embeddings[original_index] = embedding
# Cache the result
with self.cache_lock:
cache_key = hashlib.md5(uncached_texts[i].encode()).hexdigest()
self.embedding_cache[cache_key] = embedding
except Exception as e:
logger.error(f"Error generating batch embeddings: {e}")
# Fill with zeros
dim = self.model.get_sentence_embedding_dimension()
for original_index in uncached_indices:
cached_embeddings[original_index] = [0.0] * dim
return cached_embeddings
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache performance statistics"""
with self.cache_lock:
total_requests = self.cache_hits + self.cache_misses
hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0
return {
"cache_hits": self.cache_hits,
"cache_misses": self.cache_misses,
"hit_rate_percent": round(hit_rate, 2),
"cache_size": len(self.embedding_cache)
}
# 2. Pinecone Implementation
class PineconeVectorDB:
"""Production-ready Pinecone vector database implementation"""
def __init__(self, config: PineconeConfig):
self.config = config
self.embedding_manager = EmbeddingManager()
self.index = None
self._initialize_client()
self._ensure_index_exists()
def _initialize_client(self):
"""Initialize Pinecone client"""
try:
import pinecone
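# NOTE: this class targets the legacy pinecone-client v2 API (pinecone.init / pinecone.Index); pinecone-client >= 3.0 replaces these with a Pinecone client class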
pinecone.init(
api_key=self.config.api_key,
environment=self.config.environment
)
logger.info("Pinecone client initialized")
except Exception as e:
logger.error(f"Failed to initialize Pinecone client: {e}")
raise
def _ensure_index_exists(self):
"""Ensure index exists, create if it doesn't"""
try:
import pinecone
if self.config.index_name not in pinecone.list_indexes():
logger.info(f"Creating new index: {self.config.index_name}")
pinecone.create_index(
name=self.config.index_name,
dimension=self.config.dimension,
metric=self.config.metric,
pods=self.config.pods,
replicas=self.config.replicas,
pod_type=self.config.pod_type,
metadata_config=self.config.metadata_config
)
# Wait for index to be ready
self._wait_for_index_ready()
else:
logger.info(f"Using existing index: {self.config.index_name}")
# Connect to index
self.index = pinecone.Index(self.config.index_name)
except Exception as e:
logger.error(f"Failed to ensure index exists: {e}")
raise
@backoff.on_exception(backoff.expo, Exception, max_tries=5)
def _wait_for_index_ready(self):
"""Wait for index to be ready with exponential backoff"""
import pinecone
while self.config.index_name not in pinecone.list_indexes():
logger.info("Waiting for index to be ready...")
time.sleep(5)
# Check index stats
stats = pinecone.describe_index(self.config.index_name)
logger.info(f"Index ready: {stats}")
def upsert_documents(
self,
documents: List[str],
ids: Optional[List[str]] = None,
metadatas: Optional[List[Dict[str, Any]]] = None,
namespace: str = "",
batch_size: int = 100
) -> Dict[str, Any]:
"""Upsert documents to Pinecone index"""
if not documents:
return {"success": False, "error": "No documents provided"}
start_time = time.time()
try:
# Generate IDs if not provided
if ids is None:
ids = [str(uuid.uuid4()) for _ in documents]
# Generate embeddings
embeddings = self.embedding_manager.get_batch_embeddings(documents)
# Prepare metadata
if metadatas is None:
metadatas = [{} for _ in documents]
elif len(metadatas) != len(documents):
metadatas = metadatas + [{} for _ in range(len(documents) - len(metadatas))]
# Add timestamps
current_time = datetime.now().isoformat()
for i in range(len(metadatas)):
metadatas[i].update({
"created_at": current_time,
"length": len(documents[i]),
"word_count": len(documents[i].split())
})
# Prepare vectors for upsert
vectors = []
for i, (doc_id, embedding, doc_text, metadata) in enumerate(zip(ids, embeddings, documents, metadatas)):
vectors.append({
"id": doc_id,
"values": embedding,
"metadata": {
**metadata,
"text": doc_text
}
})
# Upsert in batches
total_upserted = 0
all_ids = []
for i in range(0, len(vectors), batch_size):
batch_vectors = vectors[i:i + batch_size]
# Upsert batch
self.index.upsert(
vectors=batch_vectors,
namespace=namespace
)
total_upserted += len(batch_vectors)
all_ids.extend([v["id"] for v in batch_vectors])
logger.info(f"Upserted batch {i//batch_size + 1}/{(len(vectors) + batch_size - 1)//batch_size}")
# Small delay to avoid rate limits
if i < len(vectors) - batch_size:
time.sleep(0.1)
processing_time = time.time() - start_time
return {
"success": True,
"ids": all_ids,
"documents_upserted": total_upserted,
"processing_time": processing_time,
"avg_time_per_document": processing_time / len(documents),
"namespace": namespace
}
except Exception as e:
logger.error(f"Error upserting documents: {e}")
return {
"success": False,
"error": str(e),
"documents_attempted": len(documents)
}
def search(
self,
query: str,
top_k: int = 10,
namespace: str = "",
include_metadata: bool = True,
filter_dict: Optional[Dict[str, Any]] = None,
include_values: bool = True
) -> Dict[str, Any]:
"""Search for similar documents"""
start_time = time.time()
try:
# Get query embedding
query_embedding = self.embedding_manager.get_embedding(query)
# Prepare search parameters
search_params = {
"vector": query_embedding,
"top_k": top_k,
"include_metadata": include_metadata,
"namespace": namespace
}
if include_values:
search_params["include_values"] = include_values
if filter_dict:
search_params["filter"] = filter_dict
# Perform search
results = self.index.query(**search_params)
# Process results
processing_time = time.time() - start_time
processed_results = {
"query": query,
"namespace": namespace,
"processing_time": processing_time,
"results": []
}
if "matches" in results and results["matches"]:
for match in results["matches"]:
result_item = {
"id": match["id"],
"score": match["score"],
"metadata": match.get("metadata", {})
}
if "values" in match:
result_item["values"] = match["values"]
processed_results["results"].append(result_item)
processed_results["total_found"] = len(processed_results["results"])
return processed_results
except Exception as e:
logger.error(f"Error during search: {e}")
return {
"query": query,
"error": str(e),
"processing_time": time.time() - start_time
}
def delete_documents(
self,
ids: List[str],
namespace: str = "",
delete_all: bool = False
) -> Dict[str, Any]:
"""Delete documents from index"""
try:
if delete_all:
# Delete all documents in namespace
self.index.delete(namespace=namespace, delete_all=True)
deleted_count = "all"
else:
# Delete specific documents
self.index.delete(ids=ids, namespace=namespace)
deleted_count = len(ids)
return {
"success": True,
"deleted_count": deleted_count,
"namespace": namespace
}
except Exception as e:
logger.error(f"Error deleting documents: {e}")
return {"error": str(e)}
def get_index_stats(self) -> Dict[str, Any]:
"""Get index statistics"""
try:
import pinecone
# Describe index
index_description = pinecone.describe_index(self.config.index_name)
# Get index statistics
index_stats = self.index.describe_index_stats()
# Get embedding cache stats
cache_stats = self.embedding_manager.get_cache_stats()
return {
"index_name": self.config.index_name,
"dimension": self.config.dimension,
"metric": self.config.metric,
"pods": self.config.pods,
"replicas": self.config.replicas,
"pod_type": self.config.pod_type,
"total_vector_count": index_stats.get("totalVectorCount", 0),
"vector_count_per_pod": index_stats.get("dimension", 0),
"index_fullness": index_stats.get("indexFullness", 0),
"description": index_description,
"embedding_cache": cache_stats
}
except Exception as e:
logger.error(f"Error getting index stats: {e}")
return {"error": str(e)}
# 3. Weaviate Implementation
class WeaviateVectorDB:
"""Production-ready Weaviate vector database implementation"""
def __init__(self, config: WeaviateConfig):
self.config = config
self.embedding_manager = EmbeddingManager()
self.client = None
self._initialize_client()
self._ensure_schema_exists()
def _initialize_client(self):
"""Initialize Weaviate client"""
try:
import weaviate
# weaviate.Client (v3.x) takes keyword arguments, not a config dict
auth = weaviate.AuthApiKey(api_key=self.config.api_key) if self.config.api_key else None
self.client = weaviate.Client(
url=self.config.url,
auth_client_secret=auth,
timeout_config=(5, self.config.timeout) # (connect, read) timeouts in seconds
)
logger.info("Weaviate client initialized")
except Exception as e:
logger.error(f"Failed to initialize Weaviate client: {e}")
raise
def _ensure_schema_exists(self):
"""Ensure schema exists, create if it doesn't"""
try:
# Check if class exists
if not self.client.schema.exists(self.config.class_name):
logger.info(f"Creating new class: {self.config.class_name}")
# Define schema
class_obj = {
"class": self.config.class_name,
"description": "Document class for semantic search",
"vectorizer": self.config.vectorizer, # "none": vectors are supplied by the client
"properties": [
{
"name": "content",
"dataType": ["text"],
"description": "Document content"
},
{
"name": "title",
"dataType": ["text"],
"description": "Document title"
},
{
"name": "category",
"dataType": ["string"],
"description": "Document category"
},
{
"name": "created_at",
"dataType": ["date"],
"description": "Creation timestamp"
},
{
"name": "length",
"dataType": ["int"],
"description": "Document length"
},
{
"name": "word_count",
"dataType": ["int"],
"description": "Word count"
}
]
}
self.client.schema.create_class(class_obj)
logger.info(f"Created class: {self.config.class_name}")
else:
logger.info(f"Using existing class: {self.config.class_name}")
except Exception as e:
logger.error(f"Failed to ensure schema exists: {e}")
raise
def add_documents(
self,
documents: List[str],
titles: Optional[List[str]] = None,
categories: Optional[List[str]] = None,
batch_size: Optional[int] = None
) -> Dict[str, Any]:
"""Add documents to Weaviate"""
if not documents:
return {"success": False, "error": "No documents provided"}
batch_size = batch_size or self.config.batch_size
start_time = time.time()
try:
# Generate embeddings
embeddings = self.embedding_manager.get_batch_embeddings(documents)
# Prepare data objects
data_objects = []
for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
data_object = {
"class": self.config.class_name,
"vector": embedding,
"properties": {
"content": doc,
"created_at": datetime.now().isoformat(),
"length": len(doc),
"word_count": len(doc.split())
}
}
# Add title if provided
if titles and i < len(titles):
data_object["properties"]["title"] = titles[i]
# Add category if provided
if categories and i < len(categories):
data_object["properties"]["category"] = categories[i]
data_objects.append(data_object)
# Batch import via the weaviate-client v3 batch API
all_ids = []
total_imported = 0
self.client.batch.configure(batch_size=batch_size)
with self.client.batch as batch:
for obj in data_objects:
object_id = str(uuid.uuid4())
batch.add_data_object(
data_object=obj["properties"],
class_name=obj["class"],
uuid=object_id,
vector=obj["vector"]
)
all_ids.append(object_id)
total_imported += 1
logger.info(f"Queued and flushed {total_imported}/{len(data_objects)} objects via batch import")
processing_time = time.time() - start_time
return {
"success": True,
"ids": all_ids,
"documents_imported": total_imported,
"processing_time": processing_time,
"avg_time_per_document": processing_time / len(documents)
}
except Exception as e:
logger.error(f"Error adding documents: {e}")
return {
"success": False,
"error": str(e),
"documents_attempted": len(documents)
}
def search(
self,
query: str,
limit: int = 10,
filters: Optional[Dict[str, Any]] = None,
return_properties: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Search for similar documents"""
start_time = time.time()
try:
# Embed the query locally; the vectorizer is "none", so we must search with near_vector
query_embedding = self.embedding_manager.get_embedding(query)
properties = return_properties or ["content", "title", "category"]
# Build the GraphQL query with the weaviate-client v3 query builder
query_builder = (
self.client.query
.get(self.config.class_name, properties)
.with_near_vector({"vector": query_embedding})
.with_limit(limit)
.with_additional(["id", "certainty", "distance"])
)
# Add simple equality filters if provided
if filters:
query_builder = query_builder.with_where({
"operator": "And",
"operands": [
{"path": [prop], "operator": "Equal", "valueText": value}
for prop, value in filters.items()
]
})
# Perform search
result = query_builder.do()
processing_time = time.time() - start_time
# Process results
processed_results = {
"query": query,
"processing_time": processing_time,
"results": []
}
items = result.get("data", {}).get("Get", {}).get(self.config.class_name, []) if result else []
for item in items:
additional = item.get("_additional", {})
processed_results["results"].append({
"id": additional.get("id"),
"properties": {k: v for k, v in item.items() if k != "_additional"},
"certainty": additional.get("certainty", 0.0),
"distance": additional.get("distance")
})
processed_results["total_found"] = len(processed_results["results"])
return processed_results
except Exception as e:
logger.error(f"Error during search: {e}")
return {
"query": query,
"error": str(e),
"processing_time": time.time() - start_time
}
def update_document(
self,
document_id: str,
new_content: Optional[str] = None,
new_properties: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Update an existing document"""
try:
# Get current document
current_result = self.client.data_object.get_by_id(
document_id,
class_name=self.config.class_name
)
if not current_result:
return {"error": f"Document with ID {document_id} not found"}
# Prepare the property updates (data_object.update performs a merge)
updated_properties: Dict[str, Any] = {}
new_vector = None
# Re-embed and update content if provided
if new_content:
new_vector = self.embedding_manager.get_embedding(new_content)
updated_properties.update({
"content": new_content,
"length": len(new_content),
"word_count": len(new_content.split())
})
# Merge new properties
if new_properties:
updated_properties.update(new_properties)
if not updated_properties and new_vector is None:
return {"error": "Nothing to update"}
# Perform update via the weaviate-client v3 data_object API
self.client.data_object.update(
data_object=updated_properties,
class_name=self.config.class_name,
uuid=document_id,
vector=new_vector
)
return {
"success": True,
"document_id": document_id,
"updated_at": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error updating document: {e}")
return {"error": str(e)}
def delete_document(self, document_id: str) -> Dict[str, Any]:
"""Delete a document"""
try:
result = self.client.data_object.delete(
document_id,
class_name=self.config.class_name
)
return {
"success": True,
"document_id": document_id,
"deleted": True
}
except Exception as e:
logger.error(f"Error deleting document: {e}")
return {"error": str(e)}
def get_schema_stats(self) -> Dict[str, Any]:
"""Get schema statistics"""
try:
schema = self.client.schema.get()
# Find our class
our_class = None
for class_obj in schema.get("classes", []):
if class_obj.get("class") == self.config.class_name:
our_class = class_obj
break
if not our_class:
return {"error": f"Class {self.config.class_name} not found"}
# Count objects with a GraphQL aggregate query (v3 builder)
agg = self.client.query.aggregate(self.config.class_name).with_meta_count().do()
total_objects = (
agg.get("data", {})
.get("Aggregate", {})
.get(self.config.class_name, [{}])[0]
.get("meta", {})
.get("count", 0)
)
# Get embedding cache stats
cache_stats = self.embedding_manager.get_cache_stats()
return {
"class_name": self.config.class_name,
"vectorizer": self.config.vectorizer,
"module_config": our_class.get("moduleConfig", {}),
"properties": our_class.get("properties", []),
"total_objects": total_objects,
"embedding_cache": cache_stats
}
except Exception as e:
logger.error(f"Error getting schema stats: {e}")
return {"error": str(e)}
# 4. Vector Database Comparison and Benchmarking
class VectorDatabaseComparator:
"""Compare and benchmark different vector databases"""
def __init__(self):
self.benchmark_results = {}
def benchmark_operations(
self,
vector_dbs: Dict[str, Any],
documents: List[str],
queries: List[str]
) -> Dict[str, Any]:
"""Benchmark operations across multiple vector databases"""
results = {}
for db_name, db in vector_dbs.items():
logger.info(f"Benchmarking {db_name}...")
try:
db_results = self._benchmark_single_db(db, documents, queries)
results[db_name] = db_results
except Exception as e:
logger.error(f"Error benchmarking {db_name}: {e}")
results[db_name] = {"error": str(e)}
self.benchmark_results = results
return results
def _benchmark_single_db(self, db: Any, documents: List[str], queries: List[str]) -> Dict[str, Any]:
"""Benchmark a single vector database"""
results = {}
# Benchmark indexing
indexing_start = time.time()
if hasattr(db, 'upsert_documents'): # Pinecone
index_result = db.upsert_documents(documents[:100]) # Limit for benchmark
elif hasattr(db, 'add_documents'): # Weaviate
index_result = db.add_documents(documents[:100])
else:
index_result = {"success": False}
indexing_time = time.time() - indexing_start
# Benchmark search
search_times = []
successful_searches = 0
for query in queries[:10]: # Limit queries
search_start = time.time()
try:
if hasattr(db, 'search'):
search_result = db.search(query, top_k=5)
if search_result and "error" not in search_result:
successful_searches += 1
search_times.append(time.time() - search_start)
except Exception as e:
logger.warning(f"Search error for query '{query}': {e}")
# Calculate statistics
results = {
"indexing": {
"success": index_result.get("success", False),
"time": indexing_time,
"docs_per_second": len(documents[:100]) / indexing_time if indexing_time > 0 else 0
},
"search": {
"successful_searches": successful_searches,
"total_searches": len(queries[:10]),
"success_rate": successful_searches / len(queries[:10]) * 100,
"avg_search_time": np.mean(search_times) if search_times else 0,
"min_search_time": min(search_times) if search_times else 0,
"max_search_time": max(search_times) if search_times else 0
}
}
return results
def generate_comparison_report(self) -> Dict[str, Any]:
"""Generate comparison report from benchmark results"""
if not self.benchmark_results:
return {"error": "No benchmark results available"}
report = {
"summary": {},
"detailed_results": self.benchmark_results,
"recommendations": []
}
# Analyze results
indexing_times = []
search_times = []
for db_name, results in self.benchmark_results.items():
if "error" not in results:
if "indexing" in results:
indexing_times.append((db_name, results["indexing"]["time"]))
if "search" in results:
search_times.append((db_name, results["search"]["avg_search_time"]))
# Find best performing databases
if indexing_times:
best_indexing = min(indexing_times, key=lambda x: x[1])
worst_indexing = max(indexing_times, key=lambda x: x[1])
report["summary"]["best_indexing"] = best_indexing[0]
report["summary"]["worst_indexing"] = worst_indexing[0]
report["summary"]["indexing_speed_ratio"] = worst_indexing[1] / best_indexing[1]
if search_times:
best_search = min(search_times, key=lambda x: x[1])
worst_search = max(search_times, key=lambda x: x[1])
report["summary"]["best_search"] = best_search[0]
report["summary"]["worst_search"] = worst_search[0]
report["summary"]["search_speed_ratio"] = worst_search[1] / best_search[1]
# Generate recommendations
recommendations = []
if indexing_times and search_times:
# Check if any database is consistently better
best_overall = None
best_scores = {}
for db_name in set([x[0] for x in indexing_times + search_times]):
idx_time = next((x[1] for x in indexing_times if x[0] == db_name), float('inf'))
search_time = next((x[1] for x in search_times if x[0] == db_name), float('inf'))
if idx_time != float('inf') and search_time != float('inf'):
score = (1 / idx_time) + (1 / search_time) # Higher is better
best_scores[db_name] = score
if best_scores:
best_overall = max(best_scores.items(), key=lambda x: x[1])
recommendations.append(
f"Best overall performance: {best_overall[0]}"
)
if recommendations:
report["recommendations"] = recommendations
return report
# 5. Production Features
class ProductionFeatures:
"""Production-ready features for vector databases"""
@staticmethod
def create_backup(vector_db, backup_path: str) -> Dict[str, Any]:
"""Create backup of vector database"""
try:
backup_data = {
"timestamp": datetime.now().isoformat(),
"config": vector_db.config if hasattr(vector_db, 'config') else {},
"documents": [],
"metadata": {}
}
# This is a simplified backup implementation
# In production, use database-specific backup mechanisms
if hasattr(vector_db, 'get_index_stats'): # Pinecone
stats = vector_db.get_index_stats()
backup_data["stats"] = stats
elif hasattr(vector_db, 'get_schema_stats'): # Weaviate
stats = vector_db.get_schema_stats()
backup_data["stats"] = stats
# Save backup
backup_file = Path(backup_path)
backup_file.parent.mkdir(parents=True, exist_ok=True)
with open(backup_file, 'w', encoding='utf-8') as f:
json.dump(backup_data, f, indent=2, ensure_ascii=False)
return {
"success": True,
"backup_path": str(backup_file),
"file_size_mb": backup_file.stat().st_size / (1024 * 1024)
}
except Exception as e:
logger.error(f"Error creating backup: {e}")
return {"error": str(e)}
@staticmethod
def monitor_performance(vector_db, duration_minutes: int = 5) -> Dict[str, Any]:
"""Monitor vector database performance"""
try:
start_time = time.time()
end_time = start_time + (duration_minutes * 60)
metrics = {
"query_times": [],
"error_count": 0,
"success_count": 0,
"monitoring_duration": duration_minutes
}
test_queries = [
"test query for performance monitoring",
"another test query",
"performance test query"
]
while time.time() < end_time:
query_start = time.time()
try:
if hasattr(vector_db, 'search'):
result = vector_db.search(
test_queries[0],
top_k=5
)
if result and "error" not in result:
metrics["success_count"] += 1
else:
metrics["error_count"] += 1
query_time = time.time() - query_start
metrics["query_times"].append(query_time)
except Exception as e:
metrics["error_count"] += 1
logger.warning(f"Monitoring query error: {e}")
# Wait between queries
time.sleep(10)
# Calculate statistics
if metrics["query_times"]:
metrics.update({
"avg_query_time": np.mean(metrics["query_times"]),
"min_query_time": min(metrics["query_times"]),
"max_query_time": max(metrics["query_times"]),
"p95_query_time": np.percentile(metrics["query_times"], 95),
"queries_per_minute": (metrics["success_count"] + metrics["error_count"]) / duration_minutes,
"success_rate": (metrics["success_count"] / (metrics["success_count"] + metrics["error_count"]) * 100) if (metrics["success_count"] + metrics["error_count"]) > 0 else 0
})
return metrics
except Exception as e:
logger.error(f"Error monitoring performance: {e}")
return {"error": str(e)}
# 6. Demonstration Functions
def demo_pinecone():
"""Demonstrate Pinecone vector database"""
print("=== Pinecone Vector Database Demo ===")
# Configuration
config = PineconeConfig(
api_key=os.getenv("PINECONE_API_KEY", "your-pinecone-api-key"),
environment=os.getenv("PINECONE_ENVIRONMENT", "us-west1-gcp-free"),
index_name="demo-index"
)
# Initialize
vector_db = PineconeVectorDB(config)
# Sample documents
documents = [
"Pinecone is a vector database service for machine learning applications.",
"Vector databases enable efficient similarity search and recommendation systems.",
"Embedding-based search powers modern AI applications including semantic search and RAG.",
"Cloud-native vector databases provide scalability and managed infrastructure for production use.",
"Real-time search capabilities enable instant similarity matching in large datasets."
]
# Upsert documents
result = vector_db.upsert_documents(documents)
print(f"Upserted {result['documents_upserted']} documents")
# Search
search_result = vector_db.search("AI applications", top_k=3)
print(f"\nSearch results:")
for result_item in search_result["results"]:
print(f"- Score: {result_item['score']:.3f}")
print(f" Content: {result_item['metadata']['text'][:100]}...")
# Get stats
stats = vector_db.get_index_stats()
print(f"\nIndex stats:")
print(f"- Total vectors: {stats['total_vector_count']}")
print(f"- Dimension: {stats['dimension']}")
def demo_weaviate():
"""Demonstrate Weaviate vector database"""
print("\n=== Weaviate Vector Database Demo ===")
# Configuration
config = WeaviateConfig(
url="http://localhost:8080", # Replace with your Weaviate URL
class_name="Document"
)
try:
# Initialize
vector_db = WeaviateVectorDB(config)
# Sample documents with metadata
documents = [
"Weaviate is an open-source vector database that scales with machine learning.",
"GraphQL API provides flexible querying capabilities for vector databases.",
"Schema-first design ensures type-safe data management in Weaviate.",
"Hybrid search combines vector and keyword search for comprehensive results.",
"Real-time collaboration features enable multiple users to work with vector databases simultaneously."
]
titles = [
"Weaviate Overview",
"GraphQL in Weaviate",
"Schema Design",
"Search Capabilities",
"Collaboration Features"
]
categories = [
"database",
"api",
"design",
"search",
"features"
]
# Add documents
result = vector_db.add_documents(documents, titles, categories)
print(f"Added {result['documents_imported']} documents")
# Search
search_result = vector_db.search("vector database", limit=3)
print(f"\nSearch results:")
for result_item in search_result["results"]:
print(f"- Certainty: {result_item['certainty']:.3f}")
print(f" Title: {result_item['properties'].get('title', 'No title')}")
print(f" Category: {result_item['properties'].get('category', 'No category')}")
print(f" Content: {result_item['properties'].get('content', '')[:100]}...")
# Get schema stats
stats = vector_db.get_schema_stats()
print(f"\nSchema stats:")
print(f"- Total objects: {stats['total_objects']}")
print(f"- Properties: {len(stats['properties'])}")
except Exception as e:
print(f"Weaviate demo error: {e}")
print("Make sure Weaviate is running at http://localhost:8080")
def demo_comparison():
"""Demonstrate vector database comparison"""
print("\n=== Vector Database Comparison Demo ===")
try:
# Initialize databases (only if credentials are available)
vector_dbs = {}
# Pinecone
if os.getenv("PINECONE_API_KEY"):
pinecone_config = PineconeConfig(
api_key=os.getenv("PINECONE_API_KEY"),
environment="us-west1-gcp-free",
index_name="comparison-index"
)
vector_dbs["Pinecone"] = PineconeVectorDB(pinecone_config)
# Weaviate (only if running)
try:
weaviate_config = WeaviateConfig(
url="http://localhost:8080",
class_name="ComparisonDocument"
)
vector_dbs["Weaviate"] = WeaviateVectorDB(weaviate_config)
except Exception:
print("Weaviate not available for comparison")
if not vector_dbs:
print("No vector databases available for comparison")
return
# Sample data
documents = [
"Vector databases enable efficient similarity search and retrieval in high-dimensional spaces.",
"Machine learning applications benefit from vector search capabilities for recommendation systems.",
"Semantic search uses embeddings to understand the meaning behind queries and documents.",
"Real-time vector search powers modern applications with instant response times.",
"Scalable vector databases handle millions of vectors with consistent performance."
]
queries = [
"vector database performance",
"machine learning applications",
"semantic search technology"
]
# Benchmark
comparator = VectorDatabaseComparator()
results = comparator.benchmark_operations(vector_dbs, documents, queries)
# Generate report
report = comparator.generate_comparison_report()
print("Comparison Results:")
print(json.dumps(report, indent=2))
except Exception as e:
print(f"Comparison demo error: {e}")
def demo_production_features():
"""Demonstrate production features"""
print("\n=== Production Features Demo ===")
# Create sample vector DB for demo
config = PineconeConfig(
api_key=os.getenv("PINECONE_API_KEY", "demo-key"),
environment="us-west1-gcp-free",
index_name="prod-demo-index"
)
try:
vector_db = PineconeVectorDB(config)
# Performance monitoring
print("Starting performance monitoring (2 minutes)...")
# Note: This would run for 2 minutes in a real scenario
# For demo, we'll just show the structure
print("Monitoring features include:")
print("- Query time tracking")
print("- Success/error rate monitoring")
print("- Performance statistics")
print("- Automated alerting")
# Backup demonstration
backup_result = ProductionFeatures.create_backup(
vector_db,
"./vector_db_backup.json"
)
print(f"\nBackup result: {backup_result}")
except Exception as e:
print(f"Production features demo error: {e}")
# Main execution
if __name__ == "__main__":
try:
demo_pinecone()
demo_weaviate()
demo_comparison()
demo_production_features()
except Exception as e:
print(f"Demo error: {e}")
logger.exception("Demo failed")
💻 FAISS and Custom Vector Solutions python
🔴 complex
⭐⭐⭐⭐⭐
Local vector database implementations with FAISS, Annoy, and custom solutions for privacy-focused applications
⏱️ 45 min
🏷️ faiss, annoy, vector-database, local, embedding
Prerequisites:
Python, NumPy, FAISS, Annoy, sentence-transformers, h5py, scipy, scikit-learn
# FAISS and Custom Vector Solutions
# Local vector database implementations for privacy-focused applications
import os
import json
import time
import uuid
import pickle
import numpy as np
import faiss
import threading
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime
import hashlib
from pathlib import Path
import logging
import mmap
from sentence_transformers import SentenceTransformer
import h5py
from annoy import AnnoyIndex
import scipy.sparse as sp
import scipy.spatial.distance as spd
import gc
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 1. Configuration Classes
@dataclass
class FAISSConfig:
"""Configuration for FAISS vector index"""
dimension: int = 384 # MiniLM dimension
index_type: str = "IVF" # IVF, HNSW, Flat, IVFPQ, SCANN
metric: str = "INNER_PRODUCT" # INNER_PRODUCT, L2, COSINE
nlist: int = 100 # For IVF
m: int = 16 # For HNSW
nbits: int = 8 # For IVFPQ
use_gpu: bool = False
gpu_id: int = 0
batch_size: int = 1000
nprobe: int = 10 # Search parameter for IVF
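# Rough selection guidance (rule of thumb, not a benchmark): "Flat" is exact and fine
# for small corpora; "IVF"/"IVFPQ" trade recall for speed and memory and need training
# (tune nlist and nprobe); "HNSW" gives strong recall and latency with no training step.
def example_faiss_configs() -> Dict[str, FAISSConfig]:
    """Illustrative configurations only; the values are starting points, not tuned results"""
    return {
        "exact": FAISSConfig(index_type="Flat", metric="INNER_PRODUCT"),
        "approximate": FAISSConfig(index_type="IVF", nlist=256, nprobe=16)
    }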
@dataclass
class CustomVectorDBConfig:
"""Configuration for custom vector database"""
storage_path: str = "./vector_db"
embedding_model: str = "all-MiniLM-L6-v2"
max_memory_mb: int = 1024
enable_compression: bool = True
enable_mmap: bool = True
auto_save_interval: int = 100 # Save after N operations
backup_enabled: bool = True
index_type: str = "faiss" # faiss, annoy, custom
distance_metric: str = "cosine" # cosine, euclidean, manhattan
# 2. Embedding Management
class EfficientEmbeddingManager:
"""Memory-efficient embedding management with advanced caching"""
def __init__(self, model_name: str = "all-MiniLM-L6-v2", device: str = "cpu"):
self.model_name = model_name
self.device = device
self.model = None
self.embedding_cache = {}
self.cache_lock = threading.RLock()
self.cache_hits = 0
self.cache_misses = 0
self.cache_size_mb = 0
self.max_cache_size_mb = 512 # 512MB cache
self._load_model()
def _load_model(self):
"""Load embedding model with memory optimization"""
try:
self.model = SentenceTransformer(
self.model_name,
device=self.device
)
logger.info(f"Loaded embedding model: {self.model_name}")
# Optimize model for inference
self.model.eval()
if hasattr(self.model, "max_seq_length"):
self.model.max_seq_length = 512 # Limit sequence length
except Exception as e:
logger.error(f"Failed to load model {self.model_name}: {e}")
raise
def get_embedding(self, text: str, use_cache: bool = True) -> np.ndarray:
"""Get embedding with memory-efficient caching"""
if not text or not text.strip():
return np.zeros(self.model.get_sentence_embedding_dimension(), dtype=np.float32)
# Cache key
cache_key = hashlib.md5(text.encode('utf-8')).hexdigest()
# Check cache
if use_cache:
with self.cache_lock:
if cache_key in self.embedding_cache:
self.cache_hits += 1
return self.embedding_cache[cache_key]
self.cache_misses += 1
try:
# Generate embedding
embedding = self.model.encode(
text,
convert_to_numpy=True,
show_progress_bar=False,
normalize_embeddings=True
)
# Ensure float32 for memory efficiency
if embedding.dtype != np.float32:
embedding = embedding.astype(np.float32)
# Cache result if within memory limits
if use_cache:
with self.cache_lock:
self.embedding_cache[cache_key] = embedding
self.cache_size_mb += embedding.nbytes / (1024 * 1024)
# Clean cache if over limit
if self.cache_size_mb > self.max_cache_size_mb:
self._clean_cache()
return embedding
except Exception as e:
logger.error(f"Error generating embedding: {e}")
dim = self.model.get_sentence_embedding_dimension()
return np.zeros(dim, dtype=np.float32)
def get_batch_embeddings(self, texts: List[str], show_progress: bool = False) -> np.ndarray:
"""Batch embedding generation with memory management"""
if not texts:
return np.array([])
# Check cache first
cached_embeddings = []
uncached_texts = []
uncached_indices = []
with self.cache_lock:
for i, text in enumerate(texts):
cache_key = hashlib.md5(text.encode('utf-8')).hexdigest()
if cache_key in self.embedding_cache:
cached_embeddings.append(self.embedding_cache[cache_key])
self.cache_hits += 1
else:
cached_embeddings.append(None) # Placeholder
uncached_texts.append(text)
uncached_indices.append(i)
self.cache_misses += 1
# Generate uncached embeddings
if uncached_texts:
try:
# Process in batches for memory efficiency
batch_size = 64
all_uncached_embeddings = []
for i in range(0, len(uncached_texts), batch_size):
batch_texts = uncached_texts[i:i + batch_size]
if show_progress:
logger.info(f"Processing embedding batch {i//batch_size + 1}/{(len(uncached_texts) + batch_size - 1)//batch_size}")
batch_embeddings = self.model.encode(
batch_texts,
convert_to_numpy=True,
show_progress_bar=False,
normalize_embeddings=True,
batch_size=len(batch_texts)
)
# Ensure float32
if batch_embeddings.dtype != np.float32:
batch_embeddings = batch_embeddings.astype(np.float32)
all_uncached_embeddings.append(batch_embeddings)
# Clear memory
del batch_embeddings
gc.collect()
# Combine all uncached embeddings
if all_uncached_embeddings:
all_uncached_embeddings = np.vstack(all_uncached_embeddings)
else:
all_uncached_embeddings = np.array([]).reshape(0, 0)
# Cache results
for i, embedding in enumerate(all_uncached_embeddings):
original_index = uncached_indices[i]
cache_key = hashlib.md5(uncached_texts[i].encode('utf-8')).hexdigest()
with self.cache_lock:
self.embedding_cache[cache_key] = embedding
self.cache_size_mb += embedding.nbytes / (1024 * 1024)
# Check cache size
if self.cache_size_mb > self.max_cache_size_mb:
self._clean_cache()
# Reconstruct full result array
full_embeddings = np.zeros((len(texts), all_uncached_embeddings.shape[1] if all_uncached_embeddings.size > 0 else self.model.get_sentence_embedding_dimension()), dtype=np.float32)
# Fill with cached and newly generated embeddings
cached_idx = 0
uncached_idx = 0
for i in range(len(texts)):
if cached_embeddings[i] is not None:
full_embeddings[i] = cached_embeddings[i]
else:
full_embeddings[i] = all_uncached_embeddings[uncached_idx]
uncached_idx += 1
return full_embeddings
except Exception as e:
logger.error(f"Error in batch embedding generation: {e}")
# Return zeros as fallback
dim = self.model.get_sentence_embedding_dimension()
return np.zeros((len(texts), dim), dtype=np.float32)
# All embeddings were cached
return np.array([emb for emb in cached_embeddings if emb is not None])
def _clean_cache(self):
"""Clean cache to free memory"""
# Remove oldest entries
entries_to_remove = len(self.embedding_cache) // 2
keys_to_remove = list(self.embedding_cache.keys())[:entries_to_remove]
removed_bytes = 0
for key in keys_to_remove:
if key in self.embedding_cache:
removed_bytes += self.embedding_cache[key].nbytes
del self.embedding_cache[key]
self.cache_size_mb -= removed_bytes / (1024 * 1024)
logger.info(f"Cleaned cache, removed {len(keys_to_remove)} entries")
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
with self.cache_lock:
total_requests = self.cache_hits + self.cache_misses
hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0
return {
"cache_hits": self.cache_hits,
"cache_misses": self.cache_misses,
"hit_rate_percent": round(hit_rate, 2),
"cache_size_mb": round(self.cache_size_mb, 2),
"cache_entries": len(self.embedding_cache),
"max_cache_size_mb": self.max_cache_size_mb
}
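# A minimal usage sketch for the embedding manager above; assumes the
# sentence-transformers model can be downloaded or is already cached locally.
def example_embedding_manager_usage():
    """Hedged example: batch-embed two texts and report cache statistics"""
    manager = EfficientEmbeddingManager("all-MiniLM-L6-v2")
    vectors = manager.get_batch_embeddings(["vector databases", "semantic search"])
    print(vectors.shape)  # (2, 384) for the MiniLM model
    print(manager.get_cache_stats())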
# 3. FAISS Implementation
class FAISSVectorStore:
"""Production-ready FAISS vector store with advanced features"""
def __init__(self, config: FAISSConfig):
self.config = config
self.embedding_manager = EfficientEmbeddingManager()
self.index = None
self.documents = []
self.metadata = []
self.ids = []
self.index_path = None
self._initialize_index()
def _initialize_index(self):
"""Initialize FAISS index based on configuration"""
try:
dimension = self.config.dimension
if self.config.index_type == "IVF":
# Inverted File Index
quantizer = faiss.IndexFlatIP(dimension)
self.index = faiss.IndexIVFFlat(quantizer, dimension, self.config.nlist)
elif self.config.index_type == "HNSW":
# Hierarchical Navigable Small World
self.index = faiss.IndexHNSWFlat(dimension, self.config.m)
elif self.config.index_type == "Flat":
# Simple flat index
self.index = faiss.IndexFlatIP(dimension)
elif self.config.index_type == "IVFPQ":
# Product Quantization
quantizer = faiss.IndexFlatIP(dimension)
self.index = faiss.IndexIVFPQ(quantizer, dimension, self.config.nbits, self.config.nlist)
else:
raise ValueError(f"Unsupported index type: {self.config.index_type}")
logger.info(f"Initialized FAISS index: {self.config.index_type}")
except Exception as e:
logger.error(f"Failed to initialize FAISS index: {e}")
raise
def add_documents(
self,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None,
train: bool = False
) -> Dict[str, Any]:
"""Add documents to the FAISS index"""
if not documents:
return {"success": False, "error": "No documents provided"}
start_time = time.time()
try:
# Generate IDs if not provided
if ids is None:
ids = [str(uuid.uuid4()) for _ in documents]
# Generate embeddings
embeddings = self.embedding_manager.get_batch_embeddings(documents)
# Prepare metadata
if metadatas is None:
metadatas = [{} for _ in documents]
elif len(metadatas) != len(documents):
metadatas = metadatas + [{} for _ in range(len(documents) - len(metadatas))]
# Add timestamps and document info
current_time = datetime.now().isoformat()
for i in range(len(metadatas)):
metadatas[i].update({
"created_at": current_time,
"length": len(documents[i]),
"word_count": len(documents[i].split()),
"embedding_norm": float(np.linalg.norm(embeddings[i]))
})
# Train index if needed (IVF-style indexes must be trained before vectors are added)
if train and self.config.index_type in ["IVF", "IVFPQ"]:
logger.info("Training FAISS index...")
if len(self.documents) > 0:
existing_embeddings = np.vstack([doc["embedding"] for doc in self.documents])
all_embeddings = np.vstack([existing_embeddings, embeddings])
else:
all_embeddings = embeddings
# Rebuild the index from scratch on the combined data
self.index.reset()
self.index.train(all_embeddings)
self.index.add(all_embeddings)
else:
# Add new embeddings (an IVF/IVFPQ index must already be trained at this point)
self.index.add(embeddings)
# Store documents
for i, (doc, meta, doc_id, embedding) in enumerate(zip(documents, metadatas, ids, embeddings)):
self.documents.append({
"id": doc_id,
"content": doc,
"metadata": meta,
"embedding": embedding
})
self.metadata.append(meta)
self.ids.append(doc_id)
processing_time = time.time() - start_time
return {
"success": True,
"ids": ids,
"documents_added": len(documents),
"processing_time": processing_time,
"total_documents": len(self.documents),
"index_trained": train
}
except Exception as e:
logger.error(f"Error adding documents: {e}")
return {
"success": False,
"error": str(e),
"documents_attempted": len(documents)
}
def search(
self,
query: str,
k: int = 10,
nprobe: Optional[int] = None
) -> Dict[str, Any]:
"""Search for similar documents"""
start_time = time.time()
try:
# Get query embedding
query_embedding = self.embedding_manager.get_embedding(query)
query_embedding = query_embedding.reshape(1, -1)
# Set search parameters
search_k = min(k, len(self.documents))
# nprobe controls how many IVF clusters are scanned; it is set on the index itself
if self.config.index_type in ["IVF", "IVFPQ"]:
self.index.nprobe = nprobe or self.config.nprobe
# Perform search
distances, indices = self.index.search(query_embedding, search_k)
# Process results
results = []
for i in range(search_k):
idx = indices[0][i]
distance = distances[0][i]
if 0 <= idx < len(self.documents):
doc = self.documents[idx]
# Convert distance to similarity
if self.config.metric == "INNER_PRODUCT":
similarity = float(distance)
elif self.config.metric == "L2":
similarity = 1.0 / (1.0 + float(distance))
else: # COSINE
similarity = float(distance)
results.append({
"id": doc["id"],
"content": doc["content"],
"metadata": doc["metadata"],
"similarity": similarity,
"distance": float(distance),
"index": idx
})
processing_time = time.time() - start_time
return {
"query": query,
"results": results,
"processing_time": processing_time,
"k": k,
"total_found": len(results)
}
except Exception as e:
logger.error(f"Error during search: {e}")
return {
"query": query,
"error": str(e),
"processing_time": time.time() - start_time
}
def save_index(self, path: str):
"""Save FAISS index to disk"""
try:
save_path = Path(path)
save_path.parent.mkdir(parents=True, exist_ok=True)
# Save FAISS index
faiss.write_index(self.index, str(save_path.with_suffix(".faiss")))
# Save metadata
metadata = {
"config": {
"dimension": self.config.dimension,
"index_type": self.config.index_type,
"metric": self.config.metric,
"nlist": self.config.nlist,
"m": self.config.m,
"nbits": self.config.nbits
},
"documents_count": len(self.documents),
"saved_at": datetime.now().isoformat(),
"documents": self.documents[:1000] # Save only first 1000 for file size
}
with open(save_path.with_suffix(".json"), 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
logger.info(f"Saved FAISS index to {save_path}")
return {"success": True, "path": str(save_path)}
except Exception as e:
logger.error(f"Error saving index: {e}")
return {"error": str(e)}
def load_index(self, path: str):
"""Load FAISS index from disk"""
try:
load_path = Path(path)
# Load metadata
with open(load_path.with_suffix(".json"), 'r', encoding='utf-8') as f:
metadata = json.load(f)
# Load FAISS index
self.index = faiss.read_index(str(load_path.with_suffix(".faiss")))
# Restore the document store saved alongside the index (at most 1000 entries)
self.documents = metadata.get("documents", [])
self.metadata = [doc.get("metadata", {}) for doc in self.documents]
self.ids = [doc.get("id") for doc in self.documents]
# Update config
self.config.dimension = metadata["config"]["dimension"]
self.config.index_type = metadata["config"]["index_type"]
self.config.metric = metadata["config"]["metric"]
self.config.nlist = metadata["config"].get("nlist", 100)
self.config.m = metadata["config"].get("m", 16)
logger.info(f"Loaded FAISS index from {load_path}")
return {"success": True, "documents_count": metadata["documents_count"]}
except Exception as e:
logger.error(f"Error loading index: {e}")
return {"error": str(e)}
def get_stats(self) -> Dict[str, Any]:
"""Get index statistics"""
try:
cache_stats = self.embedding_manager.get_cache_stats()
stats = {
"index_type": self.config.index_type,
"dimension": self.config.dimension,
"metric": self.config.metric,
"total_documents": len(self.documents),
"index_ntotal": self.index.ntotal if hasattr(self.index, 'ntotal') else 0,
"embedding_cache": cache_stats
}
# Add type-specific stats
if self.config.index_type == "IVF":
stats["nlist"] = self.config.nlist
elif self.config.index_type == "HNSW":
stats["m"] = self.config.m
elif self.config.index_type == "IVFPQ":
stats["nbits"] = self.config.nbits
stats["nlist"] = self.config.nlist
# Estimate memory usage: ntotal vectors * dimension * 4 bytes (float32)
if hasattr(self.index, 'ntotal'):
memory_bytes = self.index.ntotal * self.config.dimension * 4
stats["memory_usage_mb"] = memory_bytes / (1024 * 1024)
return stats
except Exception as e:
logger.error(f"Error getting stats: {e}")
return {"error": str(e)}
# 4. Annoy Implementation
class AnnoyVectorStore:
"""Annoy-based vector store for memory-efficient similarity search"""
def __init__(self, dimension: int = 384, n_trees: int = 10, metric: str = "angular"):
self.dimension = dimension
self.n_trees = n_trees
self.metric = metric
self.embedding_manager = EfficientEmbeddingManager()
# Initialize Annoy index (n_trees is applied later, when build() is called)
self.index = AnnoyIndex(dimension, metric=self.metric)
self.documents = []
self.metadata = []
self.ids = []
self.is_built = False
def add_documents(
self,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Add documents to Annoy index"""
if not documents:
return {"success": False, "error": "No documents provided"}
try:
# Generate IDs if not provided
if ids is None:
ids = [str(uuid.uuid4()) for _ in documents]
# Generate embeddings
embeddings = self.embedding_manager.get_batch_embeddings(documents)
# Prepare metadata
if metadatas is None:
metadatas = [{} for _ in documents]
elif len(metadatas) != len(documents):
metadatas = metadatas + [{} for _ in range(len(documents) - len(metadatas))]
# Add timestamps
current_time = datetime.now().isoformat()
for i in range(len(metadatas)):
metadatas[i].update({
"created_at": current_time,
"length": len(documents[i]),
"word_count": len(documents[i].split())
})
# Add items to index
for i, (embedding, doc, meta, doc_id) in enumerate(zip(embeddings, documents, metadatas, ids)):
self.index.add_item(i, embedding)
self.documents.append({
"id": doc_id,
"content": doc,
"metadata": meta
})
self.metadata.append(meta)
self.ids.append(doc_id)
# Mark index as unbuilt
self.is_built = False
return {
"success": True,
"ids": ids,
"documents_added": len(documents),
"total_documents": len(self.documents)
}
except Exception as e:
logger.error(f"Error adding documents to Annoy: {e}")
return {"error": str(e)}
def build_index(self, n_jobs: int = -1) -> Dict[str, Any]:
"""Build the Annoy index for search"""
try:
if self.is_built:
return {"success": True, "message": "Index already built"}
start_time = time.time()
# Build index
self.index.build(self.n_trees, n_jobs=n_jobs)
self.is_built = True
build_time = time.time() - start_time
return {
"success": True,
"build_time": build_time,
"n_trees": self.n_trees,
"total_items": len(self.documents)
}
except Exception as e:
logger.error(f"Error building Annoy index: {e}")
return {"error": str(e)}
def search(
self,
query: str,
k: int = 10,
include_distances: bool = True
) -> Dict[str, Any]:
"""Search for similar documents"""
if not self.is_built:
# Auto-build if not built
build_result = self.build_index()
if not build_result["success"]:
return {"error": "Failed to build index", "details": build_result}
try:
# Get query embedding
query_embedding = self.embedding_manager.get_embedding(query)
# Perform search
indices, distances = self.index.get_nns_by_vector(query_embedding, k, include_distances=True)
# Process results
results = []
for i in range(len(indices)):
idx = indices[i]
if 0 <= idx < len(self.documents):
doc = self.documents[idx]
result = {
"id": doc["id"],
"content": doc["content"],
"metadata": doc["metadata"],
"index": idx
}
if include_distances:
result["distance"] = float(distances[i])
# Convert to similarity for cosine/angular distance
if self.metric == "angular":
result["similarity"] = 1.0 - float(distances[i])
elif self.metric == "euclidean":
result["similarity"] = 1.0 / (1.0 + float(distances[i]))
else:
result["similarity"] = 1.0 - float(distances[i])
results.append(result)
return {
"query": query,
"results": results,
"k": k,
"total_found": len(results)
}
except Exception as e:
logger.error(f"Error searching Annoy index: {e}")
return {"error": str(e), "query": query}
def save_index(self, path: str):
"""Save Annoy index to disk"""
try:
save_path = Path(path)
save_path.parent.mkdir(parents=True, exist_ok=True)
# Save Annoy index
self.index.save(str(save_path.with_suffix(".annoy")))
# Save metadata
metadata = {
"dimension": self.dimension,
"n_trees": self.n_trees,
"metric": self.metric,
"is_built": self.is_built,
"documents_count": len(self.documents),
"saved_at": datetime.now().isoformat(),
"documents": self.documents[:1000] # Limit size
}
with open(save_path.with_suffix(".json"), 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
return {"success": True, "path": str(save_path)}
except Exception as e:
return {"error": str(e)}
def load_index(self, path: str):
"""Load Annoy index from disk"""
try:
load_path = Path(path)
# Load metadata
with open(load_path.with_suffix(".json"), 'r', encoding='utf-8') as f:
metadata = json.load(f)
# Load Annoy index
self.index.load(str(load_path.with_suffix(".annoy")))
# Restore document store and configuration
self.documents = metadata.get("documents", [])
self.metadata = [doc.get("metadata", {}) for doc in self.documents]
self.ids = [doc.get("id") for doc in self.documents]
self.dimension = metadata["dimension"]
self.n_trees = metadata["n_trees"]
self.metric = metadata["metric"]
self.is_built = metadata["is_built"]
return {"success": True, "documents_count": metadata["documents_count"]}
except Exception as e:
return {"error": str(e)}
def get_stats(self) -> Dict[str, Any]:
"""Get index statistics"""
cache_stats = self.embedding_manager.get_cache_stats()
return {
"dimension": self.dimension,
"n_trees": self.n_trees,
"metric": self.metric,
"is_built": self.is_built,
"total_documents": len(self.documents),
"embedding_cache": cache_stats
}
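# A persistence sketch, assuming the store was saved with save_index("./annoy_index")
# and that the restored instance is constructed with the same dimension and metric.
def example_annoy_round_trip():
    """Hedged example: reload a saved Annoy store and run a query against it"""
    restored = AnnoyVectorStore(dimension=384, n_trees=20, metric="angular")
    load_result = restored.load_index("./annoy_index")
    if load_result.get("success"):
        print(restored.search("recommendation systems", k=2)["total_found"])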
# 5. Custom Vector Database with Compression
class CustomVectorDB:
"""Custom vector database with compression and advanced features"""
def __init__(self, config: CustomVectorDBConfig):
self.config = config
self.embedding_manager = EfficientEmbeddingManager(config.embedding_model)
self.documents = []
self.embeddings = None
self.metadata = []
self.ids = []
self.index = None
self._initialize_storage()
def _initialize_storage(self):
"""Initialize storage based on configuration"""
storage_path = Path(self.config.storage_path)
storage_path.mkdir(parents=True, exist_ok=True)
self.data_file = storage_path / "data.h5"
self.index_file = storage_path / "index.pkl"
self.metadata_file = storage_path / "metadata.json"
self._load_existing_data()
def _load_existing_data(self):
"""Load existing data from disk"""
try:
# Load metadata
if self.metadata_file.exists():
with open(self.metadata_file, 'r', encoding='utf-8') as f:
saved_metadata = json.load(f)
self.documents = saved_metadata.get("documents", [])
self.metadata = saved_metadata.get("metadata", [])
self.ids = saved_metadata.get("ids", [])
logger.info(f"Loaded {len(self.documents)} existing documents")
# Load embeddings
if self.data_file.exists():
with h5py.File(self.data_file, 'r') as f:
if "embeddings" in f:
self.embeddings = f["embeddings"][:]
logger.info(f"Loaded embeddings with shape: {self.embeddings.shape}")
# Load index if available
if self.index_file.exists():
with open(self.index_file, 'rb') as f:
self.index = pickle.load(f)
logger.info("Loaded search index")
except Exception as e:
logger.warning(f"Could not load existing data: {e}")
self.documents = []
self.embeddings = None
self.metadata = []
self.ids = []
self.index = None
def _save_data(self):
"""Save data to disk"""
try:
# Save metadata
metadata = {
"documents": self.documents,
"metadata": self.metadata,
"ids": self.ids,
"last_updated": datetime.now().isoformat()
}
with open(self.metadata_file, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
# Save embeddings
if self.embeddings is not None:
with h5py.File(self.data_file, 'w') as f:
f.create_dataset("embeddings", data=self.embeddings, compression="gzip")
# Save index
if self.index is not None:
with open(self.index_file, 'wb') as f:
pickle.dump(self.index, f)
except Exception as e:
logger.error(f"Error saving data: {e}")
def add_documents(
self,
documents: List[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Add documents with compression"""
if not documents:
return {"success": False, "error": "No documents provided"}
try:
start_time = time.time()
# Generate IDs if not provided
if ids is None:
ids = [str(uuid.uuid4()) for _ in documents]
# Generate embeddings
embeddings = self.embedding_manager.get_batch_embeddings(documents)
# Prepare metadata
if metadatas is None:
metadatas = [{} for _ in documents]
elif len(metadatas) != len(documents):
metadatas = metadatas + [{} for _ in range(len(documents) - len(metadatas))]
# Add timestamps
current_time = datetime.now().isoformat()
for i in range(len(metadatas)):
metadatas[i].update({
"created_at": current_time,
"length": len(documents[i]),
"word_count": len(documents[i].split())
})
# Add to storage
for i, (doc, embedding, meta, doc_id) in enumerate(zip(documents, embeddings, metadatas, ids)):
self.documents.append({
"id": doc_id,
"content": doc,
"metadata": meta
})
self.metadata.append(meta)
self.ids.append(doc_id)
# Update embeddings matrix
if self.embeddings is None:
self.embeddings = embeddings
else:
self.embeddings = np.vstack([self.embeddings, embeddings])
# Rebuild index
self._build_search_index()
# Save data
self._save_data()
processing_time = time.time() - start_time
return {
"success": True,
"ids": ids,
"documents_added": len(documents),
"processing_time": processing_time,
"total_documents": len(self.documents),
"compression_enabled": self.config.enable_compression
}
except Exception as e:
logger.error(f"Error adding documents: {e}")
return {"error": str(e)}
def _build_search_index(self):
"""Build search index"""
try:
if self.embeddings is None or len(self.embeddings) == 0:
return
# Brute-force cosine index via scikit-learn's NearestNeighbors
# (in production, consider a more sophisticated index such as FAISS or Annoy)
from sklearn.neighbors import NearestNeighbors
# Use cosine similarity
self.index = NearestNeighbors(
metric='cosine',
algorithm='brute'
)
self.index.fit(self.embeddings)
except Exception as e:
logger.error(f"Error building search index: {e}")
self.index = None
def search(
self,
query: str,
k: int = 10,
similarity_threshold: float = 0.7
) -> Dict[str, Any]:
"""Search for similar documents"""
if self.index is None or self.embeddings is None:
return {"error": "No search index available"}
try:
start_time = time.time()
# Get query embedding
query_embedding = self.embedding_manager.get_embedding(query).reshape(1, -1)
# Perform search
distances, indices = self.index.kneighbors(
query_embedding,
n_neighbors=min(k, len(self.documents))
)
# Process results
results = []
for i, (distance, idx) in enumerate(zip(distances[0], indices[0])):
similarity = 1.0 - distance # Convert cosine distance to similarity
if similarity >= similarity_threshold:
doc = self.documents[idx]
results.append({
"id": doc["id"],
"content": doc["content"],
"metadata": doc["metadata"],
"similarity": similarity,
"distance": float(distance),
"index": idx
})
processing_time = time.time() - start_time
return {
"query": query,
"results": results,
"processing_time": processing_time,
"similarity_threshold": similarity_threshold,
"k": k,
"total_found": len(results)
}
except Exception as e:
logger.error(f"Error during search: {e}")
return {
"query": query,
"error": str(e),
"processing_time": time.time() - start_time
}
def get_stats(self) -> Dict[str, Any]:
"""Get database statistics"""
cache_stats = self.embedding_manager.get_cache_stats()
stats = {
"total_documents": len(self.documents),
"embedding_dimension": self.embeddings.shape[1] if self.embeddings is not None else 0,
"embedding_count": len(self.embeddings) if self.embeddings is not None else 0,
"storage_path": self.config.storage_path,
"enable_compression": self.config.enable_compression,
"enable_mmap": self.config.enable_mmap,
"embedding_cache": cache_stats
}
# Calculate storage size
try:
if self.data_file.exists():
stats["data_file_size_mb"] = self.data_file.stat().st_size / (1024 * 1024)
if self.metadata_file.exists():
stats["metadata_file_size_mb"] = self.metadata_file.stat().st_size / (1024 * 1024)
if self.index_file.exists():
stats["index_file_size_mb"] = self.index_file.stat().st_size / (1024 * 1024)
except Exception as e:
logger.warning(f"Could not calculate storage size: {e}")
return stats
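# A persistence sketch: because _load_existing_data() runs in the constructor, reopening
# the same storage path restores documents, embeddings, and the pickled search index
# without re-adding anything (assumes a previous run populated ./custom_vector_db).
def example_custom_db_reopen():
    """Hedged example: reopen an existing on-disk store and query it directly"""
    reopened = CustomVectorDB(CustomVectorDBConfig(storage_path="./custom_vector_db"))
    print(reopened.get_stats()["total_documents"])
    print(reopened.search("custom implementations", k=2).get("total_found", 0))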
# 6. Demonstration Functions
def demo_faiss_implementation():
"""Demonstrate FAISS vector store"""
print("=== FAISS Vector Store Demo ===")
config = FAISSConfig(
dimension=384, # MiniLM dimension
index_type="HNSW",
metric="INNER_PRODUCT",
m=16
)
vector_store = FAISSVectorStore(config)
# Sample documents
documents = [
"FAISS is a library for efficient similarity search and clustering of dense vectors.",
"Facebook AI developed FAISS for large-scale similarity search in high-dimensional spaces.",
"The library provides implementations of various indexing algorithms for efficient search.",
"FAISS supports both CPU and GPU acceleration for improved performance.",
"It includes algorithms like IVF, HNSW, and Product Quantization for different use cases."
]
# Add documents
result = vector_store.add_documents(documents, train=True)
print(f"Added {result['documents_added']} documents (trained: {result['index_trained']})")
# Search
search_result = vector_store.search("similarity search algorithms", k=3)
print(f"\nSearch results:")
for result_item in search_result["results"]:
print(f"- Similarity: {result_item['similarity']:.3f}")
print(f" Content: {result_item['content'][:100]}...")
# Get stats
stats = vector_store.get_stats()
print(f"\nIndex stats:")
print(f"- Index type: {stats['index_type']}")
print(f"- Total documents: {stats['total_documents']}")
print(f"- Memory usage: {stats.get('memory_usage_mb', 'N/A')} MB")
# Save index
save_result = vector_store.save_index("./faiss_index")
print(f"\nSaved index: {save_result}")
def demo_annoy_implementation():
"""Demonstrate Annoy vector store"""
print("\n=== Annoy Vector Store Demo ===")
vector_store = AnnoyVectorStore(dimension=384, n_trees=20)
# Add documents
documents = [
"Annoy provides approximate nearest neighbor search for large-scale applications.",
"It builds forests of random projection trees to partition the vector space.",
"The library is designed to be memory-efficient and fast for high-dimensional vectors.",
"Annoy supports various distance metrics including Euclidean, Manhattan, and angular."
"It's particularly well-suited for recommendation systems and similarity search."
]
result = vector_store.add_documents(documents)
print(f"Added {result['documents_added']} documents")
# Build index
build_result = vector_store.build_index()
print(f"Built index: {build_result}")
# Search
search_result = vector_store.search("recommendation systems", k=3)
print(f"\nSearch results:")
for result_item in search_result["results"]:
print(f"- Similarity: {result_item.get('similarity', 'N/A')}")
print(f" Content: {result_item['content'][:100]}...")
# Get stats
stats = vector_store.get_stats()
print(f"\nIndex stats:")
print(f"- Dimension: {stats['dimension']}")
print(f"- Total documents: {stats['total_documents']}")
print(f"- Index built: {stats['is_built']}")
def demo_custom_vector_db():
"""Demonstrate custom vector database with compression"""
print("\n=== Custom Vector Database Demo ===")
config = CustomVectorDBConfig(
storage_path="./custom_vector_db",
embedding_model="all-MiniLM-L6-v2",
enable_compression=True
)
vector_db = CustomVectorDB(config)
# Sample documents
documents = [
"Custom vector databases provide flexibility for specialized use cases.",
"Compression techniques allow storing more vectors in limited memory space.",
"Memory-mapped files enable efficient access to large vector collections.",
"Custom indexing algorithms can be optimized for specific data distributions.",
"Integration with existing systems is simplified with custom implementations."
]
# Add documents
result = vector_db.add_documents(documents)
print(f"Added {result['documents_added']} documents")
# Search
search_result = vector_db.search("custom implementations", k=3)
print(f"\nSearch results:")
for result_item in search_result["results"]:
print(f"- Similarity: {result_item['similarity']:.3f}")
print(f" Content: {result_item['content'][:100]}...")
# Get stats
stats = vector_db.get_stats()
print(f"\nDatabase stats:")
print(f"- Total documents: {stats['total_documents']}")
print(f"- Compression enabled: {stats['enable_compression']}")
print(f"- Storage path: {stats['storage_path']}")
def demo_performance_comparison():
"""Demonstrate performance comparison between implementations"""
print("\n=== Performance Comparison Demo ===")
# Sample data for benchmarking
documents = [
f"Vector database document {i}: Demonstrates semantic search capabilities."
for i in range(100)
]
queries = [
"vector search performance",
"semantic similarity algorithms",
"efficient indexing methods"
]
# Test FAISS
try:
print("Testing FAISS...")
faiss_config = FAISSConfig(dimension=384, index_type="Flat")
faiss_store = FAISSVectorStore(faiss_config)
# Add documents
faiss_result = faiss_store.add_documents(documents[:50])
# Benchmark searches
faiss_times = []
for query in queries:
start_time = time.time()
search_result = faiss_store.search(query, k=5)
search_time = time.time() - start_time
faiss_times.append(search_time)
print(f"FAISS - Avg search time: {np.mean(faiss_times):.4f}s")
except Exception as e:
print(f"FAISS test error: {e}")
faiss_times = []
# Test Annoy
try:
print("\nTesting Annoy...")
annoy_store = AnnoyVectorStore(dimension=384, n_trees=20)
# Add documents
annoy_result = annoy_store.add_documents(documents[:50])
annoy_store.build_index()
# Benchmark searches
annoy_times = []
for query in queries:
start_time = time.time()
search_result = annoy_store.search(query, k=5)
search_time = time.time() - start_time
annoy_times.append(search_time)
print(f"Annoy - Avg search time: {np.mean(annoy_times):.4f}s")
except Exception as e:
print(f"Annoy test error: {e}")
annoy_times = []
# Test Custom
try:
print("\nTesting Custom Vector DB...")
custom_config = CustomVectorDBConfig(
storage_path="./perf_test_db",
enable_compression=True
)
custom_store = CustomVectorDB(custom_config)
# Add documents
custom_result = custom_store.add_documents(documents[:50])
# Benchmark searches
custom_times = []
for query in queries:
start_time = time.time()
search_result = custom_store.search(query, k=5)
search_time = time.time() - start_time
custom_times.append(search_time)
print(f"Custom DB - Avg search time: {np.mean(custom_times):.4f}s")
except Exception as e:
print(f"Custom DB test error: {e}")
custom_times = []
# Print comparison
if faiss_times and annoy_times and custom_times:
print("\nPerformance Comparison:")
print(f"FAISS: {np.mean(faiss_times):.4f}s ± {np.std(faiss_times):.4f}s")
print(f"Annoy: {np.mean(annoy_times):.4f}s ± {np.std(annoy_times):.4f}s")
print(f"Custom: {np.mean(custom_times):.4f}s ± {np.std(custom_times):.4f}s")
# Find fastest
all_times = [
("FAISS", faiss_times),
("Annoy", annoy_times),
("Custom", custom_times)
]
fastest = min(all_times, key=lambda x: np.mean(x[1]))
slowest = max(all_times, key=lambda x: np.mean(x[1]))
print(f"\nFastest: {fastest[0]} ({np.mean(fastest[1]):.4f}s)")
print(f"Slowest: {slowest[0]} ({np.mean(slowest[1]):.4f}s)")
print(f"Speed ratio: {np.mean(slowest[1]) / np.mean(fastest[1]):.2f}x")
# Main execution
if __name__ == "__main__":
try:
demo_faiss_implementation()
demo_annoy_implementation()
demo_custom_vector_db()
demo_performance_comparison()
except Exception as e:
print(f"Demo error: {e}")
logger.exception("Demo failed")