Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 7 additions & 87 deletions notebooks/rag_library_lite_usage.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -252,106 +252,24 @@
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import logging\n",
"import os\n",
"import traceback\n",
"import warnings\n",
"\n",
"# Set the log level via environment variable before importing nvidia_rag\n",
"# This ensures the package respects our log level setting\n",
"LOGLEVEL = logging.WARNING # Set to INFO, DEBUG, WARNING or ERROR\n",
"os.environ[\"LOGLEVEL\"] = logging.getLevelName(LOGLEVEL)\n",
"\n",
"# Configure logging\n",
"logging.basicConfig(level=LOGLEVEL, force=True)\n",
"\n",
"# Keep known benign Milvus Lite/PyMilvus compatibility noise out of notebook output.\n",
"# These messages can appear during collection creation even when the operation succeeds.\n",
"warnings.filterwarnings(\n",
" \"ignore\",\n",
" message=r\"`connections\\.(has_connection|connect)` is an ORM-style PyMilvus API.*\",\n",
")\n",
"\n",
"\n",
"class _GrpcAllocTimestampFilter(logging.Filter):\n",
" def filter(self, record: logging.LogRecord) -> bool:\n",
" if record.name != \"grpc._server\":\n",
" return True\n",
" benign_message = \"Exception calling application: Method not implemented!\"\n",
" if benign_message not in record.getMessage():\n",
" return True\n",
" if not record.exc_info:\n",
" return True\n",
" exc_text = \"\".join(traceback.format_exception(*record.exc_info))\n",
" return \"AllocTimestamp\" not in exc_text\n",
"\n",
"\n",
"grpc_logger = logging.getLogger(\"grpc._server\")\n",
"if not any(\n",
" item.__class__.__name__ == \"_GrpcAllocTimestampFilter\"\n",
" for item in grpc_logger.filters\n",
"):\n",
" grpc_logger.addFilter(_GrpcAllocTimestampFilter())\n",
"\n",
"# Set log levels for specific loggers used by the notebook.\n",
"for name in (\"nvidia_rag\", \"nv_ingest_client\"):\n",
" logging.getLogger(name).setLevel(LOGLEVEL)\n",
"\n",
"for name in list(logging.root.manager.loggerDict):\n",
" if name.startswith(\"nvidia_rag.\") or name.startswith(\"nv_ingest_client.\"):\n",
" logging.getLogger(name).setLevel(LOGLEVEL)\n",
"\n",
"print(\n",
" f\"Notebook logging set to {logging.getLevelName(LOGLEVEL)}. \"\n",
" \"Known benign Milvus Lite compatibility messages will be hidden.\"\n",
")"
]
"source": "import logging\nimport os\nimport traceback\nimport warnings\n\n# Set the log level via environment variable before importing nvidia_rag\n# This ensures the package respects our log level setting\nLOGLEVEL = logging.WARNING # Set to INFO, DEBUG, WARNING or ERROR\nos.environ[\"LOGLEVEL\"] = logging.getLevelName(LOGLEVEL)\n\n# Configure logging\nlogging.basicConfig(level=LOGLEVEL, force=True)\n\n# Keep known benign Milvus Lite/PyMilvus compatibility noise out of notebook output.\n# These messages can appear during collection creation even when the operation succeeds.\nwarnings.filterwarnings(\n \"ignore\",\n message=r\"`connections\\.(has_connection|connect)` is an ORM-style PyMilvus API.*\",\n)\n\n\nclass _GrpcAllocTimestampFilter(logging.Filter):\n def filter(self, record: logging.LogRecord) -> bool:\n if record.name != \"grpc._server\":\n return True\n benign_message = \"Exception calling application: Method not implemented!\"\n if benign_message not in record.getMessage():\n return True\n if not record.exc_info:\n return True\n exc_text = \"\".join(traceback.format_exception(*record.exc_info))\n return \"AllocTimestamp\" not in exc_text\n\n\ngrpc_logger = logging.getLogger(\"grpc._server\")\nif not any(\n item.__class__.__name__ == \"_GrpcAllocTimestampFilter\"\n for item in grpc_logger.filters\n):\n grpc_logger.addFilter(_GrpcAllocTimestampFilter())\n\n# Set log levels for specific loggers used by the notebook.\nfor name in (\"nvidia_rag\", \"nv_ingest_client\"):\n logging.getLogger(name).setLevel(LOGLEVEL)\n\nfor name in list(logging.root.manager.loggerDict):\n if name.startswith(\"nvidia_rag.\") or name.startswith(\"nv_ingest_client.\"):\n logging.getLogger(name).setLevel(LOGLEVEL)\n\nprint(\n f\"Notebook logging set to {logging.getLevelName(LOGLEVEL)}. \"\n \"Known benign Milvus Lite compatibility messages will be hidden.\"\n)"
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize the NvidiaRAGIngestor Package in Lite Mode\n",
"\n",
"Import `NvidiaRAGIngestor` to access APIs for document upload and management operations."
]
"source": "## Initialize the NvidiaRAGIngestor Package in Lite Mode\n\nImport `NvidiaRAGIngestor` to access APIs for document upload and management operations.\n\n> **⚠️ Re-running this cell starts from a clean Milvus Lite database.**\n>\n> This cell intentionally resets Milvus Lite state on every execution to avoid a milvus-lite layout-selection bug that corrupts the on-disk WAL on rerun (manifests during document upload as `No such file or directory: '.../milvus-lite.db/collections/<name>/wal/wal_data_*.arrow'`).\n>\n> Concretely:\n> - **Default (no `MILVUS_LITE_DB_PATH` set):** a fresh per-session directory under the OS temp dir is used. Any collections and documents created in a previous run remain on disk at the old path but are no longer reachable through the new `ingestor`.\n> - **Override (`MILVUS_LITE_DB_PATH` set):** if a database already exists at that path it is **wiped** before the new milvus-lite server starts.\n>\n> **What this means for you:** if you re-run this cell after uploading documents (for example, to retry a failed upload), the collection will appear to \"no longer exist\". That is expected — it is the prior session's database; the new ingestor is pointed at a fresh one. To keep working with the same collection within a session, **do not re-run this cell** — reuse the existing `ingestor` object across the cells below."
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from nvidia_rag import NvidiaRAGIngestor\n",
"from nvidia_rag.utils.configuration import NvidiaRAGConfig\n",
"\n",
"config_ingestor = NvidiaRAGConfig.from_yaml(\"config.yaml\")\n",
"# You can update the config object to use different models and endpoints like below\n",
"# Example: switch to the text-only embedding model\n",
"# config_ingestor.embeddings.model_name = \"nvidia/llama-nemotron-embed-1b-v2\"\n",
"# config_ingestor.embeddings.server_url = \"https://integrate.api.nvidia.com/v1\"\n",
"\n",
"# Set config for rag lite library mode\n",
"# Lite mode uses the embedded Milvus Lite backend; override `name` so it\n",
"# doesn't fall through to the default backend from config.yaml.\n",
"config_ingestor.vector_store.name = \"milvus\"\n",
"config_ingestor.vector_store.url = \"./milvus-lite.db\"\n",
"config_ingestor.nv_ingest.message_client_port = 7671 # Port for NV-Ingest libary mode\n",
"\n",
"# Set config for cloud API endpoints\n",
"config_ingestor.embeddings.server_url = \"https://integrate.api.nvidia.com/v1\"\n",
"\n",
"ingestor = NvidiaRAGIngestor(config=config_ingestor, mode=\"lite\")"
]
"source": "import fcntl\nimport os\nimport shutil\nimport tempfile\nimport uuid\nfrom pathlib import Path\n\nfrom nvidia_rag import NvidiaRAGIngestor\nfrom nvidia_rag.utils.configuration import NvidiaRAGConfig\n\n\ndef _milvus_lite_lock_files(db_path: Path) -> list[Path]:\n \"\"\"Return possible lock-file locations for either milvus-lite layout.\"\"\"\n candidates = [\n db_path.with_name(f\".{db_path.name}.lock\"),\n db_path.with_name(f\"{db_path.name}.lock\"),\n ]\n if db_path.is_dir():\n candidates.append(db_path / \"LOCK\")\n return candidates\n\n\ndef _milvus_lite_path_is_locked(db_path: Path) -> bool:\n \"\"\"True iff another process holds the milvus-lite fcntl lock on this path.\"\"\"\n for lock_file in _milvus_lite_lock_files(db_path):\n if not lock_file.exists():\n continue\n try:\n with open(lock_file, \"rb\") as f:\n fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)\n fcntl.flock(f, fcntl.LOCK_UN)\n except BlockingIOError:\n return True\n except OSError:\n continue\n return False\n\n\ndef _wipe_milvus_lite_path(db_path: Path) -> None:\n \"\"\"Delete the milvus-lite data and any sibling lock files.\"\"\"\n if db_path.is_dir():\n shutil.rmtree(db_path)\n elif db_path.exists():\n db_path.unlink()\n for lock_file in _milvus_lite_lock_files(db_path):\n if lock_file.exists():\n try:\n lock_file.unlink()\n except OSError:\n pass\n\n\ndef _resolve_milvus_lite_path() -> str:\n \"\"\"Pick a milvus-lite database path that is safe to use on every rerun.\n\n Default: a fresh, per-call directory under the system temp dir. This\n guarantees:\n - clean state every time this cell runs, so stale files or directories\n from a previous session can't trigger milvus-lite's layout-selection\n bug, which surfaces during document upload as:\n ``[Errno 2] No such file or directory:\n '.../milvus-lite.db/collections/<name>/wal/wal_data_*.arrow'``\n - a local filesystem, avoiding issues seen on network/shared mounts\n (NFS, Lustre, etc.) where SQLite WAL locking and lazy directory\n creation can break.\n\n Override by setting the ``MILVUS_LITE_DB_PATH`` env var before running this\n cell if you need to control where the database lives. On override, if a\n stale db is found at that path it is wiped first so the new milvus-lite\n server starts from a clean layout. If something is still holding the lock\n (e.g. a previous kernel that hasn't exited) we refuse to wipe and ask\n the user to restart the kernel -- silently moving live data aside is what\n was corrupting the WAL on shared filesystems.\n \"\"\"\n # Stop any milvus-lite server already running in this Python process before\n # we point at a new (or the same overridden) path, so the previous\n # subprocess releases its file handles cleanly.\n try:\n from milvus_lite.server_manager import server_manager_instance\n\n server_manager_instance.release_all()\n except Exception:\n pass\n\n # Drop stale pymilvus ORM aliases left from the previous run. nv_ingest_client\n # writes via connections.connect(uri=..., token=...) without an alias, so it\n # binds to \"default\"; on the next run pymilvus refuses to rebind \"default\" to\n # the new milvus-lite path and raises ConnectionConfigException(ConnDiffConf).\n # Every milvus-lite subprocess was just released above, so existing aliases\n # point to dead handlers and are safe to remove.\n try:\n from pymilvus import connections as _pm_connections\n\n for _alias_name, _ in list(_pm_connections.list_connections()):\n try:\n _pm_connections.remove_connection(_alias_name)\n except Exception:\n pass\n except Exception:\n pass\n\n override = os.environ.get(\"MILVUS_LITE_DB_PATH\", \"\").strip()\n if override:\n db_path = Path(override).expanduser().absolute()\n db_path.parent.mkdir(parents=True, exist_ok=True)\n\n if db_path.exists():\n if _milvus_lite_path_is_locked(db_path):\n raise RuntimeError(\n f\"{db_path} is locked by another process. Restart the \"\n \"Jupyter kernel before re-running this cell, or unset \"\n \"MILVUS_LITE_DB_PATH to use a fresh per-session path.\"\n )\n _wipe_milvus_lite_path(db_path)\n print(f\"Reset stale milvus-lite state at {db_path}\")\n return str(db_path)\n\n # Per-call UUID-suffixed directory so rapid reruns inside the same second\n # in the same process can't collide on an existing path.\n session_dir = (\n Path(tempfile.gettempdir())\n / f\"nvidia-rag-lite-{os.getpid()}-{uuid.uuid4().hex[:8]}\"\n )\n session_dir.mkdir(parents=True, exist_ok=False)\n return str(session_dir / \"milvus.db\")\n\n\nMILVUS_LITE_DB_PATH = _resolve_milvus_lite_path()\nprint(f\"Milvus Lite database: {MILVUS_LITE_DB_PATH}\")\n\nconfig_ingestor = NvidiaRAGConfig.from_yaml(\"config.yaml\")\n# You can update the config object to use different models and endpoints like below\n# Example: switch to the text-only embedding model\n# config_ingestor.embeddings.model_name = \"nvidia/llama-nemotron-embed-1b-v2\"\n# config_ingestor.embeddings.server_url = \"https://integrate.api.nvidia.com/v1\"\n\n# Set config for rag lite library mode\n# Lite mode uses the embedded Milvus Lite backend; override `name` so it\n# doesn't fall through to the default backend from config.yaml.\nconfig_ingestor.vector_store.name = \"milvus\"\nconfig_ingestor.vector_store.url = MILVUS_LITE_DB_PATH\nconfig_ingestor.nv_ingest.message_client_port = 7671 # Port for NV-Ingest libary mode\n\n# Set config for cloud API endpoints\nconfig_ingestor.embeddings.server_url = \"https://integrate.api.nvidia.com/v1\"\n\ningestor = NvidiaRAGIngestor(config=config_ingestor, mode=\"lite\")"
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Create a new collection\n",
"Creates a new collection in the vector database."
]
"source": "## 1. Create a new collection\nCreates a new collection in the vector database. Each run of the initialization cell above starts from an empty Milvus Lite database (see warning there), so this call always creates a brand-new collection — running it after upload does not return the previously created one."
},
{
"cell_type": "code",
Expand Down Expand Up @@ -519,8 +437,10 @@
"# Set config for rag lite library mode\n",
"# Lite mode uses the embedded Milvus Lite backend; override `name` so it\n",
"# doesn't fall through to the default backend from config.yaml.\n",
"# Reuse the same MILVUS_LITE_DB_PATH set above so this RAG client opens the\n",
"# database that the ingestor just populated.\n",
"config_rag.vector_store.name = \"milvus\"\n",
"config_rag.vector_store.url = \"./milvus-lite.db\"\n",
"config_rag.vector_store.url = MILVUS_LITE_DB_PATH\n",
"config_rag.enable_citations = False\n",
"\n",
"# Set config for cloud API endpoints\n",
Expand Down
26 changes: 21 additions & 5 deletions src/nvidia_rag/utils/vdb/milvus/milvus_vdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,17 @@ def _compact_and_wait(self, collection_name: str, timeout: float = 30.0) -> None
This is a blocking (non-async) method. Callers in an async context must
dispatch it via asyncio.to_thread to avoid blocking the event loop.
"""
# milvus-lite does not implement ManualCompaction (the RPC returns
# UNIMPLEMENTED). Calling it surfaces a pymilvus error log and our own
# warning even though the failure is benign. Lite endpoints are file
# URIs with no URL scheme, so skip the call up front.
if not self.url.scheme:
logger.debug(
"Skipping compaction on milvus-lite endpoint %s; ManualCompaction "
"is unimplemented in lite mode.",
self.vdb_endpoint,
)
return
try:
job_id = self._client.compact(collection_name)
logger.debug(
Expand Down Expand Up @@ -856,11 +867,16 @@ def delete_documents(
)

if result_dict is not None:
delete_count = (
resp.get("delete_count", 0)
if isinstance(resp, dict)
else getattr(resp, "delete_count", 0)
)
# MilvusClient.delete returns either a dict {"delete_count": N, ...}
# or, against older Milvus cores (including milvus-lite), the list
# of primary keys that were removed. Handle both so the lite path
# doesn't falsely report a successful delete as "not found".
if isinstance(resp, dict):
delete_count = resp.get("delete_count", 0)
elif isinstance(resp, list):
delete_count = len(resp)
else:
delete_count = getattr(resp, "delete_count", 0)
if delete_count == 0:
logger.info(f"File {doc_name} does not exist in the vectorstore")
result_dict["not_found"].append(doc_name)
Expand Down
Loading
Loading