Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 64 additions & 7 deletions core/services/ingestion_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
import tempfile
import uuid
from copy import copy
from datetime import UTC, datetime, timedelta
from io import BytesIO
from pathlib import Path
Expand All @@ -30,7 +31,7 @@
from fastapi import HTTPException, UploadFile
from PIL import Image as PILImage

from core.config import get_settings
from core import config as config_module
from core.database.postgres_database import PostgresDatabase
from core.embedding.base_embedding_model import BaseEmbeddingModel
from core.limits_utils import check_and_increment_limits, estimate_pages_by_chars
Expand All @@ -48,7 +49,58 @@
from core.vector_store.base_vector_store import BaseVectorStore

logger = logging.getLogger(__name__)
settings = get_settings()


def get_settings():
return config_module.get_settings()


class _SettingsProxy:
"""Lazily resolve app settings while preserving direct test overrides."""

def __init__(self):
self._overrides: Dict[str, Any] = {}

def __getattr__(self, name):
if name in self._overrides:
return self._overrides[name]
return getattr(get_settings(), name)

def __setattr__(self, name, value):
if name.startswith("_"):
object.__setattr__(self, name, value)
return
self._overrides[name] = value

def __delattr__(self, name):
if name in self._overrides:
del self._overrides[name]
return
raise AttributeError(name)

def apply_overrides(self, resolved_settings):
if not self._overrides:
return resolved_settings

overrides = dict(self._overrides)
if hasattr(resolved_settings, "model_dump"):
settings_data = resolved_settings.model_dump()
settings_data.update(overrides)
return type(resolved_settings)(**settings_data)

settings_copy = copy(resolved_settings)
for name, value in overrides.items():
setattr(settings_copy, name, value)
return settings_copy


settings = _SettingsProxy()


def _get_settings():
if isinstance(settings, _SettingsProxy):
return settings.apply_overrides(get_settings())
return settings


