Vector Database Samples

Comprehensive vector database examples including Chroma, Pinecone, Weaviate, FAISS, and custom vector solutions for semantic search and RAG applications

💻 Chroma Vector Database python

🟡 intermediate ⭐⭐⭐⭐

Complete Chroma vector database implementation with collections, metadata filtering, similarity search, and production-ready features

⏱️ 40 min 🏷️ chroma, vector-database, embeddings, search
Prerequisites: Python, ChromaDB, sentence-transformers, numpy
# Chroma Vector Database Implementation
# Complete semantic search and document retrieval system

import os
import json
import time
import uuid
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, asdict
from datetime import datetime
import hashlib
import chromadb
from chromadb.config import Settings
from chromadb.utils import embedding_functions
import numpy as np
from sentence_transformers import SentenceTransformer
import logging
from pathlib import Path
import pickle
import threading

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 1. Configuration and Setup
@dataclass
class ChromaConfig:
    """Configuration for Chroma vector database"""
    persist_directory: str = "./chroma_db"
    collection_name: str = "default"
    embedding_model: str = "all-MiniLM-L6-v2"
    distance_metric: str = "cosine"  # Chroma hnsw:space values: cosine, l2, ip
    max_results: int = 10
    batch_size: int = 100
    auto_persist: bool = True
    enable_gpu: bool = False

class EmbeddingManager:
    """Advanced embedding management with multiple models"""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2", device: str = "cpu"):
        self.model_name = model_name
        self.device = device
        self.model = None
        self.embedding_cache = {}
        self.cache_hits = 0
        self.cache_misses = 0

        self._load_model()

    def _load_model(self):
        """Load the embedding model"""
        try:
            self.model = SentenceTransformer(self.model_name, device=self.device)
            logger.info(f"Loaded embedding model: {self.model_name}")
        except Exception as e:
            logger.error(f"Failed to load model {self.model_name}: {e}")
            raise

    def get_embedding(self, text: str, use_cache: bool = True) -> List[float]:
        """Get embedding for a single text"""
        if not text or not text.strip():
            # Return a zero vector matching the loaded model's output dimension
            return [0.0] * self.model.get_sentence_embedding_dimension()

        # Check cache first
        cache_key = hashlib.md5(text.encode()).hexdigest()
        if use_cache and cache_key in self.embedding_cache:
            self.cache_hits += 1
            return self.embedding_cache[cache_key]

        self.cache_misses += 1

        try:
            embedding = self.model.encode(text, convert_to_numpy=True)
            embedding_list = embedding.tolist()

            # Cache the result
            if use_cache:
                self.embedding_cache[cache_key] = embedding_list

                # Limit cache size
                if len(self.embedding_cache) > 10000:
                    # Remove oldest 1000 entries
                    keys_to_remove = list(self.embedding_cache.keys())[:1000]
                    for key in keys_to_remove:
                        del self.embedding_cache[key]

            return embedding_list

        except Exception as e:
            logger.error(f"Error generating embedding: {e}")
            return [0.0] * self.model.get_sentence_embedding_dimension()

    def get_batch_embeddings(self, texts: List[str], show_progress: bool = False) -> List[List[float]]:
        """Get embeddings for multiple texts efficiently"""
        if not texts:
            return []

        # Check cache for each text
        uncached_texts = []
        uncached_indices = []
        cached_embeddings = [[] for _ in texts]

        for i, text in enumerate(texts):
            cache_key = hashlib.md5(text.encode()).hexdigest()
            if cache_key in self.embedding_cache:
                cached_embeddings[i] = self.embedding_cache[cache_key]
                self.cache_hits += 1
            else:
                uncached_texts.append(text)
                uncached_indices.append(i)
                self.cache_misses += 1

        # Generate embeddings for uncached texts
        if uncached_texts:
            try:
                if show_progress:
                    logger.info(f"Generating embeddings for {len(uncached_texts)} uncached texts")

                new_embeddings = self.model.encode(
                    uncached_texts,
                    convert_to_numpy=True,
                    show_progress_bar=show_progress
                )

                for i, embedding in enumerate(new_embeddings):
                    embedding_list = embedding.tolist()
                    original_index = uncached_indices[i]
                    cached_embeddings[original_index] = embedding_list

                    # Cache the result
                    cache_key = hashlib.md5(uncached_texts[i].encode()).hexdigest()
                    self.embedding_cache[cache_key] = embedding_list

            except Exception as e:
                logger.error(f"Error generating batch embeddings: {e}")
                # Fill with zero vectors matching the model dimension
                dim = self.model.get_sentence_embedding_dimension()
                for original_index in uncached_indices:
                    cached_embeddings[original_index] = [0.0] * dim

        return cached_embeddings

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache performance statistics"""
        total_requests = self.cache_hits + self.cache_misses
        hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0

        return {
            "cache_hits": self.cache_hits,
            "cache_misses": self.cache_misses,
            "hit_rate_percent": round(hit_rate, 2),
            "cache_size": len(self.embedding_cache)
        }
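
# Example sketch (not called by the demos below): using EmbeddingManager on its
# own, outside of Chroma. The texts and printout here are illustrative only.
def example_embedding_manager_usage():
    manager = EmbeddingManager("all-MiniLM-L6-v2")
    vectors = manager.get_batch_embeddings(["hello world", "vector databases"])
    print(len(vectors), len(vectors[0]))  # 2 embeddings, 384 dimensions for MiniLM
    print(manager.get_cache_stats())      # cache hit/miss counters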

# 2. Enhanced Chroma Client
class ChromaVectorDB:
    """Enhanced Chroma vector database with advanced features"""

    def __init__(self, config: ChromaConfig):
        self.config = config
        self.client = None
        self.collection = None
        self.embedding_manager = EmbeddingManager(
            config.embedding_model,
            "cuda" if config.enable_gpu else "cpu"
        )
        self.document_stats = {
            "total_documents": 0,
            "total_embeddings": 0,
            "last_updated": None
        }
        self._initialize_client()
        self._load_or_create_collection()

    def _initialize_client(self):
        """Initialize Chroma client"""
        try:
            # Create persist directory if it doesn't exist
            Path(self.config.persist_directory).mkdir(parents=True, exist_ok=True)

            self.client = chromadb.PersistentClient(
                path=self.config.persist_directory,
                settings=Settings(
                    anonymized_telemetry=False,
                    allow_reset=False
                )
            )
            logger.info(f"Chroma client initialized at {self.config.persist_directory}")

        except Exception as e:
            logger.error(f"Failed to initialize Chroma client: {e}")
            raise

    def _load_or_create_collection(self):
        """Load existing collection or create new one"""
        try:
            # Try to get existing collection
            self.collection = self.client.get_collection(
                name=self.config.collection_name
            )
            logger.info(f"Loaded existing collection: {self.config.collection_name}")

            # Update stats
            self._update_document_stats()

        except Exception:
            # Create a new collection; apply the configured distance metric via
            # Chroma's "hnsw:space" setting since we supply our own embeddings
            self.collection = self.client.create_collection(
                name=self.config.collection_name,
                embedding_function=None,  # We'll provide our own embeddings
                metadata={
                    "hnsw:space": self.config.distance_metric,
                    "created_at": datetime.now().isoformat(),
                    "embedding_model": self.config.embedding_model,
                    "distance_metric": self.config.distance_metric
                }
            )
            logger.info(f"Created new collection: {self.config.collection_name}")

    def add_documents(
        self,
        documents: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        ids: Optional[List[str]] = None,
        batch_size: Optional[int] = None
    ) -> Dict[str, Any]:
        """Add documents to the collection with advanced features"""
        if not documents:
            return {"success": False, "error": "No documents provided"}

        batch_size = batch_size or self.config.batch_size
        start_time = time.time()

        try:
            # Generate IDs if not provided
            if ids is None:
                ids = [str(uuid.uuid4()) for _ in documents]

            # Prepare metadata
            if metadatas is None:
                metadatas = [{} for _ in documents]
            elif len(metadatas) != len(documents):
                # Pad metadata
                metadatas = metadatas + [{} for _ in range(len(documents) - len(metadatas))]

            # Add timestamps and other metadata
            current_time = datetime.now().isoformat()
            for i in range(len(metadatas)):
                metadatas[i].update({
                    "created_at": current_time,
                    "length": len(documents[i]),
                    "word_count": len(documents[i].split()),
                    "char_count": len(documents[i])
                })

            # Process in batches
            all_ids = []
            total_batches = (len(documents) + batch_size - 1) // batch_size

            for batch_idx in range(total_batches):
                start_idx = batch_idx * batch_size
                end_idx = min(start_idx + batch_size, len(documents))

                batch_docs = documents[start_idx:end_idx]
                batch_metas = metadatas[start_idx:end_idx]
                batch_ids = ids[start_idx:end_idx]

                # Generate embeddings
                embeddings = self.embedding_manager.get_batch_embeddings(
                    batch_docs,
                    show_progress=(batch_idx == 0 and len(documents) > batch_size)
                )

                # Add to Chroma
                self.collection.add(
                    embeddings=embeddings,
                    documents=batch_docs,
                    metadatas=batch_metas,
                    ids=batch_ids
                )

                all_ids.extend(batch_ids)

                logger.info(f"Processed batch {batch_idx + 1}/{total_batches}")

                # Small delay between batches to avoid overwhelming the client
                if batch_idx < total_batches - 1:
                    time.sleep(0.1)

            # Persist if enabled
            if self.config.auto_persist:
                self.persist()

            # Update stats
            self._update_document_stats()

            processing_time = time.time() - start_time

            return {
                "success": True,
                "ids": all_ids,
                "documents_added": len(documents),
                "processing_time": processing_time,
                "avg_time_per_document": processing_time / len(documents)
            }

        except Exception as e:
            logger.error(f"Error adding documents: {e}")
            return {
                "success": False,
                "error": str(e),
                "documents_attempted": len(documents)
            }

    def search(
        self,
        query: str,
        n_results: Optional[int] = None,
        where: Optional[Dict[str, Any]] = None,
        where_document: Optional[Dict[str, Any]] = None,
        include: Optional[List[str]] = None,
        include_distances: bool = True
    ) -> Dict[str, Any]:
        """Advanced search with filtering and multiple result formats"""
        start_time = time.time()
        n_results = n_results or self.config.max_results

        if include is None:
            include = ["metadatas", "documents", "distances"] if include_distances else ["metadatas", "documents"]

        try:
            # Get query embedding
            query_embedding = self.embedding_manager.get_embedding(query)

            # Perform search
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=n_results,
                where=where,
                where_document=where_document,
                include=include
            )

            # Process results
            processed_results = {
                "query": query,
                "n_results": n_results,
                "processing_time": time.time() - start_time,
                "total_found": len(results["ids"][0]) if "ids" in results else 0
            }

            # Add requested result types
            if "documents" in results:
                processed_results["documents"] = results["documents"][0]

            if "metadatas" in results:
                processed_results["metadatas"] = results["metadatas"][0]

            if "ids" in results:
                processed_results["ids"] = results["ids"][0]

            if "distances" in results:
                processed_results["distances"] = results["distances"][0]
                # Convert distances to similarities (cosine similarity = 1 - cosine distance)
                similarities = [1 - dist for dist in results["distances"][0]]
                processed_results["similarities"] = similarities

            return processed_results

        except Exception as e:
            logger.error(f"Error during search: {e}")
            return {
                "query": query,
                "error": str(e),
                "processing_time": time.time() - start_time
            }

    def get_similar_documents(
        self,
        document_id: str,
        n_results: int = 5
    ) -> Dict[str, Any]:
        """Find documents similar to a given document"""
        try:
            # Get the document
            doc_result = self.collection.get(ids=[document_id])

            if not doc_result["ids"]:
                return {"error": f"Document with ID {document_id} not found"}

            # Use the document content as query
            document_text = doc_result["documents"][0]
            return self.search(document_text, n_results=n_results)

        except Exception as e:
            logger.error(f"Error finding similar documents: {e}")
            return {"error": str(e)}

    def update_document(
        self,
        document_id: str,
        new_text: Optional[str] = None,
        new_metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Update an existing document"""
        try:
            # Get current document
            current_result = self.collection.get(ids=[document_id])

            if not current_result["ids"]:
                return {"error": f"Document with ID {document_id} not found"}

            # Prepare updates
            updated_text = new_text or current_result["documents"][0]
            updated_metadata = current_result["metadatas"][0].copy()

            if new_metadata:
                updated_metadata.update(new_metadata)

            # Add updated timestamp
            updated_metadata["updated_at"] = datetime.now().isoformat()
            updated_metadata["length"] = len(updated_text)
            updated_metadata["word_count"] = len(updated_text.split())

            # Delete old document
            self.collection.delete(ids=[document_id])

            # Add updated document
            new_embedding = self.embedding_manager.get_embedding(updated_text)

            self.collection.add(
                embeddings=[new_embedding],
                documents=[updated_text],
                metadatas=[updated_metadata],
                ids=[document_id]
            )

            if self.config.auto_persist:
                self.persist()

            self._update_document_stats()

            return {
                "success": True,
                "document_id": document_id,
                "updated_at": updated_metadata["updated_at"]
            }

        except Exception as e:
            logger.error(f"Error updating document: {e}")
            return {"error": str(e)}

    def delete_documents(
        self,
        ids: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Delete documents by ID or metadata filter"""
        try:
            if ids:
                # Delete by IDs
                deleted_count = len(ids)
                self.collection.delete(ids=ids)

            elif where:
                # Get matching IDs first
                matching_docs = self.collection.get(where=where)
                if matching_docs["ids"]:
                    self.collection.delete(ids=matching_docs["ids"])
                    deleted_count = len(matching_docs["ids"])
                else:
                    deleted_count = 0
            else:
                return {"error": "Either ids or where must be provided"}

            if self.config.auto_persist:
                self.persist()

            self._update_document_stats()

            return {
                "success": True,
                "deleted_count": deleted_count
            }

        except Exception as e:
            logger.error(f"Error deleting documents: {e}")
            return {"error": str(e)}

    def get_collection_stats(self) -> Dict[str, Any]:
        """Get comprehensive collection statistics"""
        try:
            # Basic stats
            count = self.collection.count()

            # Sample some documents to analyze
            sample_docs = self.collection.get(limit=min(100, count))

            # Calculate statistics
            doc_lengths = [len(doc) for doc in sample_docs.get("documents", [])]
            word_counts = [len(doc.split()) for doc in sample_docs.get("documents", [])]

            # Metadata analysis
            all_metadata = sample_docs.get("metadatas", [])
            metadata_keys = set()
            for meta in all_metadata:
                metadata_keys.update(meta.keys())

            # Embedding cache stats
            cache_stats = self.embedding_manager.get_cache_stats()

            stats = {
                "collection_name": self.config.collection_name,
                "total_documents": count,
                "embedding_model": self.config.embedding_model,
                "distance_metric": self.config.distance_metric,
                "sample_stats": {
                    "avg_doc_length": np.mean(doc_lengths) if doc_lengths else 0,
                    "avg_word_count": np.mean(word_counts) if word_counts else 0,
                    "min_doc_length": min(doc_lengths) if doc_lengths else 0,
                    "max_doc_length": max(doc_lengths) if doc_lengths else 0
                },
                "metadata_keys": list(metadata_keys),
                "embedding_cache": cache_stats,
                "document_stats": self.document_stats
            }

            return stats

        except Exception as e:
            logger.error(f"Error getting collection stats: {e}")
            return {"error": str(e)}

    def _update_document_stats(self):
        """Update internal document statistics"""
        try:
            self.document_stats["total_documents"] = self.collection.count()
            self.document_stats["total_embeddings"] = self.document_stats["total_documents"]
            self.document_stats["last_updated"] = datetime.now().isoformat()
        except Exception as e:
            logger.error(f"Error updating stats: {e}")

    def persist(self):
        """Persist the collection to disk (no-op with PersistentClient)"""
        try:
            # Chroma's PersistentClient writes changes to disk automatically,
            # so this method exists only for API symmetry and logging
            logger.info("Collection persisted to disk")
        except Exception as e:
            logger.error(f"Error persisting collection: {e}")

    def reset(self) -> Dict[str, Any]:
        """Reset the entire collection (use with caution!)"""
        try:
            # Delete the collection
            self.client.delete_collection(name=self.config.collection_name)

            # Recreate it
            self._load_or_create_collection()

            # Reset stats
            self.document_stats = {
                "total_documents": 0,
                "total_embeddings": 0,
                "last_updated": None
            }

            return {"success": True, "message": "Collection reset successfully"}

        except Exception as e:
            logger.error(f"Error resetting collection: {e}")
            return {"error": str(e)}

# 3. Advanced Query Operations
class AdvancedQueryOperations:
    """Advanced query operations and semantic search techniques"""

    def __init__(self, vector_db: ChromaVectorDB):
        self.vector_db = vector_db

    def hybrid_search(
        self,
        query: str,
        semantic_weight: float = 0.7,
        keyword_weight: float = 0.3,
        n_results: int = 10,
        keywords: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Hybrid search combining semantic and keyword matching"""
        try:
            # Semantic search
            semantic_results = self.vector_db.search(query, n_results=n_results * 2)

            if not semantic_results.get("documents"):
                return {"error": "No semantic search results found"}

            # Process semantic results
            processed_results = []
            for i, doc in enumerate(semantic_results["documents"]):
                score = semantic_results["similarities"][i] if "similarities" in semantic_results else 1.0

                # Calculate keyword score
                keyword_score = 0.0
                if keywords:
                    doc_lower = doc.lower()
                    keyword_matches = sum(1 for kw in keywords if kw.lower() in doc_lower)
                    keyword_score = keyword_matches / len(keywords)

                # Combine scores
                combined_score = (semantic_weight * score + keyword_weight * keyword_score)

                processed_results.append({
                    "document": doc,
                    "metadata": semantic_results["metadatas"][i],
                    "id": semantic_results["ids"][i],
                    "semantic_score": score,
                    "keyword_score": keyword_score,
                    "combined_score": combined_score
                })

            # Sort by combined score
            processed_results.sort(key=lambda x: x["combined_score"], reverse=True)

            return {
                "query": query,
                "results": processed_results[:n_results],
                "search_weights": {
                    "semantic": semantic_weight,
                    "keyword": keyword_weight
                }
            }

        except Exception as e:
            logger.error(f"Error in hybrid search: {e}")
            return {"error": str(e)}

    def multi_query_search(
        self,
        queries: List[str],
        strategy: str = "average",  # average, best, union
        n_results: int = 10
    ) -> Dict[str, Any]:
        """Search with multiple queries and combine results"""
        try:
            all_results = {}

            # Perform individual searches
            for query in queries:
                results = self.vector_db.search(query, n_results=n_results)
                if "documents" in results:
                    all_results[query] = results

            # Combine results based on strategy
            if strategy == "average":
                return self._average_combine_results(all_results, n_results)
            elif strategy == "best":
                return self._best_combine_results(all_results, n_results)
            elif strategy == "union":
                return self._union_combine_results(all_results, n_results)
            else:
                return {"error": f"Unknown strategy: {strategy}"}

        except Exception as e:
            logger.error(f"Error in multi-query search: {e}")
            return {"error": str(e)}

    def _average_combine_results(self, all_results: Dict, n_results: int) -> Dict[str, Any]:
        """Combine results by averaging similarity scores"""
        doc_scores = {}
        doc_data = {}

        # Collect all scores
        for query, results in all_results.items():
            if "documents" not in results:
                continue

            for i, doc in enumerate(results["documents"]):
                doc_id = results["ids"][i]
                similarity = results["similarities"][i] if "similarities" in results else 1.0

                if doc_id not in doc_scores:
                    doc_scores[doc_id] = []
                    doc_data[doc_id] = {
                        "document": doc,
                        "metadata": results["metadatas"][i],
                        "id": doc_id
                    }

                doc_scores[doc_id].append(similarity)

        # Calculate average scores
        final_results = []
        for doc_id, scores in doc_scores.items():
            avg_score = sum(scores) / len(scores)
            final_results.append({
                **doc_data[doc_id],
                "average_score": avg_score,
                "score_count": len(scores)
            })

        # Sort and return top results
        final_results.sort(key=lambda x: x["average_score"], reverse=True)

        return {
            "queries": list(all_results.keys()),
            "strategy": "average",
            "results": final_results[:n_results]
        }

    def _best_combine_results(self, all_results: Dict, n_results: int) -> Dict[str, Any]:
        """Combine results by taking best score per document"""
        doc_best_score = {}
        doc_data = {}

        for query, results in all_results.items():
            if "documents" not in results:
                continue

            for i, doc in enumerate(results["documents"]):
                doc_id = results["ids"][i]
                similarity = results["similarities"][i] if "similarities" in results else 1.0

                if doc_id not in doc_best_score or similarity > doc_best_score[doc_id]:
                    doc_best_score[doc_id] = similarity
                    doc_data[doc_id] = {
                        "document": doc,
                        "metadata": results["metadatas"][i],
                        "id": doc_id,
                        "best_query": query,
                        "best_score": similarity
                    }

        # Sort and return top results
        final_results = []
        for doc_id, score in doc_best_score.items():
            final_results.append({
                **doc_data[doc_id],
                "best_score": score
            })

        final_results.sort(key=lambda x: x["best_score"], reverse=True)

        return {
            "queries": list(all_results.keys()),
            "strategy": "best",
            "results": final_results[:n_results]
        }

    def _union_combine_results(self, all_results: Dict, n_results: int) -> Dict[str, Any]:
        """Combine all results without deduplication"""
        all_docs = []

        for query, results in all_results.items():
            if "documents" not in results:
                continue

            for i, doc in enumerate(results["documents"]):
                similarity = results["similarities"][i] if "similarities" in results else 1.0

                all_docs.append({
                    "document": doc,
                    "metadata": results["metadatas"][i],
                    "id": results["ids"][i],
                    "source_query": query,
                    "similarity": similarity
                })

        # Sort and return top results
        all_docs.sort(key=lambda x: x["similarity"], reverse=True)

        return {
            "queries": list(all_results.keys()),
            "strategy": "union",
            "results": all_docs[:n_results]
        }

# 4. Document Management and Analytics
class DocumentManager:
    """Advanced document management and analytics"""

    def __init__(self, vector_db: ChromaVectorDB):
        self.vector_db = vector_db
        self.analytics = DocumentAnalytics(vector_db)

    def batch_import_from_directory(
        self,
        directory_path: str,
        file_types: List[str] = None,
        recursive: bool = True,
        batch_size: int = 100
    ) -> Dict[str, Any]:
        """Batch import documents from directory"""
        if file_types is None:
            file_types = [".txt", ".md", ".py", ".js", ".html", ".css"]

        try:
            from pathlib import Path

            directory = Path(directory_path)
            if not directory.exists():
                return {"error": f"Directory not found: {directory_path}"}

            # Find all files
            files_found = []
            if recursive:
                for file_type in file_types:
                    files_found.extend(directory.rglob(f"*{file_type}"))
            else:
                for file_type in file_types:
                    files_found.extend(directory.glob(f"*{file_type}"))

            if not files_found:
                return {"message": "No files found matching the criteria"}

            # Process files
            documents = []
            metadatas = []

            for file_path in files_found:
                try:
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()

                    if content.strip():
                        documents.append(content)
                        metadatas.append({
                            "file_path": str(file_path),
                            "file_name": file_path.name,
                            "file_type": file_path.suffix,
                            "file_size": file_path.stat().st_size,
                            "imported_at": datetime.now().isoformat()
                        })

                except Exception as e:
                    logger.warning(f"Could not read file {file_path}: {e}")

            if not documents:
                return {"message": "No readable content found in files"}

            # Add to vector database
            import_result = self.vector_db.add_documents(
                documents,
                metadatas,
                batch_size=batch_size
            )

            import_result.update({
                "files_scanned": len(files_found),
                "files_imported": len(documents),
                "directory": directory_path
            })

            return import_result

        except Exception as e:
            logger.error(f"Error in batch import: {e}")
            return {"error": str(e)}

    def export_collection(
        self,
        output_path: str,
        include_embeddings: bool = False
    ) -> Dict[str, Any]:
        """Export entire collection to file"""
        try:
            # Get all documents
            all_docs = self.vector_db.collection.get()

            export_data = {
                "metadata": {
                    "collection_name": self.vector_db.config.collection_name,
                    "exported_at": datetime.now().isoformat(),
                    "total_documents": len(all_docs["ids"]),
                    "include_embeddings": include_embeddings
                },
                "documents": []
            }

            # Process each document
            for i in range(len(all_docs["ids"])):
                doc_data = {
                    "id": all_docs["ids"][i],
                    "content": all_docs["documents"][i],
                    "metadata": all_docs["metadatas"][i]
                }

                if include_embeddings:
                    # Get embedding for this document
                    embedding = self.vector_db.embedding_manager.get_embedding(
                        all_docs["documents"][i],
                        use_cache=True
                    )
                    doc_data["embedding"] = embedding

                export_data["documents"].append(doc_data)

            # Save to file
            output_file = Path(output_path)
            output_file.parent.mkdir(parents=True, exist_ok=True)

            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(export_data, f, indent=2, ensure_ascii=False)

            return {
                "success": True,
                "output_path": str(output_file),
                "documents_exported": len(export_data["documents"]),
                "file_size_mb": output_file.stat().st_size / (1024 * 1024)
            }

        except Exception as e:
            logger.error(f"Error exporting collection: {e}")
            return {"error": str(e)}

class DocumentAnalytics:
    """Analytics and insights for document collections"""

    def __init__(self, vector_db: ChromaVectorDB):
        self.vector_db = vector_db

    def generate_insights(self) -> Dict[str, Any]:
        """Generate comprehensive analytics insights"""
        try:
            # Get collection stats
            stats = self.vector_db.get_collection_stats()

            # Get sample documents for analysis
            sample_size = min(1000, stats.get("total_documents", 0))
            sample_docs = self.vector_db.collection.get(limit=sample_size)

            insights = {
                "collection_overview": stats,
                "content_analysis": self._analyze_content(sample_docs),
                "metadata_analysis": self._analyze_metadata(sample_docs),
                "performance_metrics": self._analyze_performance(),
                "recommendations": self._generate_recommendations(stats)
            }

            return insights

        except Exception as e:
            logger.error(f"Error generating insights: {e}")
            return {"error": str(e)}

    def _analyze_content(self, sample_docs: Dict) -> Dict[str, Any]:
        """Analyze document content"""
        try:
            documents = sample_docs.get("documents", [])

            if not documents:
                return {"message": "No documents to analyze"}

            # Content statistics
            lengths = [len(doc) for doc in documents]
            word_counts = [len(doc.split()) for doc in documents]

            # Most common words (simple analysis)
            all_words = []
            for doc in documents:
                all_words.extend(doc.lower().split())

            word_freq = {}
            for word in all_words:
                if len(word) > 3:  # Ignore very short words
                    word_freq[word] = word_freq.get(word, 0) + 1

            top_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:20]

            return {
                "document_count": len(documents),
                "content_stats": {
                    "avg_length": sum(lengths) / len(lengths),
                    "median_length": sorted(lengths)[len(lengths) // 2],
                    "min_length": min(lengths),
                    "max_length": max(lengths),
                    "total_characters": sum(lengths)
                },
                "word_stats": {
                    "avg_words_per_doc": sum(word_counts) / len(word_counts),
                    "median_words_per_doc": sorted(word_counts)[len(word_counts) // 2],
                    "min_words_per_doc": min(word_counts),
                    "max_words_per_doc": max(word_counts),
                    "total_words": sum(word_counts)
                },
                "top_words": top_words
            }

        except Exception as e:
            logger.error(f"Error analyzing content: {e}")
            return {"error": str(e)}

    def _analyze_metadata(self, sample_docs: Dict) -> Dict[str, Any]:
        """Analyze document metadata"""
        try:
            metadatas = sample_docs.get("metadatas", [])

            if not metadatas:
                return {"message": "No metadata to analyze"}

            # Analyze metadata fields
            all_keys = set()
            for meta in metadatas:
                all_keys.update(meta.keys())

            # Analyze value types and patterns
            field_analysis = {}
            for key in all_keys:
                values = [meta.get(key) for meta in metadatas if key in meta]
                non_null_values = [v for v in values if v is not None]

                field_analysis[key] = {
                    "present_in_percent": (len(non_null_values) / len(metadatas)) * 100,
                    "unique_values": len(set(str(v) for v in non_null_values)),
                    "sample_values": list(set(str(v) for v in non_null_values[:5]))
                }

                # Determine the field type; check bool before numeric because
                # bool is a subclass of int in Python
                if not non_null_values:
                    field_analysis[key]["type"] = "unknown"
                elif all(isinstance(v, bool) for v in non_null_values):
                    field_analysis[key]["type"] = "boolean"
                elif all(isinstance(v, str) for v in non_null_values):
                    field_analysis[key]["type"] = "string"
                elif all(isinstance(v, (int, float)) for v in non_null_values):
                    field_analysis[key]["type"] = "numeric"
                elif all(isinstance(v, list) for v in non_null_values):
                    field_analysis[key]["type"] = "list"
                else:
                    field_analysis[key]["type"] = "mixed"

            return {
                "total_fields": len(all_keys),
                "field_names": list(all_keys),
                "field_analysis": field_analysis
            }

        except Exception as e:
            logger.error(f"Error analyzing metadata: {e}")
            return {"error": str(e)}

    def _analyze_performance(self) -> Dict[str, Any]:
        """Analyze performance metrics"""
        try:
            cache_stats = self.vector_db.embedding_manager.get_cache_stats()

            return {
                "embedding_cache": cache_stats,
                "collection_size_mb": self._estimate_collection_size()
            }

        except Exception as e:
            logger.error(f"Error analyzing performance: {e}")
            return {"error": str(e)}

    def _estimate_collection_size(self) -> float:
        """Estimate collection size in MB"""
        try:
            # This is a rough estimation
            doc_count = self.vector_db.collection.count()
            # Assume average document size of 2KB + metadata + embedding (384 * 4 bytes for float32)
            estimated_size_bytes = doc_count * (2048 + 1536)
            return estimated_size_bytes / (1024 * 1024)  # Convert to MB

        except Exception as e:
            logger.error(f"Error estimating collection size: {e}")
            return 0.0

    def _generate_recommendations(self, stats: Dict) -> List[str]:
        """Generate optimization recommendations"""
        recommendations = []

        doc_count = stats.get("total_documents", 0)

        if doc_count > 100000:
            recommendations.append("Consider sharding the collection for better performance")

        if doc_count > 10000:
            recommendations.append("Implement pagination for large result sets")

        cache_hit_rate = stats.get("embedding_cache", {}).get("hit_rate_percent", 0)
        if cache_hit_rate < 50:
            recommendations.append("Consider implementing query result caching for frequently searched content")

        # Check for missing metadata
        if "sample_stats" in stats:
            avg_doc_length = stats["sample_stats"].get("avg_doc_length", 0)
            if avg_doc_length > 5000:
                recommendations.append("Consider text chunking for very long documents to improve search precision")

        if not recommendations:
            recommendations.append("Collection appears to be well-optimized")

        return recommendations
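
# Simple chunking helper, sketched here because the recommendations above suggest
# chunking long documents; the window and overlap sizes are arbitrary defaults.
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 40) -> List[str]:
    """Split a document into overlapping word windows for finer-grained indexing"""
    words = text.split()
    if len(words) <= chunk_size:
        return [text]
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break
    return chunks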

# 5. Demonstration Functions
def demo_basic_chroma_usage():
    """Demonstrate basic Chroma usage"""
    print("=== Basic Chroma Usage Demo ===")

    # Configuration
    config = ChromaConfig(
        persist_directory="./demo_chroma_db",
        collection_name="demo_collection",
        embedding_model="all-MiniLM-L6-v2"
    )

    # Initialize vector database
    vector_db = ChromaVectorDB(config)

    # Sample documents
    documents = [
        "Python is a high-level programming language known for its simplicity and readability.",
        "Machine learning algorithms can automatically learn from data without being explicitly programmed.",
        "React is a JavaScript library for building user interfaces, particularly web applications.",
        "Artificial intelligence aims to create machines that can perform tasks requiring human intelligence.",
        "Data science combines statistics, mathematics, and computer science to extract insights from data."
    ]

    # Add documents
    result = vector_db.add_documents(documents)
    print(f"Added {result['documents_added']} documents successfully")

    # Search
    search_result = vector_db.search("programming languages", n_results=3)
    print(f"\nSearch results for 'programming languages':")
    for i, doc in enumerate(search_result.get("documents", [])):
        similarity = search_result.get("similarities", [])[i]
        print(f"{i+1}. {doc[:100]}... (similarity: {similarity:.3f})")

    # Get collection stats
    stats = vector_db.get_collection_stats()
    print(f"\nCollection stats:")
    print(f"- Total documents: {stats['total_documents']}")
    print(f"- Embedding model: {stats['embedding_model']}")

def demo_advanced_search():
    """Demonstrate advanced search operations"""
    print("\n=== Advanced Search Demo ===")

    config = ChromaConfig(
        persist_directory="./demo_chroma_db",
        collection_name="advanced_demo"
    )

    vector_db = ChromaVectorDB(config)
    advanced_ops = AdvancedQueryOperations(vector_db)

    # Add sample documents with metadata
    documents = [
        "Machine learning is a subset of artificial intelligence that enables systems to learn and improve from experience.",
        "Deep learning uses neural networks with multiple layers to progressively extract higher-level features from raw input.",
        "Natural language processing allows computers to understand, interpret, and generate human language.",
        "Computer vision enables machines to interpret and make decisions based on visual data from the world.",
        "Reinforcement learning trains agents to make sequences of decisions in an environment to maximize reward."
    ]

    metadatas = [
        {"category": "ML", "difficulty": "intermediate", "tags": ["AI", "learning"]},
        {"category": "DL", "difficulty": "advanced", "tags": ["AI", "neural"]},
        {"category": "NLP", "difficulty": "intermediate", "tags": ["AI", "language"]},
        {"category": "CV", "difficulty": "advanced", "tags": ["AI", "vision"]},
        {"category": "RL", "difficulty": "advanced", "tags": ["AI", "decision"]}
    ]

    vector_db.add_documents(documents, metadatas)

    # Hybrid search
    hybrid_result = advanced_ops.hybrid_search(
        "artificial intelligence",
        keywords=["learning", "systems"],
        n_results=5
    )

    print("Hybrid search results:")
    for result in hybrid_result["results"]:
        print(f"- Combined score: {result['combined_score']:.3f}")
        print(f"  Semantic: {result['semantic_score']:.3f}, Keyword: {result['keyword_score']:.3f}")
        print(f"  {result['document'][:100]}...\n")

    # Multi-query search
    multi_result = advanced_ops.multi_query_search(
        ["machine learning", "neural networks", "computer algorithms"],
        strategy="average"
    )

    print("Multi-query search results (average strategy):")
    for result in multi_result["results"][:3]:
        print(f"- Score: {result['average_score']:.3f}")
        print(f"  {result['document'][:100]}...\n")

def demo_document_management():
    """Demonstrate document management features"""
    print("\n=== Document Management Demo ===")

    config = ChromaConfig(
        persist_directory="./demo_chroma_db",
        collection_name="management_demo"
    )

    vector_db = ChromaVectorDB(config)
    doc_manager = DocumentManager(vector_db)

    # Add some documents
    documents = [
        "Initial document about web development.",
        "Another document about database systems."
    ]

    result = vector_db.add_documents(documents)
    doc_ids = result["ids"]

    print(f"Added documents with IDs: {doc_ids}")

    # Update first document
    update_result = vector_db.update_document(
        doc_ids[0],
        new_text="Updated document about modern web development frameworks including React, Vue, and Angular.",
        new_metadata={"category": "web", "updated": True}
    )

    print(f"Document update result: {update_result}")

    # Search similar documents
    similar_result = vector_db.get_similar_documents(doc_ids[0])
    print(f"Similar documents to updated doc:")
    for doc in similar_result.get("documents", [])[:3]:
        print(f"- {doc[:100]}...")

    # Get analytics
    analytics = DocumentAnalytics(vector_db)
    insights = analytics.generate_insights()

    print(f"\nAnalytics insights:")
    print(f"- Total documents: {insights['collection_overview']['total_documents']}")
    print(f"- Recommendations: {insights['recommendations']}")

def demo_batch_import():
    """Demonstrate batch import functionality"""
    print("\n=== Batch Import Demo ===")

    config = ChromaConfig(
        persist_directory="./demo_chroma_db",
        collection_name="batch_import_demo"
    )

    vector_db = ChromaVectorDB(config)
    doc_manager = DocumentManager(vector_db)

    # Create sample directory with files
    import tempfile
    import os

    with tempfile.TemporaryDirectory() as temp_dir:
        # Create sample files
        sample_files = {
            "python_basics.txt": "Python is a versatile programming language used for web development, data science, AI, and automation.",
            "javascript_intro.md": "JavaScript is the language of the web, enabling interactive and dynamic web applications.",
            "data_science.py": "Data science combines statistics, programming, and domain knowledge to extract insights from data."
        }

        for filename, content in sample_files.items():
            file_path = os.path.join(temp_dir, filename)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)

        # Import from directory
        import_result = doc_manager.batch_import_from_directory(
            temp_dir,
            file_types=[".txt", ".md", ".py"]
        )

        print(f"Batch import result:")
        print(f"- Files scanned: {import_result['files_scanned']}")
        print(f"- Files imported: {import_result['files_imported']}")
        print(f"- Documents added: {import_result.get('documents_added', 0)}")

        # Search the imported content
        search_result = vector_db.search("programming language", n_results=3)
        print(f"\nSearch results:")
        for doc in search_result.get("documents", []):
            print(f"- {doc}")

# Main execution
if __name__ == "__main__":
    try:
        demo_basic_chroma_usage()
        demo_advanced_search()
        demo_document_management()
        demo_batch_import()
    except Exception as e:
        print(f"Demo error: {e}")
        logger.exception("Demo failed")

💻 Pinecone and Weaviate Vector Databases python

🔴 complex ⭐⭐⭐⭐⭐

Cloud-native vector database implementations with Pinecone and Weaviate for production-scale semantic search and AI applications

⏱️ 50 min 🏷️ pinecone, weaviate, vector-database, cloud
Prerequisites: Python, Pinecone API, Weaviate, sentence-transformers
# Pinecone and Weaviate Vector Database Implementation
# Production-ready cloud-native vector databases for semantic search
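#
# Note: the Pinecone calls below use the classic pinecone-client (pre-3.0)
# interface (pinecone.init / pinecone.Index). Newer SDK releases expose a
# Pinecone class instead; a minimal sketch of the newer entry point:
#   from pinecone import Pinecone
#   pc = Pinecone(api_key="...")
#   index = pc.Index("semantic-search")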

import os
import json
import time
import uuid
import hashlib
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass
from datetime import datetime, timedelta
import numpy as np
import asyncio
import aiohttp
import requests
from sentence_transformers import SentenceTransformer
import logging
from pathlib import Path
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor
import backoff

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 1. Configuration Classes
@dataclass
class PineconeConfig:
    """Configuration for Pinecone vector database"""
    api_key: str
    environment: str = "us-west1-gcp-free"
    project_name: str = "default"
    index_name: str = "semantic-search"
    dimension: int = 384  # Must match the embedding model; all-MiniLM-L6-v2 (used below) outputs 384 dims, OpenAI ada-002 uses 1536
    metric: str = "cosine"  # cosine, euclidean, dotproduct
    pods: int = 1
    replicas: int = 1
    pod_type: str = "p1.x1"
    metadata_config: Optional[Dict[str, Any]] = None

@dataclass
class WeaviateConfig:
    """Configuration for Weaviate vector database"""
    url: str = "http://localhost:8080"
    api_key: Optional[str] = None
    batch_size: int = 100
    timeout: int = 30
    grpc_port: int = 50051
    class_name: str = "Document"
    vectorizer: str = "none"  # We'll provide our own embeddings

class EmbeddingManager:
    """Thread-safe embedding management with caching and batch processing"""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2", device: str = "cpu"):
        self.model_name = model_name
        self.device = device
        self.model = None
        self.embedding_cache = {}
        self.cache_lock = threading.Lock()
        self.cache_hits = 0
        self.cache_misses = 0
        self.batch_size = 32

        self._load_model()

    def _load_model(self):
        """Load the embedding model"""
        try:
            self.model = SentenceTransformer(self.model_name, device=self.device)
            logger.info(f"Loaded embedding model: {self.model_name}")
        except Exception as e:
            logger.error(f"Failed to load model {self.model_name}: {e}")
            raise

    def get_embedding(self, text: str, use_cache: bool = True) -> List[float]:
        """Get embedding for a single text (thread-safe)"""
        if not text or not text.strip():
            return [0.0] * self.model.get_sentence_embedding_dimension()

        # Check cache
        if use_cache:
            cache_key = hashlib.md5(text.encode()).hexdigest()
            with self.cache_lock:
                if cache_key in self.embedding_cache:
                    self.cache_hits += 1
                    return self.embedding_cache[cache_key]

        self.cache_misses += 1

        try:
            embedding = self.model.encode(text, convert_to_numpy=True)
            embedding_list = embedding.tolist()

            # Cache result
            if use_cache:
                with self.cache_lock:
                    cache_key = hashlib.md5(text.encode()).hexdigest()
                    self.embedding_cache[cache_key] = embedding_list

                    # Limit cache size
                    if len(self.embedding_cache) > 10000:
                        keys_to_remove = list(self.embedding_cache.keys())[:1000]
                        for key in keys_to_remove:
                            del self.embedding_cache[key]

            return embedding_list

        except Exception as e:
            logger.error(f"Error generating embedding: {e}")
            dim = self.model.get_sentence_embedding_dimension()
            return [0.0] * dim

    def get_batch_embeddings(self, texts: List[str], show_progress: bool = False) -> List[List[float]]:
        """Get embeddings for multiple texts efficiently"""
        if not texts:
            return []

        # Check cache for each text
        uncached_texts = []
        uncached_indices = []
        cached_embeddings = [[] for _ in texts]

        with self.cache_lock:
            for i, text in enumerate(texts):
                cache_key = hashlib.md5(text.encode()).hexdigest()
                if cache_key in self.embedding_cache:
                    cached_embeddings[i] = self.embedding_cache[cache_key]
                    self.cache_hits += 1
                else:
                    uncached_texts.append(text)
                    uncached_indices.append(i)
                    self.cache_misses += 1

        # Generate embeddings for uncached texts in batches
        if uncached_texts:
            try:
                all_embeddings = []

                # Process in smaller batches for memory efficiency
                for i in range(0, len(uncached_texts), self.batch_size):
                    batch_texts = uncached_texts[i:i + self.batch_size]

                    if show_progress:
                        logger.info(f"Processing batch {i//self.batch_size + 1}/{(len(uncached_texts) + self.batch_size - 1)//self.batch_size}")

                    batch_embeddings = self.model.encode(
                        batch_texts,
                        convert_to_numpy=True,
                        show_progress_bar=False
                    )

                    for embedding in batch_embeddings:
                        all_embeddings.append(embedding.tolist())

                    # Small delay between batches to avoid saturating the encoder
                    time.sleep(0.01)

                # Place embeddings back in correct positions
                for i, embedding in enumerate(all_embeddings):
                    original_index = uncached_indices[i]
                    cached_embeddings[original_index] = embedding

                    # Cache the result
                    with self.cache_lock:
                        cache_key = hashlib.md5(uncached_texts[i].encode()).hexdigest()
                        self.embedding_cache[cache_key] = embedding

            except Exception as e:
                logger.error(f"Error generating batch embeddings: {e}")
                # Fill with zeros
                dim = self.model.get_sentence_embedding_dimension()
                for original_index in uncached_indices:
                    cached_embeddings[original_index] = [0.0] * dim

        return cached_embeddings

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache performance statistics"""
        with self.cache_lock:
            total_requests = self.cache_hits + self.cache_misses
            hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0

            return {
                "cache_hits": self.cache_hits,
                "cache_misses": self.cache_misses,
                "hit_rate_percent": round(hit_rate, 2),
                "cache_size": len(self.embedding_cache)
            }
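
# Example sketch (not called elsewhere): exercising the thread-safe embedding
# cache from several worker threads via ThreadPoolExecutor (imported above).
# The texts and worker count are illustrative only.
def example_threaded_embeddings():
    manager = EmbeddingManager()
    texts = [f"document {i}" for i in range(20)]
    with ThreadPoolExecutor(max_workers=4) as pool:
        vectors = list(pool.map(manager.get_embedding, texts))
    print(len(vectors), manager.get_cache_stats())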

# 2. Pinecone Implementation
class PineconeVectorDB:
    """Production-ready Pinecone vector database implementation"""

    def __init__(self, config: PineconeConfig):
        self.config = config
        self.embedding_manager = EmbeddingManager()
        self.index = None
        self._initialize_client()
        self._ensure_index_exists()

    def _initialize_client(self):
        """Initialize Pinecone client"""
        try:
            import pinecone

            pinecone.init(
                api_key=self.config.api_key,
                environment=self.config.environment
            )

            logger.info("Pinecone client initialized")

        except Exception as e:
            logger.error(f"Failed to initialize Pinecone client: {e}")
            raise

    def _ensure_index_exists(self):
        """Ensure index exists, create if it doesn't"""
        try:
            import pinecone

            if self.config.index_name not in pinecone.list_indexes():
                logger.info(f"Creating new index: {self.config.index_name}")

                pinecone.create_index(
                    name=self.config.index_name,
                    dimension=self.config.dimension,
                    metric=self.config.metric,
                    pods=self.config.pods,
                    replicas=self.config.replicas,
                    pod_type=self.config.pod_type,
                    metadata_config=self.config.metadata_config
                )

                # Wait for index to be ready
                self._wait_for_index_ready()
            else:
                logger.info(f"Using existing index: {self.config.index_name}")

            # Connect to index
            self.index = pinecone.Index(self.config.index_name)

        except Exception as e:
            logger.error(f"Failed to ensure index exists: {e}")
            raise

    @backoff.on_exception(backoff.expo, Exception, max_tries=5)
    def _wait_for_index_ready(self):
        """Wait until the index reports ready, with exponential backoff on errors"""
        import pinecone

        while True:
            description = pinecone.describe_index(self.config.index_name)
            status = getattr(description, "status", {}) or {}
            if status.get("ready"):
                break
            logger.info("Waiting for index to be ready...")
            time.sleep(5)

        logger.info(f"Index ready: {description}")

    def upsert_documents(
        self,
        documents: List[str],
        ids: Optional[List[str]] = None,
        metadatas: Optional[List[Dict[str, Any]]] = None,
        namespace: str = "",
        batch_size: int = 100
    ) -> Dict[str, Any]:
        """Upsert documents to Pinecone index"""
        if not documents:
            return {"success": False, "error": "No documents provided"}

        start_time = time.time()

        try:
            # Generate IDs if not provided
            if ids is None:
                ids = [str(uuid.uuid4()) for _ in documents]

            # Generate embeddings
            embeddings = self.embedding_manager.get_batch_embeddings(documents)

            # Prepare metadata
            if metadatas is None:
                metadatas = [{} for _ in documents]
            elif len(metadatas) != len(documents):
                metadatas = metadatas + [{} for _ in range(len(documents) - len(metadatas))]

            # Add timestamps
            current_time = datetime.now().isoformat()
            for i in range(len(metadatas)):
                metadatas[i].update({
                    "created_at": current_time,
                    "length": len(documents[i]),
                    "word_count": len(documents[i].split())
                })

            # Prepare vectors for upsert
            vectors = []
            for i, (doc_id, embedding, doc_text, metadata) in enumerate(zip(ids, embeddings, documents, metadatas)):
                vectors.append({
                    "id": doc_id,
                    "values": embedding,
                    "metadata": {
                        **metadata,
                        "text": doc_text
                    }
                })

            # Upsert in batches
            total_upserted = 0
            all_ids = []

            for i in range(0, len(vectors), batch_size):
                batch_vectors = vectors[i:i + batch_size]

                # Upsert batch
                self.index.upsert(
                    vectors=batch_vectors,
                    namespace=namespace
                )

                total_upserted += len(batch_vectors)
                all_ids.extend([v["id"] for v in batch_vectors])

                logger.info(f"Upserted batch {i//batch_size + 1}/{(len(vectors) + batch_size - 1)//batch_size}")

                # Small delay to avoid rate limits
                if i < len(vectors) - batch_size:
                    time.sleep(0.1)

            processing_time = time.time() - start_time

            return {
                "success": True,
                "ids": all_ids,
                "documents_upserted": total_upserted,
                "processing_time": processing_time,
                "avg_time_per_document": processing_time / len(documents),
                "namespace": namespace
            }

        except Exception as e:
            logger.error(f"Error upserting documents: {e}")
            return {
                "success": False,
                "error": str(e),
                "documents_attempted": len(documents)
            }

    def search(
        self,
        query: str,
        top_k: int = 10,
        namespace: str = "",
        include_metadata: bool = True,
        filter_dict: Optional[Dict[str, Any]] = None,
        include_values: bool = True
    ) -> Dict[str, Any]:
        """Search for similar documents"""
        start_time = time.time()

        try:
            # Get query embedding
            query_embedding = self.embedding_manager.get_embedding(query)

            # Prepare search parameters
            search_params = {
                "vector": query_embedding,
                "top_k": top_k,
                "include_metadata": include_metadata,
                "namespace": namespace
            }

            if include_values:
                search_params["include_values"] = include_values

            if filter_dict:
                search_params["filter"] = filter_dict

            # Perform search
            results = self.index.query(**search_params)

            # Process results
            processing_time = time.time() - start_time

            processed_results = {
                "query": query,
                "namespace": namespace,
                "processing_time": processing_time,
                "results": []
            }

            if "matches" in results and results["matches"]:
                for match in results["matches"]:
                    result_item = {
                        "id": match["id"],
                        "score": match["score"],
                        "metadata": match.get("metadata", {})
                    }

                    if "values" in match:
                        result_item["values"] = match["values"]

                    processed_results["results"].append(result_item)

            processed_results["total_found"] = len(processed_results["results"])

            return processed_results

        except Exception as e:
            logger.error(f"Error during search: {e}")
            return {
                "query": query,
                "error": str(e),
                "processing_time": time.time() - start_time
            }

    def delete_documents(
        self,
        ids: List[str],
        namespace: str = "",
        delete_all: bool = False
    ) -> Dict[str, Any]:
        """Delete documents from index"""
        try:
            if delete_all:
                # Delete all documents in namespace
                self.index.delete(namespace=namespace, delete_all=True)
                deleted_count = "all"
            else:
                # Delete specific documents
                self.index.delete(ids=ids, namespace=namespace)
                deleted_count = len(ids)

            return {
                "success": True,
                "deleted_count": deleted_count,
                "namespace": namespace
            }

        except Exception as e:
            logger.error(f"Error deleting documents: {e}")
            return {"error": str(e)}

    def get_index_stats(self) -> Dict[str, Any]:
        """Get index statistics"""
        try:
            import pinecone

            # Describe index
            index_description = pinecone.describe_index(self.config.index_name)

            # Get index statistics
            index_stats = self.index.describe_index_stats()

            # Get embedding cache stats
            cache_stats = self.embedding_manager.get_cache_stats()

            return {
                "index_name": self.config.index_name,
                "dimension": self.config.dimension,
                "metric": self.config.metric,
                "pods": self.config.pods,
                "replicas": self.config.replicas,
                "pod_type": self.config.pod_type,
                "total_vector_count": index_stats.get("totalVectorCount", 0),
                "vector_count_per_pod": index_stats.get("dimension", 0),
                "index_fullness": index_stats.get("indexFullness", 0),
                "description": index_description,
                "embedding_cache": cache_stats
            }

        except Exception as e:
            logger.error(f"Error getting index stats: {e}")
            return {"error": str(e)}

# 3. Weaviate Implementation
class WeaviateVectorDB:
    """Production-ready Weaviate vector database implementation"""

    def __init__(self, config: WeaviateConfig):
        self.config = config
        self.embedding_manager = EmbeddingManager()
        self.client = None
        self._initialize_client()
        self._ensure_schema_exists()

    def _initialize_client(self):
        """Initialize Weaviate client"""
        try:
            import weaviate

            client_config = {
                "url": self.config.url,
                "timeout_config": (5, 15, 300, 300),  # (connect, read, write, init)
            }

            if self.config.api_key:
                client_config["auth_client_secret"] = weaviate.AuthApiKey(
                    api_key=self.config.api_key
                )

            self.client = weaviate.Client(**client_config)
            logger.info("Weaviate client initialized")

        except Exception as e:
            logger.error(f"Failed to initialize Weaviate client: {e}")
            raise

    def _ensure_schema_exists(self):
        """Ensure schema exists, create if it doesn't"""
        try:
            # Check if class exists
            if not self.client.schema.exists(self.config.class_name):
                logger.info(f"Creating new class: {self.config.class_name}")

                # Define schema
                class_obj = {
                    "class": self.config.class_name,
                    "description": "Document class for semantic search",
                    "vectorizer": self.config.vectorizer,
                    "moduleConfig": {
                        "text2vec-openai": None  # We're providing our own embeddings
                    },
                    "properties": [
                        {
                            "name": "content",
                            "dataType": ["text"],
                            "description": "Document content"
                        },
                        {
                            "name": "title",
                            "dataType": ["text"],
                            "description": "Document title"
                        },
                        {
                            "name": "category",
                            "dataType": ["string"],
                            "description": "Document category"
                        },
                        {
                            "name": "created_at",
                            "dataType": ["date"],
                            "description": "Creation timestamp"
                        },
                        {
                            "name": "length",
                            "dataType": ["int"],
                            "description": "Document length"
                        },
                        {
                            "name": "word_count",
                            "dataType": ["int"],
                            "description": "Word count"
                        }
                    ]
                }

                self.client.schema.create_class(class_obj)
                logger.info(f"Created class: {self.config.class_name}")
            else:
                logger.info(f"Using existing class: {self.config.class_name}")

        except Exception as e:
            logger.error(f"Failed to ensure schema exists: {e}")
            raise

    def add_documents(
        self,
        documents: List[str],
        titles: Optional[List[str]] = None,
        categories: Optional[List[str]] = None,
        batch_size: Optional[int] = None
    ) -> Dict[str, Any]:
        """Add documents to Weaviate"""
        if not documents:
            return {"success": False, "error": "No documents provided"}

        batch_size = batch_size or self.config.batch_size
        start_time = time.time()

        try:
            # Generate embeddings
            embeddings = self.embedding_manager.get_batch_embeddings(documents)

            # Prepare data objects
            data_objects = []
            for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
                data_object = {
                    "class": self.config.class_name,
                    "vector": embedding,
                    "properties": {
                        "content": doc,
                        "created_at": datetime.now().isoformat(),
                        "length": len(doc),
                        "word_count": len(doc.split())
                    }
                }

                # Add title if provided
                if titles and i < len(titles):
                    data_object["properties"]["title"] = titles[i]

                # Add category if provided
                if categories and i < len(categories):
                    data_object["properties"]["category"] = categories[i]

                data_objects.append(data_object)

            # Batch import via the client's batch manager
            all_ids = []
            total_imported = 0

            self.client.batch.configure(batch_size=batch_size)

            with self.client.batch as batch:
                for i, data_object in enumerate(data_objects):
                    obj_id = batch.add_data_object(
                        data_object=data_object["properties"],
                        class_name=data_object["class"],
                        vector=data_object["vector"]
                    )
                    all_ids.append(obj_id)
                    total_imported += 1

                    if (i + 1) % batch_size == 0 or (i + 1) == len(data_objects):
                        logger.info(f"Queued {i + 1}/{len(data_objects)} objects for import")

            processing_time = time.time() - start_time

            return {
                "success": True,
                "ids": all_ids,
                "documents_imported": total_imported,
                "processing_time": processing_time,
                "avg_time_per_document": processing_time / len(documents)
            }

        except Exception as e:
            logger.error(f"Error adding documents: {e}")
            return {
                "success": False,
                "error": str(e),
                "documents_attempted": len(documents)
            }

    def search(
        self,
        query: str,
        limit: int = 10,
        filters: Optional[Dict[str, Any]] = None,
        return_properties: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Search for similar documents"""
        start_time = time.time()

        try:
            # Get query embedding
            query_embedding = self.embedding_manager.get_embedding(query)

            # Build the GraphQL query with the client's query builder, searching by
            # the pre-computed query vector (no vectorizer module is required)
            properties = return_properties or ["content", "title", "category"]

            query_builder = (
                self.client.query
                .get(self.config.class_name, properties)
                .with_near_vector({"vector": query_embedding})
                .with_limit(limit)
                .with_additional(["id", "certainty", "distance"])
            )

            # Add filters if provided
            if filters:
                query_builder = query_builder.with_where({
                    "operator": "And",
                    "operands": [
                        {"path": [prop], "operator": "Equal", "valueText": value}
                        for prop, value in filters.items()
                    ]
                })

            # Perform search
            result = query_builder.do()

            processing_time = time.time() - start_time

            # Process results
            processed_results = {
                "query": query,
                "processing_time": processing_time,
                "results": []
            }

            if result and "data" in result:
                for item in result["data"]:
                    result_item = {
                        "id": item.get("id"),
                        "class": item.get("class"),
                        "properties": item.get("properties", {}),
                        "metadata": item.get("metadata", {}),
                        "certainty": item.get("certainty", 0.0)
                    }
                    processed_results["results"].append(result_item)

            processed_results["total_found"] = len(processed_results["results"])

            return processed_results

        except Exception as e:
            logger.error(f"Error during search: {e}")
            return {
                "query": query,
                "error": str(e),
                "processing_time": time.time() - start_time
            }

    def update_document(
        self,
        document_id: str,
        new_content: Optional[str] = None,
        new_properties: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Update an existing document"""
        try:
            # Get current document
            current_result = self.client.data_object.get_by_id(
                document_id,
                class_name=self.config.class_name
            )

            if not current_result:
                return {"error": f"Document with ID {document_id} not found"}

            # Prepare update data
            update_data = {
                "id": document_id,
                "class": self.config.class_name
            }

            # Update content if provided
            if new_content:
                new_embedding = self.embedding_manager.get_embedding(new_content)
                update_data["vector"] = new_embedding
                update_data["properties"] = {
                    "content": new_content,
                    "updated_at": datetime.now().isoformat(),
                    "length": len(new_content),
                    "word_count": len(new_content.split())
                }

            # Merge new properties
            if new_properties:
                if "properties" not in update_data:
                    update_data["properties"] = {}
                update_data["properties"].update(new_properties)

            # Perform update via the data_object API
            self.client.data_object.update(
                data_object=update_data.get("properties", {}),
                class_name=self.config.class_name,
                uuid=document_id,
                vector=update_data.get("vector")
            )

            return {
                "success": True,
                "document_id": document_id,
                "updated_at": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"Error updating document: {e}")
            return {"error": str(e)}

    def delete_document(self, document_id: str) -> Dict[str, Any]:
        """Delete a document"""
        try:
            result = self.client.data_object.delete(
                document_id,
                class_name=self.config.class_name
            )

            return {
                "success": True,
                "document_id": document_id,
                "deleted": True
            }

        except Exception as e:
            logger.error(f"Error deleting document: {e}")
            return {"error": str(e)}

    def get_schema_stats(self) -> Dict[str, Any]:
        """Get schema statistics"""
        try:
            schema = self.client.schema.get()

            # Find our class
            our_class = None
            for class_obj in schema.get("classes", []):
                if class_obj.get("class") == self.config.class_name:
                    our_class = class_obj
                    break

            if not our_class:
                return {"error": f"Class {self.config.class_name} not found"}

            # Get object count via an aggregate meta query
            agg = (
                self.client.query
                .aggregate(self.config.class_name)
                .with_meta_count()
                .do()
            )
            agg_entries = agg.get("data", {}).get("Aggregate", {}).get(self.config.class_name, [])
            total_objects = agg_entries[0].get("meta", {}).get("count", 0) if agg_entries else 0

            # Get embedding cache stats
            cache_stats = self.embedding_manager.get_cache_stats()

            return {
                "class_name": self.config.class_name,
                "vectorizer": self.config.vectorizer,
                "module_config": our_class.get("moduleConfig", {}),
                "properties": our_class.get("properties", []),
                "total_objects": total_objects,
                "embedding_cache": cache_stats
            }

        except Exception as e:
            logger.error(f"Error getting schema stats: {e}")
            return {"error": str(e)}

# 4. Vector Database Comparison and Benchmarking
class VectorDatabaseComparator:
    """Compare and benchmark different vector databases"""

    def __init__(self):
        self.benchmark_results = {}

    def benchmark_operations(
        self,
        vector_dbs: Dict[str, Any],
        documents: List[str],
        queries: List[str]
    ) -> Dict[str, Any]:
        """Benchmark operations across multiple vector databases"""
        results = {}

        for db_name, db in vector_dbs.items():
            logger.info(f"Benchmarking {db_name}...")

            try:
                db_results = self._benchmark_single_db(db, documents, queries)
                results[db_name] = db_results

            except Exception as e:
                logger.error(f"Error benchmarking {db_name}: {e}")
                results[db_name] = {"error": str(e)}

        self.benchmark_results = results
        return results

    def _benchmark_single_db(self, db: Any, documents: List[str], queries: List[str]) -> Dict[str, Any]:
        """Benchmark a single vector database"""
        results = {}

        # Benchmark indexing
        indexing_start = time.time()

        if hasattr(db, 'upsert_documents'):  # Pinecone
            index_result = db.upsert_documents(documents[:100])  # Limit for benchmark
        elif hasattr(db, 'add_documents'):  # Weaviate
            index_result = db.add_documents(documents[:100])
        else:
            index_result = {"success": False}

        indexing_time = time.time() - indexing_start

        # Benchmark search
        search_times = []
        successful_searches = 0

        for query in queries[:10]:  # Limit queries
            search_start = time.time()

            try:
                if hasattr(db, 'search'):
                    # Pass k positionally so both Pinecone-style (top_k) and Weaviate-style (limit) signatures work
                    search_result = db.search(query, 5)
                    if search_result and "error" not in search_result:
                        successful_searches += 1

                search_times.append(time.time() - search_start)

            except Exception as e:
                logger.warning(f"Search error for query '{query}': {e}")

        # Calculate statistics
        results = {
            "indexing": {
                "success": index_result.get("success", False),
                "time": indexing_time,
                "docs_per_second": len(documents[:100]) / indexing_time if indexing_time > 0 else 0
            },
            "search": {
                "successful_searches": successful_searches,
                "total_searches": len(queries[:10]),
                "success_rate": successful_searches / len(queries[:10]) * 100,
                "avg_search_time": np.mean(search_times) if search_times else 0,
                "min_search_time": min(search_times) if search_times else 0,
                "max_search_time": max(search_times) if search_times else 0
            }
        }

        return results

    def generate_comparison_report(self) -> Dict[str, Any]:
        """Generate comparison report from benchmark results"""
        if not self.benchmark_results:
            return {"error": "No benchmark results available"}

        report = {
            "summary": {},
            "detailed_results": self.benchmark_results,
            "recommendations": []
        }

        # Analyze results
        indexing_times = []
        search_times = []

        for db_name, results in self.benchmark_results.items():
            if "error" not in results:
                if "indexing" in results:
                    indexing_times.append((db_name, results["indexing"]["time"]))
                if "search" in results:
                    search_times.append((db_name, results["search"]["avg_search_time"]))

        # Find best performing databases
        if indexing_times:
            best_indexing = min(indexing_times, key=lambda x: x[1])
            worst_indexing = max(indexing_times, key=lambda x: x[1])

            report["summary"]["best_indexing"] = best_indexing[0]
            report["summary"]["worst_indexing"] = worst_indexing[0]
            report["summary"]["indexing_speed_ratio"] = worst_indexing[1] / best_indexing[1]

        if search_times:
            best_search = min(search_times, key=lambda x: x[1])
            worst_search = max(search_times, key=lambda x: x[1])

            report["summary"]["best_search"] = best_search[0]
            report["summary"]["worst_search"] = worst_search[0]
            report["summary"]["search_speed_ratio"] = worst_search[1] / best_search[1]

        # Generate recommendations
        recommendations = []

        if indexing_times and search_times:
            # Check if any database is consistently better
            best_overall = None
            best_scores = {}

            for db_name in set([x[0] for x in indexing_times + search_times]):
                idx_time = next((x[1] for x in indexing_times if x[0] == db_name), float('inf'))
                search_time = next((x[1] for x in search_times if x[0] == db_name), float('inf'))

                if idx_time != float('inf') and search_time != float('inf'):
                    score = (1 / idx_time) + (1 / search_time)  # Higher is better
                    best_scores[db_name] = score

            if best_scores:
                best_overall = max(best_scores.items(), key=lambda x: x[1])
                recommendations.append(
                    f"Best overall performance: {best_overall[0]}"
                )

        if recommendations:
            report["recommendations"] = recommendations

        return report

# 5. Production Features
class ProductionFeatures:
    """Production-ready features for vector databases"""

    @staticmethod
    def create_backup(vector_db, backup_path: str) -> Dict[str, Any]:
        """Create backup of vector database"""
        try:
            backup_data = {
                "timestamp": datetime.now().isoformat(),
                "config": vector_db.config if hasattr(vector_db, 'config') else {},
                "documents": [],
                "metadata": {}
            }

            # This is a simplified backup implementation
            # In production, use database-specific backup mechanisms

            if hasattr(vector_db, 'get_index_stats'):  # Pinecone
                stats = vector_db.get_index_stats()
                backup_data["stats"] = stats
            elif hasattr(vector_db, 'get_schema_stats'):  # Weaviate
                stats = vector_db.get_schema_stats()
                backup_data["stats"] = stats

            # Save backup
            backup_file = Path(backup_path)
            backup_file.parent.mkdir(parents=True, exist_ok=True)

            with open(backup_file, 'w', encoding='utf-8') as f:
                json.dump(backup_data, f, indent=2, ensure_ascii=False)

            return {
                "success": True,
                "backup_path": str(backup_file),
                "file_size_mb": backup_file.stat().st_size / (1024 * 1024)
            }

        except Exception as e:
            logger.error(f"Error creating backup: {e}")
            return {"error": str(e)}

    @staticmethod
    def monitor_performance(vector_db, duration_minutes: int = 5) -> Dict[str, Any]:
        """Monitor vector database performance"""
        try:
            start_time = time.time()
            end_time = start_time + (duration_minutes * 60)

            metrics = {
                "query_times": [],
                "error_count": 0,
                "success_count": 0,
                "monitoring_duration": duration_minutes
            }

            test_queries = [
                "test query for performance monitoring",
                "another test query",
                "performance test query"
            ]

            while time.time() < end_time:
                query_start = time.time()

                try:
                    if hasattr(vector_db, 'search'):
                        # Rotate through the test queries; pass k positionally so it works
                        # with both Pinecone-style (top_k) and Weaviate-style (limit) signatures
                        query = test_queries[len(metrics["query_times"]) % len(test_queries)]
                        result = vector_db.search(query, 5)

                        if result and "error" not in result:
                            metrics["success_count"] += 1
                        else:
                            metrics["error_count"] += 1

                    query_time = time.time() - query_start
                    metrics["query_times"].append(query_time)

                except Exception as e:
                    metrics["error_count"] += 1
                    logger.warning(f"Monitoring query error: {e}")

                # Wait between queries
                time.sleep(10)

            # Calculate statistics
            if metrics["query_times"]:
                metrics.update({
                    "avg_query_time": np.mean(metrics["query_times"]),
                    "min_query_time": min(metrics["query_times"]),
                    "max_query_time": max(metrics["query_times"]),
                    "p95_query_time": np.percentile(metrics["query_times"], 95),
                    "queries_per_minute": (metrics["success_count"] + metrics["error_count"]) / duration_minutes,
                    "success_rate": (metrics["success_count"] / (metrics["success_count"] + metrics["error_count"]) * 100) if (metrics["success_count"] + metrics["error_count"]) > 0 else 0
                })

            return metrics

        except Exception as e:
            logger.error(f"Error monitoring performance: {e}")
            return {"error": str(e)}

# 6. Demonstration Functions
def demo_pinecone():
    """Demonstrate Pinecone vector database"""
    print("=== Pinecone Vector Database Demo ===")

    # Configuration
    config = PineconeConfig(
        api_key=os.getenv("PINECONE_API_KEY", "your-pinecone-api-key"),
        environment=os.getenv("PINECONE_ENVIRONMENT", "us-west1-gcp-free"),
        index_name="demo-index"
    )

    # Initialize
    vector_db = PineconeVectorDB(config)

    # Sample documents
    documents = [
        "Pinecone is a vector database service for machine learning applications.",
        "Vector databases enable efficient similarity search and recommendation systems.",
        "Embedding-based search powers modern AI applications including semantic search and RAG.",
        "Cloud-native vector databases provide scalability and managed infrastructure for production use.",
        "Real-time search capabilities enable instant similarity matching in large datasets."
    ]

    # Upsert documents
    result = vector_db.upsert_documents(documents)
    print(f"Upserted {result['documents_upserted']} documents")

    # Search
    search_result = vector_db.search("AI applications", top_k=3)
    print(f"\nSearch results:")
    for result_item in search_result["results"]:
        print(f"- Score: {result_item['score']:.3f}")
        print(f"  Content: {result_item['metadata']['text'][:100]}...")

    # Get stats
    stats = vector_db.get_index_stats()
    print(f"\nIndex stats:")
    print(f"- Total vectors: {stats['total_vector_count']}")
    print(f"- Dimension: {stats['dimension']}")

def demo_weaviate():
    """Demonstrate Weaviate vector database"""
    print("\n=== Weaviate Vector Database Demo ===")

    # Configuration
    config = WeaviateConfig(
        url="http://localhost:8080",  # Replace with your Weaviate URL
        class_name="Document"
    )

    try:
        # Initialize
        vector_db = WeaviateVectorDB(config)

        # Sample documents with metadata
        documents = [
            "Weaviate is an open-source vector database that scales with machine learning.",
            "GraphQL API provides flexible querying capabilities for vector databases.",
            "Schema-first design ensures type-safe data management in Weaviate.",
            "Hybrid search combines vector and keyword search for comprehensive results.",
            "Real-time collaboration features enable multiple users to work with vector databases simultaneously."
        ]

        titles = [
            "Weaviate Overview",
            "GraphQL in Weaviate",
            "Schema Design",
            "Search Capabilities",
            "Collaboration Features"
        ]

        categories = [
            "database",
            "api",
            "design",
            "search",
            "features"
        ]

        # Add documents
        result = vector_db.add_documents(documents, titles, categories)
        print(f"Added {result['documents_imported']} documents")

        # Search
        search_result = vector_db.search("vector database", limit=3)
        print(f"\nSearch results:")
        for result_item in search_result["results"]:
            print(f"- Certainty: {result_item['certainty']:.3f}")
            print(f"  Title: {result_item['properties'].get('title', 'No title')}")
            print(f"  Category: {result_item['properties'].get('category', 'No category')}")
            print(f"  Content: {result_item['properties'].get('content', '')[:100]}...")

        # Get schema stats
        stats = vector_db.get_schema_stats()
        print(f"\nSchema stats:")
        print(f"- Total objects: {stats['total_objects']}")
        print(f"- Properties: {len(stats['properties'])}")

    except Exception as e:
        print(f"Weaviate demo error: {e}")
        print("Make sure Weaviate is running at http://localhost:8080")

def demo_comparison():
    """Demonstrate vector database comparison"""
    print("\n=== Vector Database Comparison Demo ===")

    try:
        # Initialize databases (only if credentials are available)
        vector_dbs = {}

        # Pinecone
        if os.getenv("PINECONE_API_KEY"):
            pinecone_config = PineconeConfig(
                api_key=os.getenv("PINECONE_API_KEY"),
                environment="us-west1-gcp-free",
                index_name="comparison-index"
            )
            vector_dbs["Pinecone"] = PineconeVectorDB(pinecone_config)

        # Weaviate (only if running)
        try:
            weaviate_config = WeaviateConfig(
                url="http://localhost:8080",
                class_name="ComparisonDocument"
            )
            vector_dbs["Weaviate"] = WeaviateVectorDB(weaviate_config)
        except Exception:
            print("Weaviate not available for comparison")

        if not vector_dbs:
            print("No vector databases available for comparison")
            return

        # Sample data
        documents = [
            "Vector databases enable efficient similarity search and retrieval in high-dimensional spaces.",
            "Machine learning applications benefit from vector search capabilities for recommendation systems.",
            "Semantic search uses embeddings to understand the meaning behind queries and documents.",
            "Real-time vector search powers modern applications with instant response times.",
            "Scalable vector databases handle millions of vectors with consistent performance."
        ]

        queries = [
            "vector database performance",
            "machine learning applications",
            "semantic search technology"
        ]

        # Benchmark
        comparator = VectorDatabaseComparator()
        results = comparator.benchmark_operations(vector_dbs, documents, queries)

        # Generate report
        report = comparator.generate_comparison_report()

        print("Comparison Results:")
        print(json.dumps(report, indent=2))

    except Exception as e:
        print(f"Comparison demo error: {e}")

def demo_production_features():
    """Demonstrate production features"""
    print("\n=== Production Features Demo ===")

    # Create sample vector DB for demo
    config = PineconeConfig(
        api_key=os.getenv("PINECONE_API_KEY", "demo-key"),
        environment="us-west1-gcp-free",
        index_name="prod-demo-index"
    )

    try:
        vector_db = PineconeVectorDB(config)

        # Performance monitoring
        print("Starting performance monitoring (2 minutes)...")

        # Note: This would run for 2 minutes in a real scenario
        # For demo, we'll just show the structure
        print("Monitoring features include:")
        print("- Query time tracking")
        print("- Success/error rate monitoring")
        print("- Performance statistics")
        print("- Automated alerting")

        # Backup demonstration
        backup_result = ProductionFeatures.create_backup(
            vector_db,
            "./vector_db_backup.json"
        )

        print(f"\nBackup result: {backup_result}")

    except Exception as e:
        print(f"Production features demo error: {e}")

# Main execution
if __name__ == "__main__":
    try:
        demo_pinecone()
        demo_weaviate()
        demo_comparison()
        demo_production_features()
    except Exception as e:
        print(f"Demo error: {e}")
        logger.exception("Demo failed")

💻 FAISS and Custom Vector Solutions python

🔴 complex ⭐⭐⭐⭐⭐

Local vector database implementations using FAISS, Annoy, and custom solutions for privacy-focused applications

⏱️ 45 min 🏷️ faiss, annoy, vector-database, local, embedding
Prerequisites: Python, NumPy, FAISS, Annoy, sentence-transformers
# FAISS and Custom Vector Solutions
# Local vector database implementations for privacy-focused applications

import os
import json
import time
import uuid
import pickle
import numpy as np
import faiss
import threading
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime
import hashlib
from pathlib import Path
import logging
import mmap
from sentence_transformers import SentenceTransformer
import h5py
from annoy import AnnoyIndex
import scipy.sparse as sp
import scipy.spatial.distance as spd
import gc

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 1. Configuration Classes
@dataclass
class FAISSConfig:
    """Configuration for FAISS vector index"""
    dimension: int = 384  # MiniLM dimension
    index_type: str = "IVF"  # IVF, HNSW, Flat, IVFPQ, SCANN
    metric: str = "INNER_PRODUCT"  # INNER_PRODUCT, L2, COSINE
    nlist: int = 100  # For IVF
    m: int = 16  # HNSW neighbours; also IVFPQ sub-quantizers (must divide dimension)
    nbits: int = 8  # For IVFPQ
    use_gpu: bool = False
    gpu_id: int = 0
    batch_size: int = 1000
    nprobe: int = 10  # Search parameter for IVF

@dataclass
class CustomVectorDBConfig:
    """Configuration for custom vector database"""
    storage_path: str = "./vector_db"
    embedding_model: str = "all-MiniLM-L6-v2"
    max_memory_mb: int = 1024
    enable_compression: bool = True
    enable_mmap: bool = True
    auto_save_interval: int = 100  # Save after N operations
    backup_enabled: bool = True
    index_type: str = "faiss"  # faiss, annoy, custom
    distance_metric: str = "cosine"  # cosine, euclidean, manhattan

# 2. Embedding Management
class EfficientEmbeddingManager:
    """Memory-efficient embedding management with advanced caching"""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2", device: str = "cpu"):
        self.model_name = model_name
        self.device = device
        self.model = None
        self.embedding_cache = {}
        self.cache_lock = threading.RLock()
        self.cache_hits = 0
        self.cache_misses = 0
        self.cache_size_mb = 0
        self.max_cache_size_mb = 512  # 512MB cache

        self._load_model()

    def _load_model(self):
        """Load embedding model with memory optimization"""
        try:
            self.model = SentenceTransformer(
                self.model_name,
                device=self.device
            )
            logger.info(f"Loaded embedding model: {self.model_name}")

            # Optimize model for inference
            self.model.eval()
            if hasattr(self.model, "max_seq_length"):
                self.model.max_seq_length = 512  # Limit sequence length

        except Exception as e:
            logger.error(f"Failed to load model {self.model_name}: {e}")
            raise

    def get_embedding(self, text: str, use_cache: bool = True) -> np.ndarray:
        """Get embedding with memory-efficient caching"""
        if not text or not text.strip():
            return np.zeros(self.model.get_sentence_embedding_dimension(), dtype=np.float32)

        # Cache key
        cache_key = hashlib.md5(text.encode('utf-8')).hexdigest()

        # Check cache
        if use_cache:
            with self.cache_lock:
                if cache_key in self.embedding_cache:
                    self.cache_hits += 1
                    return self.embedding_cache[cache_key]

        self.cache_misses += 1

        try:
            # Generate embedding
            embedding = self.model.encode(
                text,
                convert_to_numpy=True,
                show_progress_bar=False,
                normalize_embeddings=True
            )

            # Ensure float32 for memory efficiency
            if embedding.dtype != np.float32:
                embedding = embedding.astype(np.float32)

            # Cache result if within memory limits
            if use_cache:
                with self.cache_lock:
                    self.embedding_cache[cache_key] = embedding
                    self.cache_size_mb += embedding.nbytes / (1024 * 1024)

                    # Clean cache if over limit
                    if self.cache_size_mb > self.max_cache_size_mb:
                        self._clean_cache()

            return embedding

        except Exception as e:
            logger.error(f"Error generating embedding: {e}")
            dim = self.model.get_sentence_embedding_dimension()
            return np.zeros(dim, dtype=np.float32)

    def get_batch_embeddings(self, texts: List[str], show_progress: bool = False) -> np.ndarray:
        """Batch embedding generation with memory management"""
        if not texts:
            return np.array([])

        # Check cache first
        cached_embeddings = []
        uncached_texts = []
        uncached_indices = []

        with self.cache_lock:
            for i, text in enumerate(texts):
                cache_key = hashlib.md5(text.encode('utf-8')).hexdigest()

                if cache_key in self.embedding_cache:
                    cached_embeddings.append(self.embedding_cache[cache_key])
                    self.cache_hits += 1
                else:
                    cached_embeddings.append(None)  # Placeholder
                    uncached_texts.append(text)
                    uncached_indices.append(i)
                    self.cache_misses += 1

        # Generate uncached embeddings
        if uncached_texts:
            try:
                # Process in batches for memory efficiency
                batch_size = 64
                all_uncached_embeddings = []

                for i in range(0, len(uncached_texts), batch_size):
                    batch_texts = uncached_texts[i:i + batch_size]

                    if show_progress:
                        logger.info(f"Processing embedding batch {i//batch_size + 1}/{(len(uncached_texts) + batch_size - 1)//batch_size}")

                    batch_embeddings = self.model.encode(
                        batch_texts,
                        convert_to_numpy=True,
                        show_progress_bar=False,
                        normalize_embeddings=True,
                        batch_size=len(batch_texts)
                    )

                    # Ensure float32
                    if batch_embeddings.dtype != np.float32:
                        batch_embeddings = batch_embeddings.astype(np.float32)

                    all_uncached_embeddings.append(batch_embeddings)

                    # Clear memory
                    del batch_embeddings
                    gc.collect()

                # Combine all uncached embeddings
                if all_uncached_embeddings:
                    all_uncached_embeddings = np.vstack(all_uncached_embeddings)
                else:
                    all_uncached_embeddings = np.array([]).reshape(0, 0)

                # Cache results
                for i, embedding in enumerate(all_uncached_embeddings):
                    original_index = uncached_indices[i]
                    cache_key = hashlib.md5(uncached_texts[i].encode('utf-8')).hexdigest()

                    with self.cache_lock:
                        self.embedding_cache[cache_key] = embedding
                        self.cache_size_mb += embedding.nbytes / (1024 * 1024)

                        # Check cache size
                        if self.cache_size_mb > self.max_cache_size_mb:
                            self._clean_cache()

                # Reconstruct full result array in the original input order
                if all_uncached_embeddings.size > 0:
                    dim = all_uncached_embeddings.shape[1]
                else:
                    dim = self.model.get_sentence_embedding_dimension()
                full_embeddings = np.zeros((len(texts), dim), dtype=np.float32)

                # Fill with cached and newly generated embeddings
                uncached_idx = 0

                for i in range(len(texts)):
                    if cached_embeddings[i] is not None:
                        full_embeddings[i] = cached_embeddings[i]
                    else:
                        full_embeddings[i] = all_uncached_embeddings[uncached_idx]
                        uncached_idx += 1

                return full_embeddings

            except Exception as e:
                logger.error(f"Error in batch embedding generation: {e}")
                # Return zeros as fallback
                dim = self.model.get_sentence_embedding_dimension()
                return np.zeros((len(texts), dim), dtype=np.float32)

        # All embeddings were cached
        return np.array([emb for emb in cached_embeddings if emb is not None])

    def _clean_cache(self):
        """Clean cache to free memory"""
        # Remove oldest entries
        entries_to_remove = len(self.embedding_cache) // 2
        keys_to_remove = list(self.embedding_cache.keys())[:entries_to_remove]

        removed_bytes = 0
        for key in keys_to_remove:
            if key in self.embedding_cache:
                removed_bytes += self.embedding_cache[key].nbytes
                del self.embedding_cache[key]

        self.cache_size_mb -= removed_bytes / (1024 * 1024)
        logger.info(f"Cleaned cache, removed {len(keys_to_remove)} entries")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        with self.cache_lock:
            total_requests = self.cache_hits + self.cache_misses
            hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0

            return {
                "cache_hits": self.cache_hits,
                "cache_misses": self.cache_misses,
                "hit_rate_percent": round(hit_rate, 2),
                "cache_size_mb": round(self.cache_size_mb, 2),
                "cache_entries": len(self.embedding_cache),
                "max_cache_size_mb": self.max_cache_size_mb
            }
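
# Usage sketch (illustrative): batch-embed a few texts and inspect cache behaviour.
# The second call is served from the cache, so hit_rate_percent should rise.
def example_embedding_cache_roundtrip() -> Dict[str, Any]:
    manager = EfficientEmbeddingManager()
    texts = ["vector databases", "semantic search", "vector databases"]
    vectors = manager.get_batch_embeddings(texts)   # shape (3, 384) for MiniLM
    _ = manager.get_batch_embeddings(texts)         # mostly cache hits
    assert vectors.shape[0] == len(texts)
    return manager.get_cache_stats()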

# 3. FAISS Implementation
class FAISSVectorStore:
    """Production-ready FAISS vector store with advanced features"""

    def __init__(self, config: FAISSConfig):
        self.config = config
        self.embedding_manager = EfficientEmbeddingManager()
        self.index = None
        self.documents = []
        self.metadata = []
        self.ids = []
        self.index_path = None
        self._initialize_index()

    def _initialize_index(self):
        """Initialize FAISS index based on configuration"""
        try:
            dimension = self.config.dimension

            if self.config.index_type == "IVF":
                # Inverted File Index
                quantizer = faiss.IndexFlatIP(dimension)
                self.index = faiss.IndexIVFFlat(quantizer, dimension, self.config.nlist)

            elif self.config.index_type == "HNSW":
                # Hierarchical Navigable Small World
                self.index = faiss.IndexHNSWFlat(dimension, self.config.m)

            elif self.config.index_type == "Flat":
                # Simple flat index
                self.index = faiss.IndexFlatIP(dimension)

            elif self.config.index_type == "IVFPQ":
                # Product Quantization
                quantizer = faiss.IndexFlatIP(dimension)
                self.index = faiss.IndexIVFPQ(quantizer, dimension, self.config.nbits, self.config.nlist)

            else:
                raise ValueError(f"Unsupported index type: {self.config.index_type}")

            logger.info(f"Initialized FAISS index: {self.config.index_type}")

        except Exception as e:
            logger.error(f"Failed to initialize FAISS index: {e}")
            raise

    def add_documents(
        self,
        documents: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        ids: Optional[List[str]] = None,
        train: bool = False
    ) -> Dict[str, Any]:
        """Add documents to the FAISS index"""
        if not documents:
            return {"success": False, "error": "No documents provided"}

        start_time = time.time()

        try:
            # Generate IDs if not provided
            if ids is None:
                ids = [str(uuid.uuid4()) for _ in documents]

            # Generate embeddings
            embeddings = self.embedding_manager.get_batch_embeddings(documents)

            # Prepare metadata
            if metadatas is None:
                metadatas = [{} for _ in documents]
            elif len(metadatas) != len(documents):
                metadatas = metadatas + [{} for _ in range(len(documents) - len(metadatas))]

            # Add timestamps and document info
            current_time = datetime.now().isoformat()
            for i in range(len(metadatas)):
                metadatas[i].update({
                    "created_at": current_time,
                    "length": len(documents[i]),
                    "word_count": len(documents[i].split()),
                    "embedding_norm": float(np.linalg.norm(embeddings[i]))
                })

            # Train/rebuild index if needed (IVF-style indexes must be trained before adding vectors)
            needs_training = self.config.index_type in ["IVF", "IVFPQ"]

            if needs_training and (train or not self.index.is_trained):
                logger.info("Training FAISS index...")
                existing = [doc["embedding"] for doc in self.documents if "embedding" in doc]
                if existing:
                    all_embeddings = np.vstack([np.array(existing, dtype=np.float32), embeddings])
                else:
                    all_embeddings = embeddings

                # Rebuild from scratch so existing and new vectors are indexed exactly once
                self.index.reset()
                self.index.train(all_embeddings)
                self.index.add(all_embeddings)
            else:
                # Flat/HNSW indexes (or an already-trained IVF index) can take new vectors directly
                self.index.add(embeddings)

            # Store documents
            for i, (doc, meta, doc_id, embedding) in enumerate(zip(documents, metadatas, ids, embeddings)):
                self.documents.append({
                    "id": doc_id,
                    "content": doc,
                    "metadata": meta,
                    "embedding": embedding
                })
                self.metadata.append(meta)
                self.ids.append(doc_id)

            processing_time = time.time() - start_time

            return {
                "success": True,
                "ids": ids,
                "documents_added": len(documents),
                "processing_time": processing_time,
                "total_documents": len(self.documents),
                "index_trained": train
            }

        except Exception as e:
            logger.error(f"Error adding documents: {e}")
            return {
                "success": False,
                "error": str(e),
                "documents_attempted": len(documents)
            }

    def search(
        self,
        query: str,
        k: int = 10,
        nprobe: Optional[int] = None
    ) -> Dict[str, Any]:
        """Search for similar documents"""
        start_time = time.time()

        try:
            # Get query embedding
            query_embedding = self.embedding_manager.get_embedding(query)
            query_embedding = query_embedding.reshape(1, -1)

            # Set search parameters
            search_k = min(k, len(self.documents))

            # Perform search (nprobe is set on the index itself for IVF-style indexes)
            if self.config.index_type in ["IVF", "IVFPQ"]:
                self.index.nprobe = nprobe or self.config.nprobe

            distances, indices = self.index.search(query_embedding, search_k)

            # Process results
            results = []
            for i in range(search_k):
                idx = indices[0][i]
                distance = distances[0][i]

                if 0 <= idx < len(self.documents):
                    doc = self.documents[idx]

                    # Convert distance to similarity
                    if self.config.metric == "INNER_PRODUCT":
                        similarity = float(distance)
                    elif self.config.metric == "L2":
                        similarity = 1.0 / (1.0 + float(distance))
                    else:  # COSINE
                        similarity = float(distance)

                    results.append({
                        "id": doc["id"],
                        "content": doc["content"],
                        "metadata": doc["metadata"],
                        "similarity": similarity,
                        "distance": float(distance),
                        "index": idx
                    })

            processing_time = time.time() - start_time

            return {
                "query": query,
                "results": results,
                "processing_time": processing_time,
                "k": k,
                "total_found": len(results)
            }

        except Exception as e:
            logger.error(f"Error during search: {e}")
            return {
                "query": query,
                "error": str(e),
                "processing_time": time.time() - start_time
            }

    def save_index(self, path: str):
        """Save FAISS index to disk"""
        try:
            save_path = Path(path)
            save_path.parent.mkdir(parents=True, exist_ok=True)

            # Save FAISS index
            faiss.write_index(self.index, str(save_path.with_suffix(".faiss")))

            # Save metadata
            metadata = {
                "config": {
                    "dimension": self.config.dimension,
                    "index_type": self.config.index_type,
                    "metric": self.config.metric,
                    "nlist": self.config.nlist,
                    "m": self.config.m,
                    "nbits": self.config.nbits
                },
                "documents_count": len(self.documents),
                "saved_at": datetime.now().isoformat(),
                "documents": self.documents[:1000]  # Save only first 1000 for file size
            }

            with open(save_path.with_suffix(".json"), 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            logger.info(f"Saved FAISS index to {save_path}")
            return {"success": True, "path": str(save_path)}

        except Exception as e:
            logger.error(f"Error saving index: {e}")
            return {"error": str(e)}

    def load_index(self, path: str):
        """Load FAISS index from disk"""
        try:
            load_path = Path(path)

            # Load metadata
            with open(load_path.with_suffix(".json"), 'r', encoding='utf-8') as f:
                metadata = json.load(f)

            # Load FAISS index
            self.index = faiss.read_index(str(load_path.with_suffix(".faiss")))

            # Update config
            self.config.dimension = metadata["config"]["dimension"]
            self.config.index_type = metadata["config"]["index_type"]
            self.config.metric = metadata["config"]["metric"]
            self.config.nlist = metadata["config"].get("nlist", 100)
            self.config.m = metadata["config"].get("m", 16)

            # Restore stored document records (the vectors themselves live in the .faiss file)
            self.documents = metadata.get("documents", [])
            self.ids = [doc.get("id") for doc in self.documents]
            self.metadata = [doc.get("metadata", {}) for doc in self.documents]

            logger.info(f"Loaded FAISS index from {load_path}")
            return {"success": True, "documents_count": metadata["documents_count"]}

        except Exception as e:
            logger.error(f"Error loading index: {e}")
            return {"error": str(e)}

    def get_stats(self) -> Dict[str, Any]:
        """Get index statistics"""
        try:
            cache_stats = self.embedding_manager.get_cache_stats()

            stats = {
                "index_type": self.config.index_type,
                "dimension": self.config.dimension,
                "metric": self.config.metric,
                "total_documents": len(self.documents),
                "index_ntotal": self.index.ntotal if hasattr(self.index, 'ntotal') else 0,
                "embedding_cache": cache_stats
            }

            # Add type-specific stats
            if self.config.index_type == "IVF":
                stats["nlist"] = self.config.nlist
            elif self.config.index_type == "HNSW":
                stats["m"] = self.config.m
            elif self.config.index_type == "IVFPQ":
                stats["nbits"] = self.config.nbits
                stats["nlist"] = self.config.nlist

            # Estimate memory usage (float32 vectors: ntotal * dimension * 4 bytes)
            if hasattr(self.index, 'ntotal'):
                memory_bytes = self.index.ntotal * self.config.dimension * 4
                stats["memory_usage_mb"] = memory_bytes / (1024 * 1024)

            return stats

        except Exception as e:
            logger.error(f"Error getting stats: {e}")
            return {"error": str(e)}

# 4. Annoy Implementation
class AnnoyVectorStore:
    """Annoy-based vector store for memory-efficient similarity search"""

    def __init__(self, dimension: int = 384, n_trees: int = 10, metric: str = "angular"):
        self.dimension = dimension
        self.n_trees = n_trees
        self.metric = metric
        self.embedding_manager = EfficientEmbeddingManager()

        # Initialize Annoy index; n_trees is applied later, when build() is called
        self.index = AnnoyIndex(dimension, self.metric)

        self.documents = []
        self.metadata = []
        self.ids = []
        self.is_built = False

    def add_documents(
        self,
        documents: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        ids: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Add documents to Annoy index"""
        if not documents:
            return {"success": False, "error": "No documents provided"}

        try:
            # Generate IDs if not provided
            if ids is None:
                ids = [str(uuid.uuid4()) for _ in documents]

            # Generate embeddings
            embeddings = self.embedding_manager.get_batch_embeddings(documents)

            # Prepare metadata
            if metadatas is None:
                metadatas = [{} for _ in documents]
            elif len(metadatas) != len(documents):
                metadatas = metadatas + [{} for _ in range(len(documents) - len(metadatas))]

            # Add timestamps
            current_time = datetime.now().isoformat()
            for i in range(len(metadatas)):
                metadatas[i].update({
                    "created_at": current_time,
                    "length": len(documents[i]),
                    "word_count": len(documents[i].split())
                })

            # Add items to index
            for i, (embedding, doc, meta, doc_id) in enumerate(zip(embeddings, documents, metadatas, ids)):
                self.index.add_item(i, embedding)
                self.documents.append({
                    "id": doc_id,
                    "content": doc,
                    "metadata": meta
                })
                self.metadata.append(meta)
                self.ids.append(doc_id)

            # Mark index as unbuilt
            self.is_built = False

            return {
                "success": True,
                "ids": ids,
                "documents_added": len(documents),
                "total_documents": len(self.documents)
            }

        except Exception as e:
            logger.error(f"Error adding documents to Annoy: {e}")
            return {"error": str(e)}

    def build_index(self, n_jobs: int = -1) -> Dict[str, Any]:
        """Build the Annoy index for search"""
        try:
            if self.is_built:
                return {"success": True, "message": "Index already built"}

            start_time = time.time()

            # Build index (Annoy takes the tree count at build time)
            self.index.build(self.n_trees, n_jobs=n_jobs)
            self.is_built = True

            build_time = time.time() - start_time

            return {
                "success": True,
                "build_time": build_time,
                "n_trees": self.n_trees,
                "total_items": len(self.documents)
            }

        except Exception as e:
            logger.error(f"Error building Annoy index: {e}")
            return {"error": str(e)}

    def search(
        self,
        query: str,
        k: int = 10,
        include_distances: bool = True
    ) -> Dict[str, Any]:
        """Search for similar documents"""
        if not self.is_built:
            # Auto-build if not built
            build_result = self.build_index()
            if not build_result["success"]:
                return {"error": "Failed to build index", "details": build_result}

        try:
            # Get query embedding
            query_embedding = self.embedding_manager.get_embedding(query)

            # Perform search; include_distances=True makes Annoy return (indices, distances)
            indices, distances = self.index.get_nns_by_vector(
                query_embedding, k, include_distances=True
            )

            # Process results
            results = []
            for i in range(len(indices)):
                idx = indices[i]

                if 0 <= idx < len(self.documents):
                    doc = self.documents[idx]

                    result = {
                        "id": doc["id"],
                        "content": doc["content"],
                        "metadata": doc["metadata"],
                        "index": idx
                    }

                    if include_distances:
                        result["distance"] = float(distances[i])
                        # Convert distance to similarity
                        if self.metric == "angular":
                            # Annoy's angular distance is sqrt(2 * (1 - cos)),
                            # so cosine similarity = 1 - d^2 / 2
                            result["similarity"] = 1.0 - (float(distances[i]) ** 2) / 2.0
                        elif self.metric == "euclidean":
                            result["similarity"] = 1.0 / (1.0 + float(distances[i]))
                        else:
                            result["similarity"] = 1.0 - float(distances[i])

                    results.append(result)

            return {
                "query": query,
                "results": results,
                "k": k,
                "total_found": len(results)
            }

        except Exception as e:
            logger.error(f"Error searching Annoy index: {e}")
            return {"error": str(e), "query": query}

    def save_index(self, path: str):
        """Save Annoy index to disk"""
        try:
            save_path = Path(path)
            save_path.parent.mkdir(parents=True, exist_ok=True)

            # Save Annoy index
            self.index.save(str(save_path.with_suffix(".annoy")))

            # Save metadata
            metadata = {
                "dimension": self.dimension,
                "n_trees": self.n_trees,
                "metric": self.metric,
                "is_built": self.is_built,
                "documents_count": len(self.documents),
                "saved_at": datetime.now().isoformat(),
                "documents": self.documents[:1000]  # Limit size
            }

            with open(save_path.with_suffix(".json"), 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            return {"success": True, "path": str(save_path)}

        except Exception as e:
            return {"error": str(e)}

    def load_index(self, path: str):
        """Load Annoy index from disk"""
        try:
            load_path = Path(path)

            # Load metadata
            with open(load_path.with_suffix(".json"), 'r', encoding='utf-8') as f:
                metadata = json.load(f)

            # Load Annoy index
            self.index.load(str(load_path.with_suffix(".annoy")))

            # Update configuration
            self.dimension = metadata["dimension"]
            self.n_trees = metadata["n_trees"]
            self.metric = metadata["metric"]
            self.is_built = metadata["is_built"]

            # Restore the persisted document subset (save_index keeps at most 1000)
            self.documents = metadata.get("documents", [])

            return {"success": True, "documents_count": metadata["documents_count"]}

        except Exception as e:
            return {"error": str(e)}

    def get_stats(self) -> Dict[str, Any]:
        """Get index statistics"""
        cache_stats = self.embedding_manager.get_cache_stats()

        return {
            "dimension": self.dimension,
            "n_trees": self.n_trees,
            "metric": self.metric,
            "is_built": self.is_built,
            "total_documents": len(self.documents),
            "embedding_cache": cache_stats
        }
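
# Annoy's "angular" metric reports d = sqrt(2 * (1 - cos_sim)), so cosine
# similarity is recovered as 1 - d**2 / 2 (this is the conversion used in
# AnnoyVectorStore.search above). A minimal standalone check:
def angular_to_cosine(distance: float) -> float:
    """Convert an Annoy angular distance back to cosine similarity."""
    return 1.0 - (distance ** 2) / 2.0

# Example: identical vectors give d = 0.0 -> similarity 1.0; orthogonal vectors
# give d = sqrt(2) -> similarity 0.0.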

# 5. Custom Vector Database with Compression
class CustomVectorDB:
    """Custom vector database with compression and advanced features"""

    def __init__(self, config: CustomVectorDBConfig):
        self.config = config
        self.embedding_manager = EfficientEmbeddingManager(config.embedding_model)
        self.documents = []
        self.embeddings = None
        self.metadata = []
        self.ids = []
        self.index = None  # search index is built lazily once documents are added

        self._initialize_storage()

    def _initialize_storage(self):
        """Initialize storage based on configuration"""
        storage_path = Path(self.config.storage_path)
        storage_path.mkdir(parents=True, exist_ok=True)

        self.data_file = storage_path / "data.h5"
        self.index_file = storage_path / "index.pkl"
        self.metadata_file = storage_path / "metadata.json"

        self._load_existing_data()

    def _load_existing_data(self):
        """Load existing data from disk"""
        try:
            # Load metadata
            if self.metadata_file.exists():
                with open(self.metadata_file, 'r', encoding='utf-8') as f:
                    saved_metadata = json.load(f)

                self.documents = saved_metadata.get("documents", [])
                self.metadata = saved_metadata.get("metadata", [])
                self.ids = saved_metadata.get("ids", [])

                logger.info(f"Loaded {len(self.documents)} existing documents")

            # Load embeddings
            if self.data_file.exists():
                with h5py.File(self.data_file, 'r') as f:
                    if "embeddings" in f:
                        self.embeddings = f["embeddings"][:]
                        logger.info(f"Loaded embeddings with shape: {self.embeddings.shape}")

            # Load index if available
            if self.index_file.exists():
                with open(self.index_file, 'rb') as f:
                    self.index = pickle.load(f)
                    logger.info("Loaded search index")

        except Exception as e:
            logger.warning(f"Could not load existing data: {e}")
            self.documents = []
            self.embeddings = None
            self.metadata = []
            self.ids = []
            self.index = None

    def _save_data(self):
        """Save data to disk"""
        try:
            # Save metadata
            metadata = {
                "documents": self.documents,
                "metadata": self.metadata,
                "ids": self.ids,
                "last_updated": datetime.now().isoformat()
            }

            with open(self.metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            # Save embeddings
            if self.embeddings is not None:
                with h5py.File(self.data_file, 'w') as f:
                    f.create_dataset("embeddings", data=self.embeddings, compression="gzip")

            # Save index
            if self.index is not None:
                with open(self.index_file, 'wb') as f:
                    pickle.dump(self.index, f)

        except Exception as e:
            logger.error(f"Error saving data: {e}")

    def add_documents(
        self,
        documents: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        ids: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Add documents with compression"""
        if not documents:
            return {"success": False, "error": "No documents provided"}

        try:
            start_time = time.time()

            # Generate IDs if not provided
            if ids is None:
                ids = [str(uuid.uuid4()) for _ in documents]

            # Generate embeddings
            embeddings = self.embedding_manager.get_batch_embeddings(documents)

            # Prepare metadata
            if metadatas is None:
                metadatas = [{} for _ in documents]
            elif len(metadatas) != len(documents):
                metadatas = metadatas + [{} for _ in range(len(documents) - len(metadatas))]

            # Add timestamps
            current_time = datetime.now().isoformat()
            for i in range(len(metadatas)):
                metadatas[i].update({
                    "created_at": current_time,
                    "length": len(documents[i]),
                    "word_count": len(documents[i].split())
                })

            # Add to storage
            for i, (doc, embedding, meta, doc_id) in enumerate(zip(documents, embeddings, metadatas, ids)):
                self.documents.append({
                    "id": doc_id,
                    "content": doc,
                    "metadata": meta
                })
                self.metadata.append(meta)
                self.ids.append(doc_id)

            # Update embeddings matrix (keep it as a float32 NumPy array)
            new_embeddings = np.asarray(embeddings, dtype=np.float32)
            if self.embeddings is None:
                self.embeddings = new_embeddings
            else:
                self.embeddings = np.vstack([self.embeddings, new_embeddings])

            # Rebuild index
            self._build_search_index()

            # Save data
            self._save_data()

            processing_time = time.time() - start_time

            return {
                "success": True,
                "ids": ids,
                "documents_added": len(documents),
                "processing_time": processing_time,
                "total_documents": len(self.documents),
                "compression_enabled": self.config.enable_compression
            }

        except Exception as e:
            logger.error(f"Error adding documents: {e}")
            return {"error": str(e)}

    def _build_search_index(self):
        """Build search index"""
        try:
            if self.embeddings is None or len(self.embeddings) == 0:
                return

            # Brute-force cosine index via scikit-learn's NearestNeighbors
            # In production, consider more sophisticated indexing
            from sklearn.neighbors import NearestNeighbors

            # Use cosine similarity
            self.index = NearestNeighbors(
                metric='cosine',
                algorithm='brute'
            )
            self.index.fit(self.embeddings)

        except Exception as e:
            logger.error(f"Error building search index: {e}")
            self.index = None
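
    # The NearestNeighbors index above performs a brute-force cosine scan via
    # scikit-learn. For small collections the same ranking can be computed with
    # NumPy alone; a minimal sketch (hypothetical helper, assumes self.embeddings
    # is an [n_docs, dim] float array):
    def _numpy_cosine_search(self, query_vector, k: int = 10) -> List[Tuple[int, float]]:
        """Brute-force cosine similarity against the in-memory embedding matrix."""
        matrix = np.asarray(self.embeddings, dtype=np.float32)
        query = np.asarray(query_vector, dtype=np.float32).ravel()
        # Normalize both sides; a small epsilon avoids division by zero
        row_norms = np.linalg.norm(matrix, axis=1) + 1e-12
        query_norm = np.linalg.norm(query) + 1e-12
        similarities = (matrix @ query) / (row_norms * query_norm)
        top_k = np.argsort(-similarities)[:k]
        return [(int(idx), float(similarities[idx])) for idx in top_k]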

    def search(
        self,
        query: str,
        k: int = 10,
        similarity_threshold: float = 0.7
    ) -> Dict[str, Any]:
        """Search for similar documents"""
        if self.index is None or self.embeddings is None:
            return {"error": "No search index available"}

        try:
            start_time = time.time()

            # Get query embedding (wrap as a 2-D float array for scikit-learn)
            query_embedding = np.asarray(
                self.embedding_manager.get_embedding(query), dtype=np.float32
            ).reshape(1, -1)

            # Perform search
            distances, indices = self.index.kneighbors(
                query_embedding,
                n_neighbors=min(k, len(self.documents))
            )

            # Process results
            results = []
            for distance, idx in zip(distances[0], indices[0]):
                idx = int(idx)
                similarity = 1.0 - float(distance)  # Convert cosine distance to similarity

                if similarity >= similarity_threshold:
                    doc = self.documents[idx]

                    results.append({
                        "id": doc["id"],
                        "content": doc["content"],
                        "metadata": doc["metadata"],
                        "similarity": similarity,
                        "distance": float(distance),
                        "index": idx
                    })

            processing_time = time.time() - start_time

            return {
                "query": query,
                "results": results,
                "processing_time": processing_time,
                "similarity_threshold": similarity_threshold,
                "k": k,
                "total_found": len(results)
            }

        except Exception as e:
            logger.error(f"Error during search: {e}")
            return {
                "query": query,
                "error": str(e),
                "processing_time": time.time() - start_time
            }

    def get_stats(self) -> Dict[str, Any]:
        """Get database statistics"""
        cache_stats = self.embedding_manager.get_cache_stats()

        stats = {
            "total_documents": len(self.documents),
            "embedding_dimension": self.embeddings.shape[1] if self.embeddings is not None else 0,
            "embedding_count": len(self.embeddings) if self.embeddings is not None else 0,
            "storage_path": self.config.storage_path,
            "enable_compression": self.config.enable_compression,
            "enable_mmap": self.config.enable_mmap,
            "embedding_cache": cache_stats
        }

        # Calculate storage size
        try:
            if self.data_file.exists():
                stats["data_file_size_mb"] = self.data_file.stat().st_size / (1024 * 1024)

            if self.metadata_file.exists():
                stats["metadata_file_size_mb"] = self.metadata_file.stat().st_size / (1024 * 1024)

            if self.index_file.exists():
                stats["index_file_size_mb"] = self.index_file.stat().st_size / (1024 * 1024)

        except Exception as e:
            logger.warning(f"Could not calculate storage size: {e}")

        return stats
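
    # `enable_compression` above only toggles gzip inside the HDF5 dataset. An
    # additional option is to down-cast embeddings before persisting them; a
    # minimal sketch assuming a ~2x size reduction is acceptable for the use case
    # (hypothetical helper, not part of the original class):
    def _embeddings_for_storage(self) -> np.ndarray:
        """Return a float16 copy of the embedding matrix for compact storage.
        Cast back to float32 before rebuilding the search index to limit precision loss."""
        if self.embeddings is None:
            return np.empty((0, 0), dtype=np.float16)
        return np.asarray(self.embeddings, dtype=np.float16)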

# 6. Demonstration Functions
def demo_faiss_implementation():
    """Demonstrate FAISS vector store"""
    print("=== FAISS Vector Store Demo ===")

    config = FAISSConfig(
        dimension=384,  # MiniLM dimension
        index_type="HNSW",
        metric="INNER_PRODUCT",
        m=16
    )

    vector_store = FAISSVectorStore(config)

    # Sample documents
    documents = [
        "FAISS is a library for efficient similarity search and clustering of dense vectors.",
        "Facebook AI developed FAISS for large-scale similarity search in high-dimensional spaces.",
        "The library provides implementations of various indexing algorithms for efficient search.",
        "FAISS supports both CPU and GPU acceleration for improved performance.",
        "It includes algorithms like IVF, HNSW, and Product Quantization for different use cases."
    ]

    # Add documents
    result = vector_store.add_documents(documents, train=True)
    print(f"Added {result['documents_added']} documents (trained: {result['index_trained']})")

    # Search
    search_result = vector_store.search("similarity search algorithms", k=3)
    print(f"\nSearch results:")
    for result_item in search_result["results"]:
        print(f"- Similarity: {result_item['similarity']:.3f}")
        print(f"  Content: {result_item['content'][:100]}...")

    # Get stats
    stats = vector_store.get_stats()
    print(f"\nIndex stats:")
    print(f"- Index type: {stats['index_type']}")
    print(f"- Total documents: {stats['total_documents']}")
    print(f"- Memory usage: {stats.get('memory_usage_mb', 'N/A')} MB")

    # Save index
    save_result = vector_store.save_index("./faiss_index")
    print(f"\nSaved index: {save_result}")

def demo_annoy_implementation():
    """Demonstrate Annoy vector store"""
    print("\n=== Annoy Vector Store Demo ===")

    vector_store = AnnoyVectorStore(dimension=384, n_trees=20)

    # Add documents
    documents = [
        "Annoy provides approximate nearest neighbor search for large-scale applications.",
        "It builds forests of random projection trees to partition the vector space.",
        "The library is designed to be memory-efficient and fast for high-dimensional vectors.",
        "Annoy supports various distance metrics including Euclidean, Manhattan, and angular."
        "It's particularly well-suited for recommendation systems and similarity search."
    ]

    result = vector_store.add_documents(documents)
    print(f"Added {result['documents_added']} documents")

    # Build index
    build_result = vector_store.build_index()
    print(f"Built index: {build_result}")

    # Search
    search_result = vector_store.search("recommendation systems", k=3)
    print(f"\nSearch results:")
    for result_item in search_result["results"]:
        print(f"- Similarity: {result_item.get('similarity', 'N/A')}")
        print(f"  Content: {result_item['content'][:100]}...")

    # Get stats
    stats = vector_store.get_stats()
    print(f"\nIndex stats:")
    print(f"- Dimension: {stats['dimension']}")
    print(f"- Total documents: {stats['total_documents']}")
    print(f"- Index built: {stats['is_built']}")

def demo_custom_vector_db():
    """Demonstrate custom vector database with compression"""
    print("\n=== Custom Vector Database Demo ===")

    config = CustomVectorDBConfig(
        storage_path="./custom_vector_db",
        embedding_model="all-MiniLM-L6-v2",
        enable_compression=True
    )

    vector_db = CustomVectorDB(config)

    # Sample documents
    documents = [
        "Custom vector databases provide flexibility for specialized use cases.",
        "Compression techniques allow storing more vectors in limited memory space.",
        "Memory-mapped files enable efficient access to large vector collections.",
        "Custom indexing algorithms can be optimized for specific data distributions.",
        "Integration with existing systems is simplified with custom implementations."
    ]

    # Add documents
    result = vector_db.add_documents(documents)
    print(f"Added {result['documents_added']} documents")

    # Search
    search_result = vector_db.search("custom implementations", k=3)
    print(f"\nSearch results:")
    for result_item in search_result["results"]:
        print(f"- Similarity: {result_item['similarity']:.3f}")
        print(f"  Content: {result_item['content'][:100]}...")

    # Get stats
    stats = vector_db.get_stats()
    print(f"\nDatabase stats:")
    print(f"- Total documents: {stats['total_documents']}")
    print(f"- Compression enabled: {stats['enable_compression']}")
    print(f"- Storage path: {stats['storage_path']}")

def demo_performance_comparison():
    """Demonstrate performance comparison between implementations"""
    print("\n=== Performance Comparison Demo ===")

    # Sample data for benchmarking
    documents = [
        f"Vector database document {i}: Demonstrates semantic search capabilities."
        for i in range(100)
    ]

    queries = [
        "vector search performance",
        "semantic similarity algorithms",
        "efficient indexing methods"
    ]

    # Test FAISS
    try:
        print("Testing FAISS...")
        faiss_config = FAISSConfig(dimension=384, index_type="Flat")
        faiss_store = FAISSVectorStore(faiss_config)

        # Add documents
        faiss_result = faiss_store.add_documents(documents[:50])

        # Benchmark searches
        faiss_times = []
        for query in queries:
            start_time = time.time()
            search_result = faiss_store.search(query, k=5)
            search_time = time.time() - start_time
            faiss_times.append(search_time)

        print(f"FAISS - Avg search time: {np.mean(faiss_times):.4f}s")

    except Exception as e:
        print(f"FAISS test error: {e}")
        faiss_times = []

    # Test Annoy
    try:
        print("\nTesting Annoy...")
        annoy_store = AnnoyVectorStore(dimension=384, n_trees=20)

        # Add documents
        annoy_result = annoy_store.add_documents(documents[:50])
        annoy_store.build_index()

        # Benchmark searches
        annoy_times = []
        for query in queries:
            start_time = time.time()
            search_result = annoy_store.search(query, k=5)
            search_time = time.time() - start_time
            annoy_times.append(search_time)

        print(f"Annoy - Avg search time: {np.mean(annoy_times):.4f}s")

    except Exception as e:
        print(f"Annoy test error: {e}")
        annoy_times = []

    # Test Custom
    try:
        print("\nTesting Custom Vector DB...")
        custom_config = CustomVectorDBConfig(
            storage_path="./perf_test_db",
            enable_compression=True
        )
        custom_store = CustomVectorDB(custom_config)

        # Add documents
        custom_result = custom_store.add_documents(documents[:50])

        # Benchmark searches
        custom_times = []
        for query in queries:
            start_time = time.time()
            search_result = custom_store.search(query, k=5)
            search_time = time.time() - start_time
            custom_times.append(search_time)

        print(f"Custom DB - Avg search time: {np.mean(custom_times):.4f}s")

    except Exception as e:
        print(f"Custom DB test error: {e}")
        custom_times = []

    # Print comparison
    if faiss_times and annoy_times and custom_times:
        print("\nPerformance Comparison:")
        print(f"FAISS:    {np.mean(faiss_times):.4f}s ± {np.std(faiss_times):.4f}s")
        print(f"Annoy:    {np.mean(annoy_times):.4f}s ± {np.std(annoy_times):.4f}s")
        print(f"Custom:   {np.mean(custom_times):.4f}s ± {np.std(custom_times):.4f}s")

        # Find fastest
        all_times = [
            ("FAISS", faiss_times),
            ("Annoy", annoy_times),
            ("Custom", custom_times)
        ]

        fastest = min(all_times, key=lambda x: np.mean(x[1]))
        slowest = max(all_times, key=lambda x: np.mean(x[1]))

        print(f"\nFastest: {fastest[0]} ({np.mean(fastest[1]):.4f}s)")
        print(f"Slowest: {slowest[0]} ({np.mean(slowest[1]):.4f}s)")
        print(f"Speed ratio: {np.mean(slowest[1]) / np.mean(fastest[1]):.2f}x")

# Main execution
if __name__ == "__main__":
    try:
        demo_faiss_implementation()
        demo_annoy_implementation()
        demo_custom_vector_db()
        demo_performance_comparison()
    except Exception as e:
        print(f"Demo error: {e}")
        logger.exception("Demo failed")