feat: add Milvus storage backend (Lite / self-hosted / Zilliz Cloud) #721
zc277584121 wants to merge 2 commits into doobidoo:main
Conversation
Introduces MilvusMemoryStorage, a pluggable backend that uses pymilvus's
MilvusClient API. One configuration supports three deployment targets:
* Milvus Lite (default): single local file, zero external dependencies
* Self-hosted Milvus server: http(s) endpoint
* Zilliz Cloud: managed endpoint with a token
Schema and semantics:
* Primary key is the memory's content hash, so exact-hash dedup is free
and cleanup_duplicates() is a no-op.
* Tags are stored as ",tag1,tag2," with leading and trailing commas and
  matched with "tags like '%,tag,%'", so matches are exact per tag, never
  substring matches.
* AUTOINDEX + COSINE metric work identically on Lite, server, and Cloud.
* Semantic deduplication honours the same MCP_SEMANTIC_DEDUP_* env vars
as sqlite_vec, so the service layer sees consistent behaviour.
* Hard delete (no tombstone table); soft-delete/sync scenarios should
continue to use the hybrid backend.
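The comma-wrapped tag encoding above can be sketched as two small pure functions. The helper names here are illustrative, not necessarily the PR's actual identifiers:

```python
from typing import List


def tags_to_string(tags: List[str]) -> str:
    """Encode tags with leading and trailing commas so every tag is
    delimited on both sides: ["a", "b"] -> ",a,b,"."""
    return f",{','.join(tags)}," if tags else ""


def tag_filter_expr(tag: str) -> str:
    """Build a Milvus scalar filter that matches the whole tag only:
    'py' cannot match ',python,' because both commas must be present."""
    return f"tags like '%,{tag},%'"
```

Because every stored tag is wrapped in commas on both sides, the `%,tag,%` pattern cannot match a tag that merely contains the search term as a substring.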
Wiring:
* storage/factory.py routes backend == "milvus" to the new class and
reads MCP_MILVUS_URI / MCP_MILVUS_TOKEN / MCP_MILVUS_COLLECTION_NAME.
* config.py exposes those three variables and extends SUPPORTED_BACKENDS.
The MCP_ prefix is deliberate — pymilvus reserves MILVUS_URI and
validates it at import time, so a Lite file path in the unprefixed
env var would error before our code runs.
* pyproject.toml ships a new "milvus" optional-dependency group that
pulls in pymilvus>=2.5.0, milvus-lite>=2.4.10, and pins setuptools<81
on Python 3.13+ (milvus-lite still imports pkg_resources).
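The factory wiring can be sketched as a small env-reading helper. The env-var names come from the PR; the function name and the defaults shown (Lite file path, collection name) are illustrative assumptions:

```python
import os

# Backend names as listed in the PR; "milvus" is the new entry.
SUPPORTED_BACKENDS = ("sqlite_vec", "cloudflare", "hybrid", "milvus")


def milvus_config_from_env() -> dict:
    """Read the three MCP_-prefixed variables the factory passes to the
    Milvus backend. The MCP_ prefix avoids pymilvus's reserved MILVUS_URI,
    which pymilvus validates at import time."""
    return {
        "uri": os.environ.get("MCP_MILVUS_URI", "./milvus.db"),
        "token": os.environ.get("MCP_MILVUS_TOKEN", ""),
        "collection_name": os.environ.get("MCP_MILVUS_COLLECTION_NAME", "memories"),
    }
```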
Tests:
* tests/storage/test_milvus.py (23 tests) covers initialize, store,
retrieve, batch, exact-tag match, AND/OR tag search, delete paths,
update, stats, and semantic dedup. Uses Milvus Lite with a shared DB
file + per-test collection to avoid spawning a daemon per test.
* Full suite: 1611 passed (baseline 1588 + 23 new).
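The shared-file, per-test-collection setup depends on generating a fresh, Milvus-legal collection name for each test. A minimal sketch, with a hypothetical helper name rather than the PR's actual fixture code:

```python
import uuid


def unique_collection_name(prefix: str = "test_memories") -> str:
    """Return a fresh collection name per test. Milvus collection names
    must start with a letter or underscore and contain only word
    characters, so use the dash-free hex form of a UUID."""
    return f"{prefix}_{uuid.uuid4().hex}"
```

Each test then creates its own collection inside the one shared Lite `.db` file, so a single embedded daemon serves the whole run.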
Docs:
* docs/milvus-backend.md — per-target deployment guide, env vars,
schema, limitations, troubleshooting.
* docs/guides/STORAGE_BACKENDS.md — adds Milvus to the comparison
table, selection flowchart, and config reference.
* README.md / .env.example mention the new backend option.
Code Review
This pull request introduces a Milvus storage backend to the MCP Memory Service, supporting Milvus Lite, self-hosted Milvus, and Zilliz Cloud. The implementation includes a new MilvusMemoryStorage class, updated configuration handling, and comprehensive unit tests. Feedback focuses on improving the scalability and correctness of the Milvus implementation, specifically addressing issues with server-side filtering for substring searches, fixing broken pagination and sorting in query methods, ensuring consistent timestamp metadata, and optimizing fetch limits during vector searches.
```python
async def get_by_exact_content(self, content: str) -> List[Memory]:
    """Case-insensitive substring match on content.

    Matches sqlite_vec's semantics (which uses ``LIKE '%...%' COLLATE NOCASE``).
    Milvus ``like`` is case-sensitive, so we scan candidates and filter in
    Python. For expected collection sizes (thousands to low millions) this
    is acceptable; production users with huge collections should prefer
    full-text search on a dedicated field instead.
    """
    if not self._ensure_initialized():
        return []

    memories = await self.get_all_memories()
    needle = content.lower()
    return [m for m in memories if needle in (m.content or "").lower()]
```
This method performs a full scan of the collection (up to _MILVUS_MAX_LIMIT) and filters in Python, which has significant drawbacks:
- Incompleteness: It only searches the first 16,384 records. In a collection with millions of memories, matches beyond this limit will never be found.
- Performance/Memory Risk: Fetching thousands of records into memory as Python objects to perform a substring search is inefficient and poses an OOM (Out of Memory) risk for large collections.
Consider using the Milvus like operator for server-side filtering to narrow down candidates. Even if case-insensitivity requires a final Python check, the number of records fetched would be drastically reduced. For true case-insensitive scalability, consider storing a normalized lowercase field or using a Milvus analyzer (available for TEXT fields in 2.4+).
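A mirrored lowercase column, the approach the follow-up commit describes, sidesteps the case-sensitivity problem because the server-side `like` runs on already-normalized text. A sketch of the expression-building side only; helper names are illustrative and the escaping is deliberately simplified:

```python
def escape_like(term: str) -> str:
    """Naive escaping of Milvus ``like`` wildcards; real code would also
    handle quotes and backslashes in user input."""
    return term.replace("%", r"\%").replace("_", r"\_")


def content_filter_expr(needle: str) -> str:
    """Build a server-side filter against a mirrored ``content_lower``
    field, normalizing the needle so matching is case-insensitive."""
    return f'content_lower like "%{escape_like(needle.lower())}%"'
```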
```python
async def _query_memories(
    self,
    filter_expr: str,
    limit: int,
    offset: int = 0,
    sort_desc_key: Optional[str] = None,
) -> List[Memory]:
    """Run a scalar ``query`` and return Memory objects, sorted client-side."""
    limit = max(1, min(limit, _MILVUS_MAX_LIMIT))
    # Milvus has a limit on offset (~16384), but total rows < offset+limit still works.
    fetch_limit = min(offset + limit, _MILVUS_MAX_LIMIT)

    try:
        rows = await self._call_client(
            "query",
            collection_name=self.collection_name,
            filter=filter_expr or "",
            output_fields=list(self._OUTPUT_FIELDS),
            limit=fetch_limit,
        )
    except Exception as exc:  # noqa: BLE001
        logger.warning("Milvus query failed (filter=%r): %s", filter_expr, exc)
        return []

    memories: List[Memory] = []
    for row in rows:
        mem = self._entity_to_memory(row)
        if mem is not None:
            memories.append(mem)

    if sort_desc_key:
        memories.sort(key=lambda m: getattr(m, sort_desc_key) or 0.0, reverse=True)

    if offset:
        memories = memories[offset:]
    return memories[:limit]
```
The current implementation of _query_memories has several issues that affect correctness and scalability:
- Incorrect Sorting: Sorting is performed in Python after fetching a subset of records. Without an order_by clause in the Milvus query, the database returns an arbitrary set of records. This means get_recent_memories will not reliably return the most recent entries.
- Broken Pagination: Slicing is done in Python on a result set capped by _MILVUS_MAX_LIMIT. If offset + limit exceeds this limit, the function will return fewer records than requested or even an empty list (e.g., if offset >= 16384), even if more records exist.
- Efficiency: Milvus supports server-side sorting and pagination via order_by, limit, and offset. Using these parameters directly in the query call is significantly more efficient and correct.
Note: While Milvus has a default max_query_limit (typically 16384), using the offset parameter allows paginating through the entire collection.
```python
async def _query_memories(
    self,
    filter_expr: str,
    limit: int,
    offset: int = 0,
    sort_desc_key: Optional[str] = None,
) -> List[Memory]:
    """Run a scalar query and return Memory objects, sorted server-side."""
    query_params = {
        "collection_name": self.collection_name,
        "filter": filter_expr or "",
        "output_fields": list(self._OUTPUT_FIELDS),
        "limit": limit,
        "offset": offset,
    }
    if sort_desc_key:
        query_params["order_by"] = [f"{sort_desc_key} desc"]
    try:
        rows = await self._call_client("query", **query_params)
    except Exception as exc:  # noqa: BLE001
        logger.warning("Milvus query failed (filter=%r): %s", filter_expr, exc)
        return []
    memories: List[Memory] = []
    for row in rows:
        mem = self._entity_to_memory(row)
        if mem is not None:
            memories.append(mem)
    return memories
```

```python
def _memory_to_entity(self, memory: Memory, embedding: List[float]) -> Dict[str, Any]:
    return {
        "id": memory.content_hash,
        "vector": embedding,
        "content": memory.content or "",
        "tags": _tags_to_string(memory.tags),
        "memory_type": memory.memory_type or "",
        "metadata": json.dumps(memory.metadata) if memory.metadata else "{}",
        "created_at": float(memory.created_at) if memory.created_at is not None else time.time(),
        "updated_at": float(memory.updated_at) if memory.updated_at is not None else time.time(),
        "created_at_iso": memory.created_at_iso or "",
        "updated_at_iso": memory.updated_at_iso or "",
    }
```
There is an inconsistency in timestamp handling when fields are missing from the Memory object. If memory.created_at is None, it is initialized with time.time(), but created_at_iso is stored as an empty string. This results in records having a valid float timestamp but an empty ISO string, which may break consumers expecting consistent temporal metadata. Both fields should be populated using the same timestamp value. A similar pattern should be applied to _build_update_entity.
```python
def _memory_to_entity(self, memory: Memory, embedding: List[float]) -> Dict[str, Any]:
    now = time.time()
    created_at = float(memory.created_at) if memory.created_at is not None else now
    updated_at = float(memory.updated_at) if memory.updated_at is not None else now

    def to_iso(ts):
        return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat().replace("+00:00", "Z")

    return {
        "id": memory.content_hash,
        "vector": embedding,
        "content": memory.content or "",
        "tags": _tags_to_string(memory.tags),
        "memory_type": memory.memory_type or "",
        "metadata": json.dumps(memory.metadata) if memory.metadata else "{}",
        "created_at": created_at,
        "updated_at": updated_at,
        "created_at_iso": memory.created_at_iso or to_iso(created_at),
        "updated_at_iso": memory.updated_at_iso or to_iso(updated_at),
    }
```

```python
def _retrieve_fetch_limit(n_results: int, tag_filter: str) -> int:
    # Over-fetch when a tag filter is active — Milvus applies filters during
    # search but effective selectivity can vary.
    base = n_results * 3 if tag_filter else n_results
    return max(1, min(base, _MILVUS_MAX_LIMIT))
```
Over-fetching by a factor of 3 when a tag filter is active is unnecessary in Milvus. Milvus performs integrated filtering during the vector search process, ensuring that the limit requested is satisfied by matching entities (if they exist). Unlike systems that perform post-filtering, Milvus's search with a filter expression is efficient and exact. Reducing fetch_n to n_results would reduce network overhead and database load.
Suggested change:

```diff
-def _retrieve_fetch_limit(n_results: int, tag_filter: str) -> int:
-    # Over-fetch when a tag filter is active — Milvus applies filters during
-    # search but effective selectivity can vary.
-    base = n_results * 3 if tag_filter else n_results
-    return max(1, min(base, _MILVUS_MAX_LIMIT))
+@staticmethod
+def _retrieve_fetch_limit(n_results: int, tag_filter: str) -> int:
+    # Milvus applies filters during search, so over-fetching is not required.
+    return max(1, min(n_results, _MILVUS_MAX_LIMIT))
```
Review findings — Gemini cross-check + additions

Read the four Gemini comments and verified each against Gemini #1 —
…ery ordering, timestamp consistency, bounded embedding cache
- Add a mirrored ``content_lower`` VARCHAR field on the collection schema
and populate it on every insert/upsert. ``get_by_exact_content`` now
pushes a case-insensitive ``like`` filter down to Milvus instead of
fetching every row into Python. Pre-existing collections without the
field fall back to a client-side scan with a startup warning.
- Replace the plain ``client.query(limit=...)`` path in ``_query_memories``
with ``query_iterator`` so sorting and pagination apply to the full
matching set rather than to a random 16384-row window. Fixes silent
truncation on any collection larger than the per-RPC cap.
- Derive ``created_at`` / ``created_at_iso`` (and their ``updated_at``
counterparts) from a single ``time.time()`` reading in
``_memory_to_entity`` and ``_build_update_entity`` so the epoch and ISO
columns never reference different moments when a field is missing.
- Replace the module-level ``Dict[int, List[float]]`` embedding cache with
a bounded LRU keyed by ``f"{model_name}::{text}"``. Eliminates the
silent-wrong-embedding risk from 64-bit ``hash(text)`` collisions and
caps size at 1024 entries to prevent unbounded growth in long-lived
processes.
- Document the 3× over-fetch on tag-filtered ANN retrieval: HNSW filtered
ANN can drop candidates after the filter, so we over-fetch to guarantee
the caller receives ``n_results`` hits.
Tests: 1627 passed (7 new), 0 failed. All functions ≤ B complexity.
Thanks for the thorough review! Pushed fixes for all four findings as a new commit.

Also added the one-liner explaining the 3× over-fetch on tag-filtered search — HNSW filtered ANN can eliminate candidates after the filter, so we over-fetch to guarantee `n_results` hits.

Validation:
Follow-up review — ready to merge with two asks

Verified commit

Complexity still A-B across the board (avg A), 1627 tests green (7 new regression tests for ordering, cross-page consistency, cache eviction, legacy fallback). Approving on code quality.

Before merge, two asks that are about long-term sustainability rather than code correctness:

1. CI integration test against Docker Milvus

The 39 unit tests exercise Milvus Lite only. Self-hosted Milvus is the recommended mode for MCP deployments per the docs, but it's the one path with zero automated coverage today. Could you add a CI job that spins up
Happy to help wire this into

2. Maintenance commitment

If yes, I'll add a
Neither blocks merge on code grounds; both are about making sure this lands in a sustainable way. Let me know your take on both and we can move this forward.
Summary
Adds a new `milvus` storage backend to the Strategy Pattern under `storage/`, alongside `sqlite-vec`, `cloudflare`, and `hybrid`. Implemented against the existing `MemoryStorage` abstract interface. Supports three deployment modes from the same code path — Milvus Lite (local `.db` file, zero deps, good for scripts and tests), self-hosted Milvus via Docker (recommended for MCP servers and long-lived services), and Zilliz Cloud (managed, at-scale).

What's in this PR
* `src/mcp_memory_service/storage/milvus.py` (new) — `MilvusMemoryStorage` built on `pymilvus.MilvusClient`. `AUTOINDEX` + `COSINE`, dynamic fields for tags/metadata, one `MilvusClient` per storage instance, `asyncio.to_thread` to run the sync pymilvus client from async handlers. All functions graded ≤ B (cyclomatic complexity ≤ 8).
* `storage/factory.py`, `server_impl.py` (eager + lazy init paths), `cli/utils.py`, `web/api/configuration.py`, `utils/db_utils.py` — four dispatch points in the codebase all now recognize `milvus`.
* New config: `MCP_MILVUS_URI`, `MCP_MILVUS_TOKEN`, `MCP_MILVUS_COLLECTION_NAME`. The `MCP_` prefix is intentional; pymilvus reserves unprefixed `MILVUS_URI` and validates it at import time.
* `pyproject.toml` adds `[project.optional-dependencies].milvus = [pymilvus, milvus-lite, <conditional setuptools<81 on Py 3.13>]`. No impact on default installs.
* Reconnect helper gated on `uri.endswith(".db")`; remote backends never take the reconnect path. See upstream milvus-lite#334 for context.
* Tests in `tests/storage/test_milvus.py`. Full suite: 1620 passed.
* Docs: `docs/milvus-backend.md` (new dedicated guide), `docs/guides/STORAGE_BACKENDS.md` extended with a Milvus column and decision flowchart, `README.md` and `.env.example` both reference the new backend with appropriate caveats.

Which URI to use
| Deployment | URI |
| --- | --- |
| Milvus Lite | `./milvus.db` |
| Self-hosted (`milvusdb/milvus` Docker image) | `http://host:19530` |
| Zilliz Cloud | `https://<cluster>.zillizcloud.com` + token |

Verification
* `pytest -q tests/ --timeout=180` → 1620 passed, 0 failed (baseline 1588 + 32 Milvus).
* Live run against a local server (`uri=http://localhost:19530`): health, 3 stores, semantic search (Milvus memory ranked #1 for "vector database"), tag filter (exactly 1 match for "user-preference"), `total_memories=3` — all green.
* Lite idle-disconnect check: `store` → sleep 75 s → `store` — both succeed, `retrieve` returns 2 hits, thanks to the reconnect helper.

Migration / compatibility
* `sqlite_vec`, `hybrid`, `cloudflare` are untouched.
* Opt-in only: set `MCP_MEMORY_STORAGE_BACKEND=milvus` and install the `[milvus]` optional-dependency group.
* `MilvusMemoryStorage` identifies itself, so `memory_health` and the web dashboard report correct backend identity.
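The mode selection in the URI table above hinges on URI shape, and the PR's reconnect helper keys on `uri.endswith(".db")`. A small illustrative classifier (a hypothetical helper, not the PR's code; the Zilliz hostname check is an assumption based on the table):

```python
def deployment_mode(uri: str) -> str:
    """Classify a Milvus URI: a local file path ending in .db means
    Milvus Lite; http(s) endpoints are remote servers, with Zilliz
    Cloud distinguished by its managed hostname."""
    if uri.endswith(".db"):
        return "lite"
    if uri.startswith("https://") and "zillizcloud.com" in uri:
        return "zilliz-cloud"
    if uri.startswith(("http://", "https://")):
        return "self-hosted"
    raise ValueError(f"unrecognized Milvus URI: {uri!r}")
```

Only the `"lite"` branch should ever take the idle-reconnect path; remote backends keep a live server connection.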