class PdfConversionError(Exception):
Expand Down Expand Up @@ -96,6 +148,7 @@ def __init__(
parser: BaseParser,
colpali_embedding_model: Optional[BaseEmbeddingModel] = None,
colpali_vector_store: Optional[BaseVectorStore] = None,
settings: Optional[Any] = None,
):
"""
Initialize the IngestionService.
Expand All @@ -108,6 +161,7 @@ def __init__(
parser: Document parser for text extraction
colpali_embedding_model: Optional ColPali embedding model (local or API)
colpali_vector_store: Optional ColPali vector store for multi-vector embeddings
settings: Optional pre-resolved settings object
"""
self.db = database
self.vector_store = vector_store
Expand All @@ -116,6 +170,7 @@ def __init__(
self.parser = parser
self.colpali_embedding_model = colpali_embedding_model
self.colpali_vector_store = colpali_vector_store
self.settings = settings if settings is not None else _get_settings()

# -------------------------------------------------------------------------
# Validation helpers
Expand Down Expand Up @@ -466,7 +521,7 @@ async def _verify_ingest_and_storage_limits(
content_length: int,
document_id: str,
) -> None:
if settings.MODE != "cloud" or not auth.user_id:
if self.settings.MODE != "cloud" or not auth.user_id:
return

num_pages = estimate_pages_by_chars(content_length)
Expand All @@ -492,7 +547,7 @@ async def _verify_ingest_and_storage_limits(
)

async def _record_storage_usage(self, auth: AuthContext, content_length: int, document_id: str) -> None:
if settings.MODE != "cloud" or not auth.user_id:
if self.settings.MODE != "cloud" or not auth.user_id:
return

try:
Expand Down Expand Up @@ -1177,7 +1232,9 @@ async def _process_colpali_embeddings(
"""Process colpali multi-vector embeddings if enabled."""
chunk_objects_multivector = []

if not (use_colpali and settings.ENABLE_COLPALI and self.colpali_embedding_model and self.colpali_vector_store):
if not (
use_colpali and self.settings.ENABLE_COLPALI and self.colpali_embedding_model and self.colpali_vector_store
):
return chunk_objects_multivector

mime_type = file_type if isinstance(file_type, str) else (file_type.mime if file_type is not None else None)
Expand Down Expand Up @@ -1577,7 +1634,7 @@ def _process_pdf_for_colpali(self, file_content: bytes) -> List[Chunk]:
logger.error("PDF file content is empty")
raise PdfConversionError("PDF file content is empty")

dpi = settings.COLPALI_PDF_DPI
dpi = self.settings.COLPALI_PDF_DPI

try:
# Check document density to decide processing strategy
Expand Down Expand Up @@ -1799,12 +1856,12 @@ def _convert_office_to_images(
images_payload: List[Tuple[str, bytes]] = []
total_pages = len(pdf_document)
render_failures = 0
dpi = self.settings.COLPALI_PDF_DPI

try:
for page_num in range(total_pages):
page = pdf_document[page_num]
try:
dpi = settings.COLPALI_PDF_DPI
mat = fitz.Matrix(dpi / 72, dpi / 72)
pix = page.get_pixmap(matrix=mat)
img_data = pix.tobytes("png")
Expand Down
1 change: 1 addition & 0 deletions core/services_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@
embedding_model=embedding_model,
colpali_embedding_model=colpali_embedding_model,
colpali_vector_store=colpali_vector_store,
settings=settings,
)
logger.info("Ingestion service initialised")

Expand Down
35 changes: 23 additions & 12 deletions core/tests/unit/test_ingestion_colpali_rendering.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import subprocess
from io import BytesIO
from pathlib import Path
from types import SimpleNamespace

import pytest
from PIL import Image

from core.services import ingestion_service as ingestion_module
Expand Down Expand Up @@ -61,8 +63,13 @@ def _non_blank_png_bytes() -> bytes:
return output.getvalue()


def test_render_pdf_with_pymupdf_skips_blank_and_failed_pages(monkeypatch):
service = IngestionService(None, None, None, None, None)
@pytest.fixture
def colpali_rendering_settings():
return SimpleNamespace(COLPALI_PDF_DPI=150, ENABLE_COLPALI=True, MODE="cloud")


def test_render_pdf_with_pymupdf_skips_blank_and_failed_pages(monkeypatch, colpali_rendering_settings):
service = IngestionService(None, None, None, None, None, settings=colpali_rendering_settings)
fake_document = FakeDocument(
[
FakePage(_non_blank_png_bytes()),
Expand All @@ -81,8 +88,8 @@ def test_render_pdf_with_pymupdf_skips_blank_and_failed_pages(monkeypatch):
assert fake_document.closed is True


def test_pdf_pdf2image_fallback_skips_blank_and_failed_pages(monkeypatch):
service = IngestionService(None, None, None, None, None)
def test_pdf_pdf2image_fallback_skips_blank_and_failed_pages(monkeypatch, colpali_rendering_settings):
service = IngestionService(None, None, None, None, None, settings=colpali_rendering_settings)
good_page = Image.open(BytesIO(_non_blank_png_bytes()))
blank_page = Image.open(BytesIO(_png_bytes((255, 255, 255))))
failing_page = Image.open(BytesIO(_non_blank_png_bytes()))
Expand All @@ -97,12 +104,16 @@ def fake_img_to_base64_with_bytes(image):
"open",
lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("force pdf2image fallback")),
)
monkeypatch.setattr(ingestion_module.pdf2image, "convert_from_bytes", lambda *args, **kwargs: [
good_page,
blank_page,
failing_page,
good_page,
])
monkeypatch.setattr(
ingestion_module.pdf2image,
"convert_from_bytes",
lambda *args, **kwargs: [
good_page,
blank_page,
failing_page,
good_page,
],
)
monkeypatch.setattr(service, "img_to_base64_with_bytes", fake_img_to_base64_with_bytes)

chunks = service._process_pdf_for_colpali(b"%PDF")
Expand All @@ -112,8 +123,8 @@ def fake_img_to_base64_with_bytes(image):
assert all(chunk.content.startswith("data:image/png;base64,") for chunk in chunks)


def test_office_conversion_skips_blank_and_failed_pages(monkeypatch):
service = IngestionService(None, None, None, None, None)
def test_office_conversion_skips_blank_and_failed_pages(monkeypatch, colpali_rendering_settings):
service = IngestionService(None, None, None, None, None, settings=colpali_rendering_settings)
fake_document = FakeDocument(
[
FakePage(_non_blank_png_bytes()),
Expand Down
Loading