Vector database abstraction layer with production-ready Qdrant integration for hybrid dense+sparse retrieval.
The database module provides a clean abstraction over vector databases, currently implementing Qdrant as the primary backend. It supports:
- Hybrid Indexing: Dense + sparse vectors in the same collection
- Production Configuration: Environment variables, API keys, cloud deployment
- Automatic Collection Management: Schema creation, versioning, cleanup
- LangChain Integration: Seamless compatibility with LangChain VectorStore
database/
├── base.py # Abstract interfaces
├── qdrant_controller.py # Qdrant implementation
└── README.md # This file
BaseVectorDB (Abstract)
↓
QdrantVectorDB (Concrete)
↓
LangChain VectorStore Integration

from database.qdrant_controller import QdrantVectorDB
# Initialize with defaults (localhost)
db = QdrantVectorDB(strategy="hybrid")
# Initialize with custom config
config = {
"qdrant": {
"host": "your-qdrant-cloud.com",
"api_key": "your-api-key",
"collection": "my_collection"
}
}
db = QdrantVectorDB(strategy="hybrid", config=config)
# Initialize collection for 1024-dimensional vectors
db.init_collection(dense_vector_size=1024)

from langchain_core.documents import Document
documents = [
Document(
page_content="Renewable energy is sustainable...",
metadata={"source": "energy_paper.pdf", "page": 1}
),
Document(
page_content="Solar panels convert sunlight...",
metadata={"source": "solar_guide.pdf", "page": 3}
)
]
# Insert with embeddings
db.insert_documents(
documents=documents,
dense_embedder=your_dense_embedder,
sparse_embedder=your_sparse_embedder
)

# Get LangChain-compatible vectorstore
vectorstore = db.as_langchain_vectorstore(
dense_embedding=dense_embedder,
sparse_embedding=sparse_embedder,
strategy="hybrid" # or "dense", "sparse"
)
# Use with LangChain
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
results = retriever.get_relevant_documents("your query")

| Variable | Description | Default | Required |
|---|---|---|---|
| QDRANT_HOST | Qdrant server host | localhost | ✅ |
| QDRANT_PORT | Qdrant server port | 6333 | No |
| QDRANT_API_KEY | API key (for cloud) | None | Cloud only |
| QDRANT_COLLECTION | Collection name | default_collection | ✅ |
| DENSE_VECTOR_NAME | Dense vector field name | dense | No |
| SPARSE_VECTOR_NAME | Sparse vector field name | sparse | No |
config = {
"qdrant": {
"host": "localhost",
"port": 6333,
"api_key": None, # Optional
"collection": "my_collection",
"dense_vector_name": "dense",
"sparse_vector_name": "sparse"
}
}

- dense: Dense vector search only (semantic similarity)
- sparse: Sparse vector search only (keyword matching)
- hybrid: Combined dense + sparse with score fusion
# Check if collection exists
if db.client.collection_exists("my_collection"):
print("Collection exists")
# Recreate collection (deletes existing data)
db.init_collection(dense_vector_size=1024)
# Get raw Qdrant client for advanced operations
client = db.get_client()
collections = client.get_collections()

# Large batch insertion
large_documents = [...] # 10,000+ documents
db.insert_documents(
documents=large_documents,
dense_embedder=embedder,
sparse_embedder=sparse_embedder
)
# Automatically handles batching and memory management

# Documents with external IDs (for updates/deduplication)
documents = [
Document(
page_content="Content here",
metadata={
"external_id": "doc_123", # Will be used as vector ID
"source": "file.pdf"
}
)
]
db.insert_documents(documents, dense_embedder=embedder)
# Uses "doc_123" as the vector ID in Qdrant

try:
db = QdrantVectorDB()
print("✅ Database connection successful")
except Exception as e:
    print(f"❌ Database connection failed: {e}")

client = db.get_client()
collection_info = client.get_collection("my_collection")
print(f"Vectors: {collection_info.vectors_count}")
print(f"Status: {collection_info.status}")

# Start Qdrant with Docker
docker run -p 6333:6333 qdrant/qdrant:latest
# Or use docker-compose
docker-compose up -d qdrant

# Cloud configuration
config = {
"qdrant": {
"host": "xyz-abc.qdrant.tech",
"port": 6333,
"api_key": "your-api-key",
"collection": "production_collection"
}
}
db = QdrantVectorDB(config=config)

export QDRANT_HOST=your-qdrant-instance.com
export QDRANT_API_KEY=your-api-key
export QDRANT_COLLECTION=production_collection
# No .env file needed - uses environment variables directly

# Run database unit tests
pytest tests/pipeline/test_qdrant.py -v

# Start Qdrant first
docker-compose up -d qdrant
# Quick connectivity test
python -c "
from database.qdrant_controller import QdrantVectorDB
try:
db = QdrantVectorDB()
print('✅ Database OK')
except Exception as e:
print(f'❌ Database Error: {e}')
"

1. Implement Base Interface

from database.base import BaseVectorDB

class MyVectorDB(BaseVectorDB):
    def init_collection(self, dense_vector_size: int) -> None:
        # Implementation here
        pass

    def insert_documents(self, documents, dense_embedder, sparse_embedder) -> None:
        # Implementation here
        pass
2. Register in Factory (if using factory pattern)
DATABASE_REGISTRY["my_db"] = MyVectorDB
# Add custom metadata processing
class CustomQdrantDB(QdrantVectorDB):
def insert_documents(self, documents, dense_embedder, sparse_embedder):
# Custom preprocessing
for doc in documents:
doc.metadata["processed_at"] = datetime.now().isoformat()
doc.metadata["vector_version"] = "v2.0"
# Call parent implementation
        super().insert_documents(documents, dense_embedder, sparse_embedder)
1. Connection Refused

   Error: Connection refused to localhost:6333

   Solution: Start Qdrant with docker-compose up -d qdrant
2. API Key Authentication

   Error: Unauthorized access

   Solution: Set the QDRANT_API_KEY environment variable
3. Collection Already Exists

   Error: Collection 'my_collection' already exists

   Solution: Use init_collection() to recreate, or choose a different collection name
4. Vector Dimension Mismatch

   Error: Vector dimension mismatch

   Solution: Ensure the embedder's output dimension matches dense_vector_size in the collection
import logging
logging.basicConfig(level=logging.DEBUG)
# Enables detailed Qdrant operation logging
db = QdrantVectorDB()

from logs.utils.logger import get_logger
logger = get_logger(__name__)
# Database operations are automatically logged
db.insert_documents(...)  # Logs: "Inserted 100 documents"

- Pipelines README: Data ingestion pipeline
- Embedding README: Embedding generation
- Retrievers README: Search and retrieval
- Main README: System overview