Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ authors = [
requires-python = ">=3.11,<3.14"

dependencies = [
"bleach>=6.2,<7.0",
"bleach>=6.4.0,<7.0",
"dataclass-wizard>=0.27,<1.0",
"fastapi>=0.115.5,<1.0",
"fastapi>=0.133.0,<1.0",
"anyio>=4.12.0",
"httpx>=0.28.1",
"httpx-sse>=0.4.3",
"langchain>=1.3.1",
"langchain>=1.3.9",
"langchain-community>=0.4",
"langgraph>=1.2.1",
"langchain-milvus>=0.3.0",
Expand All @@ -30,7 +30,7 @@ dependencies = [
"pydantic>=2.11,<3.0",
"pymilvus[milvus_lite]>=2.6.7,<3.0",
"pymilvus-model>=0.3,<1.0",
"python-multipart>=0.0.27,<1.0",
"python-multipart>=0.0.31,<1.0",
"pyyaml>=6.0,<7.0",
"uvicorn[standard]>=0.32,<1.0",
"langchain-core>=1.2.28",
Expand All @@ -56,7 +56,7 @@ rag = [
"prometheus-client>=0.20,<1.0",
"azure-core>=1.35,<2.0",
"azure-storage-blob>=12.26,<13.0",
"pyarrow>=21.0,<22.0",
"pyarrow>=23.0.1,<25.0",
"tiktoken>=0.7",
]
ingest = [
Expand All @@ -79,7 +79,7 @@ ingest = [
"opentelemetry-sdk>=1.29,<2.0",
"azure-core>=1.35,<2.0",
"azure-storage-blob>=12.26,<13.0",
"pyarrow>=21.0,<22.0",
"pyarrow>=23.0.1,<25.0",
"setuptools>=80.10.2",
]
all = [
Expand All @@ -102,7 +102,7 @@ all = [
"opentelemetry-sdk>=1.29,<2.0",
"azure-core>=1.35,<2.0",
"azure-storage-blob>=12.26,<13.0",
"pyarrow>=21.0,<22.0",
"pyarrow>=23.0.1,<25.0",
# Elasticsearch support
"langchain-elasticsearch>=0.3",
]
Expand All @@ -117,9 +117,10 @@ nvidia-rag = { workspace = true }
# Pillow 12.x required for containers; moviepy pins pillow<12 so override needed for resolution
override-dependencies = [
"pillow>=12.2.0",
"cryptography>=46.0.6",
"cryptography>=48.0.1",
"urllib3>=2.7.0",
"aiohttp>=3.13.4",
"aiohttp>=3.14.1",
"starlette>=1.3.1",
"orjson>=3.11.6",
"langsmith>=0.8.0",
"langchain-classic>=1.0.7",
Expand Down
14 changes: 9 additions & 5 deletions src/nvidia_rag/ingestor_server/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ async def check_object_store_health(
)
except Exception as e:
latency_ms = round((time.time() - start_time) * 1000, 2)
logger.error("Error checking filesystem object-store health: %s", e, exc_info=True)
logger.error(
"Error checking filesystem object-store health: %s", e, exc_info=True
)
return StorageHealthInfo(
service="Object Storage",
url=config.storage_root.as_uri(),
Expand All @@ -209,7 +211,9 @@ async def check_object_store_health(

try:
start_time = time.time()
object_store_operator = get_object_store_operator(config=NvidiaRAGConfig(object_store=config))
object_store_operator = get_object_store_operator(
config=NvidiaRAGConfig(object_store=config)
)
# Test basic operation - list buckets
buckets = object_store_operator.client.list_buckets()
latency_ms = round((time.time() - start_time) * 1000, 2)
Expand Down Expand Up @@ -443,10 +447,10 @@ async def check_all_services_health(
embed_url = config.embeddings.server_url
if not embed_url.startswith(("http://", "https://")):
embed_url = f"http://{embed_url}"

# Check if version suffix (v1, v2, vN) is already present in the URL
has_version = re.search(r'/v\d+(?:/|$)', embed_url)
has_version = re.search(r"/v\d+(?:/|$)", embed_url)

if has_version:
# Version already present, just add /health/ready
embed_url = f"{embed_url.rstrip('/')}/health/ready"
Expand Down
28 changes: 16 additions & 12 deletions src/nvidia_rag/ingestor_server/ingestion_state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ def __init__(
self.total_documents_completed = 0
self.total_batches_completed = 0
self.documents_completed_list = [] # list[dict[str, Any]]

# NV-Ingest document-wise status variables
self.nv_ingest_status = {} # { "extraction_completed": int, "document_wise_status": dict[str, Any] }
self.nv_ingest_status = {} # { "extraction_completed": int, "document_wise_status": dict[str, Any] }
self.nv_ingest_document_wise_status = {}

# Add request data
Expand Down Expand Up @@ -125,7 +125,9 @@ async def initialize_nv_ingest_status(
}
filenames = [os.path.basename(filepath) for filepath in filepaths]
self.nv_ingest_document_wise_status = dict.fromkeys(filenames, "not_started")
self.nv_ingest_status["document_wise_status"] = self.nv_ingest_document_wise_status
self.nv_ingest_status["document_wise_status"] = (
self.nv_ingest_document_wise_status
)
return self.nv_ingest_status

async def update_nv_ingest_status(
Expand All @@ -134,13 +136,15 @@ async def update_nv_ingest_status(
):
async with self.asyncio_lock:
self.nv_ingest_document_wise_status.update(nv_ingest_document_wise_status)
self.nv_ingest_status["document_wise_status"] = self.nv_ingest_document_wise_status
self.nv_ingest_status["extraction_completed"] = len([
status
for status in self.nv_ingest_document_wise_status.values()
if status == "completed"
])
logger.debug(
f"Updated NV-Ingest status: {self.nv_ingest_status}"
)
self.nv_ingest_status["document_wise_status"] = (
self.nv_ingest_document_wise_status
)
self.nv_ingest_status["extraction_completed"] = len(
[
status
for status in self.nv_ingest_document_wise_status.values()
if status == "completed"
]
)
logger.debug(f"Updated NV-Ingest status: {self.nv_ingest_status}")
return self.nv_ingest_status
42 changes: 29 additions & 13 deletions src/nvidia_rag/ingestor_server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def __init__(
# config.nv_ingest.backend == "nrl". None means NV-Ingest path is active.
# Type annotation uses a string literal to avoid importing at module level
# (nemo_retriever may not be installed when the NV-Ingest backend is used).
self._nrl_handler: "NemoRetrieverHandler | None" = None
self._nrl_handler: NemoRetrieverHandler | None = None
self.prompts = get_prompts(prompts)

# Initialize instance-based clients
Expand All @@ -191,9 +191,7 @@ def __init__(
self.object_store_operator = get_object_store_operator(config=self.config)
# Ensure default bucket exists (idempotent operation)
try:
self.object_store_operator._make_bucket(
bucket_name=DEFAULT_BUCKET_NAME
)
self.object_store_operator._make_bucket(bucket_name=DEFAULT_BUCKET_NAME)
logger.debug(
"Ensured default object-store bucket '%s' exists",
DEFAULT_BUCKET_NAME,
Expand Down Expand Up @@ -1609,7 +1607,9 @@ def delete_collections(
collection_prefix
)
if len(delete_object_names):
self.object_store_operator.delete_payloads(delete_object_names)
self.object_store_operator.delete_payloads(
delete_object_names
)
logger.info(
f"Deleted all document summaries from object storage for collection: {collection}"
)
Expand Down Expand Up @@ -1866,7 +1866,9 @@ def delete_documents(
# Helper function to delete object-store metadata for documents
def delete_object_store_metadata(docs_to_delete: list[str]) -> None:
if self.object_store_operator is None:
logger.warning("Object store unavailable - skipping metadata deletion")
logger.warning(
"Object store unavailable - skipping metadata deletion"
)
return

for doc in docs_to_delete:
Expand All @@ -1893,7 +1895,9 @@ def delete_object_store_metadata(docs_to_delete: list[str]) -> None:
filename_prefix
)
if len(delete_object_names):
self.object_store_operator.delete_payloads(delete_object_names)
self.object_store_operator.delete_payloads(
delete_object_names
)
logger.info(
"Deleted summary for doc: %s from object storage", doc
)
Expand Down Expand Up @@ -2345,7 +2349,9 @@ async def __run_nrl_ingestion(
# so that callers polling GET /summary see a consistent initial state.
# ------------------------------------------------------------------
if generate_summary:
logger.debug("NRL: setting PENDING summary status for %d files", len(filepaths))
logger.debug(
"NRL: setting PENDING summary status for %d files", len(filepaths)
)
for filepath in filepaths:
file_name = os.path.basename(filepath)
SUMMARY_STATUS_HANDLER.set_status(
Expand All @@ -2369,12 +2375,18 @@ async def __run_nrl_ingestion(
"before full ingest pipeline",
len(filepaths),
)
page_filter = summary_options.get("page_filter") if summary_options else None
page_filter = (
summary_options.get("page_filter") if summary_options else None
)
summarization_strategy = (
summary_options.get("summarization_strategy") if summary_options else None
summary_options.get("summarization_strategy")
if summary_options
else None
)
try:
shallow_schema_mgr = await self._nrl_handler.ingest_shallow(filepaths)
shallow_schema_mgr = await self._nrl_handler.ingest_shallow(
filepaths
)
shallow_results = shallow_schema_mgr.to_nv_ingest_results_format()
task = asyncio.create_task(
self.__ingest_document_summary(
Expand Down Expand Up @@ -2463,9 +2475,13 @@ async def __run_nrl_ingestion(
# Mirrors the asyncio.create_task block in __nv_ingest_ingestion_pipeline.
# ------------------------------------------------------------------
if generate_summary and not shallow_summary:
page_filter = summary_options.get("page_filter") if summary_options else None
page_filter = (
summary_options.get("page_filter") if summary_options else None
)
summarization_strategy = (
summary_options.get("summarization_strategy") if summary_options else None
summary_options.get("summarization_strategy")
if summary_options
else None
)
task = asyncio.create_task(
self.__ingest_document_summary(
Expand Down
4 changes: 3 additions & 1 deletion src/nvidia_rag/ingestor_server/nemo_retriever/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from __future__ import annotations

_PDF_DOC_EXTS: frozenset[str] = frozenset({".pdf", ".docx", ".pptx"})
_IMAGE_EXTS: frozenset[str] = frozenset({".jpg", ".jpeg", ".png", ".tiff", ".tif", ".bmp"})
_IMAGE_EXTS: frozenset[str] = frozenset(
{".jpg", ".jpeg", ".png", ".tiff", ".tif", ".bmp"}
)
_TEXT_EXTS: frozenset[str] = frozenset({".txt"})
_HTML_EXTS: frozenset[str] = frozenset({".html", ".htm"})
_AUDIO_VIDEO_EXTS: frozenset[str] = frozenset({".mp3", ".wav", ".mp4"})
Expand Down
4 changes: 3 additions & 1 deletion src/nvidia_rag/ingestor_server/nemo_retriever/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import logging
from pathlib import Path

from nvidia_rag.ingestor_server.nemo_retriever.extensions import NRL_SUPPORTED_EXTENSIONS
from nvidia_rag.ingestor_server.nemo_retriever.extensions import (
NRL_SUPPORTED_EXTENSIONS,
)

logger = logging.getLogger(__name__)

Expand Down
37 changes: 18 additions & 19 deletions src/nvidia_rag/ingestor_server/nemo_retriever/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@
from nemo_retriever.graph_ingestor import GraphIngestor

from nvidia_rag.ingestor_server.nemo_retriever.extensions import (
NRL_SUPPORTED_EXTENSIONS,
_AUDIO_VIDEO_EXTS,
_EXT_TO_TYPE,
_HTML_EXTS,
_IMAGE_EXTS,
_PDF_DOC_EXTS,
_TEXT_EXTS,
_TYPE_ORDER,
NRL_SUPPORTED_EXTENSIONS,
)
from nvidia_rag.ingestor_server.nemo_retriever.ingest_schema_manager import (
IngestSchemaManager,
Expand Down Expand Up @@ -126,9 +126,7 @@ def __init__(self, config: NvidiaRAGConfig) -> None:
self._run_mode: str = getattr(config.nv_ingest, "nrl_run_mode", "batch")
# One pipeline at a time: NRL / Ray owns its own worker threads.
self._executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=1)
logger.info(
"NemoRetrieverHandler initialised (run_mode=%s)", self._run_mode
)
logger.info("NemoRetrieverHandler initialised (run_mode=%s)", self._run_mode)

# ------------------------------------------------------------------
# Public async API
Expand Down Expand Up @@ -246,14 +244,11 @@ async def ingest_shallow(self, filepaths: list[str]) -> IngestSchemaManager:
"""
logger.info("ingest_shallow() called with %d filepath(s)", len(filepaths))

supported = [
fp for fp in filepaths if Path(fp).suffix.lower() in _PDF_DOC_EXTS
]
supported = [fp for fp in filepaths if Path(fp).suffix.lower() in _PDF_DOC_EXTS]
skipped_count = len(filepaths) - len(supported)
if skipped_count:
skipped_paths = [
fp for fp in filepaths
if Path(fp).suffix.lower() not in _PDF_DOC_EXTS
fp for fp in filepaths if Path(fp).suffix.lower() not in _PDF_DOC_EXTS
]
logger.warning(
"ingest_shallow: skipping %d non-PDF/DOC/PPTX file(s) "
Expand All @@ -279,7 +274,9 @@ async def ingest_shallow(self, filepaths: list[str]) -> IngestSchemaManager:
self._executor, self._run_sync, ingestor, "pdf_doc_shallow"
)
logger.info(
"ingest_shallow complete: %d rows in %.2fs", len(df), time.perf_counter() - t0
"ingest_shallow complete: %d rows in %.2fs",
len(df),
time.perf_counter() - t0,
)
return IngestSchemaManager(df)

Expand Down Expand Up @@ -316,7 +313,9 @@ def _classify_filepaths(self, filepaths: list[str]) -> dict[str, list[str]]:

non_empty = {k: len(v) for k, v in classified.items() if v}
logger.info(
"File classification result (%d file(s) total): %s", len(filepaths), non_empty
"File classification result (%d file(s) total): %s",
len(filepaths),
non_empty,
)
return classified

Expand Down Expand Up @@ -346,10 +345,10 @@ def _build_ingestors(
classified = self._classify_filepaths(filepaths)

builders: dict[str, Any] = {
"pdf_doc": self._build_pdf_doc_ingestor,
"image": self._build_image_ingestor,
"text": self._build_text_ingestor,
"html": self._build_html_ingestor,
"pdf_doc": self._build_pdf_doc_ingestor,
"image": self._build_image_ingestor,
"text": self._build_text_ingestor,
"html": self._build_html_ingestor,
"audio_video": self._build_audio_video_ingestor,
}

Expand All @@ -364,7 +363,9 @@ def _build_ingestors(
len(paths),
paths,
)
gi = builders[type_key](paths, split_options, extract_override, vdb_op, store_images)
gi = builders[type_key](
paths, split_options, extract_override, vdb_op, store_images
)
result.append((type_key, gi))

logger.info(
Expand Down Expand Up @@ -557,9 +558,7 @@ def _build_shallow_ingestor(self, filepaths: list[str]) -> GraphIngestor:
# Synchronous execution (runs inside ThreadPoolExecutor)
# ------------------------------------------------------------------

def _run_sync(
self, ingestor: GraphIngestor, type_label: str = ""
) -> pd.DataFrame:
def _run_sync(self, ingestor: GraphIngestor, type_label: str = "") -> pd.DataFrame:
"""Call ``ingestor.ingest()`` and materialise the result as a DataFrame.

``inprocess`` mode returns a ``pandas.DataFrame`` directly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from typing import Any

import pandas as pd

from nemo_retriever.vector_store.lancedb_utils import build_lancedb_rows


Expand Down
Loading
Loading