Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion core/routes/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from core.services.telemetry import TelemetryService
from core.services_init import ingestion_service
from core.utils.arq_jobs import enqueue_job_clearing_stale_result
from core.utils.typed_metadata import TypedMetadataError

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -350,7 +351,13 @@ async def _process_document(doc: Document, override_flag: Optional[bool]) -> Non
folder_leaf=doc.folder_name,
end_user_id=doc.end_user_id,
)
job = await redis.enqueue_job("process_ingestion_job", **job_payload)
job = await enqueue_job_clearing_stale_result(
redis,
"process_ingestion_job",
job_payload,
logger=logger,
context=f"requeue doc_id={ext_id}",
)

if job is None:
results.append(
Expand Down
17 changes: 15 additions & 2 deletions core/services/ingestion_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from core.storage.utils_file_extensions import detect_content_type, detect_file_type
from core.utils.fast_ops import bytes_to_data_uri, encode_base64
from core.utils.folder_utils import normalize_folder_path, normalize_ingest_folder_inputs
from core.utils.arq_jobs import enqueue_job_clearing_stale_result
from core.utils.storage_usage import extract_storage_bytes
from core.utils.typed_metadata import MetadataBundle, merge_metadata, normalize_metadata
from core.vector_store.base_vector_store import BaseVectorStore
Expand Down Expand Up @@ -604,7 +605,13 @@ async def ingest_file_content(
folder_leaf=folder_leaf,
end_user_id=end_user_id,
)
job = await redis.enqueue_job("process_ingestion_job", **job_payload)
job = await enqueue_job_clearing_stale_result(
redis,
"process_ingestion_job",
job_payload,
logger=logger,
context=f"ingest doc_id={doc.external_id}",
)
if job is None:
logger.info("Connector file ingestion job already queued (doc_id=%s)", doc.external_id)
else:
Expand Down Expand Up @@ -746,7 +753,13 @@ async def queue_document_update(
folder_leaf=doc.folder_name,
end_user_id=doc.end_user_id,
)
job = await redis.enqueue_job("process_ingestion_job", **job_payload)
job = await enqueue_job_clearing_stale_result(
redis,
"process_ingestion_job",
job_payload,
logger=logger,
context=f"update doc_id={doc.external_id}",
)
if job is None:
logger.info("Update ingestion job already queued (doc_id=%s)", doc.external_id)
else:
Expand Down
9 changes: 8 additions & 1 deletion core/services/v2_document_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from core.parser.morphik_parser import MorphikParser
from core.storage.base_storage import BaseStorage
from core.storage.utils_file_extensions import detect_content_type
from core.utils.arq_jobs import enqueue_job_clearing_stale_result
from core.utils.folder_utils import normalize_folder_path, normalize_ingest_folder_inputs
from core.utils.typed_metadata import MetadataBundle, normalize_metadata
from core.vector_store.chunk_v2_store import ChunkV2Store
Expand Down Expand Up @@ -703,7 +704,13 @@ async def ingest_document(
end_user_id=end_user_id,
force_plain_text=force_plain_text,
)
job = await redis.enqueue_job("process_v2_ingestion_job", **job_payload)
job = await enqueue_job_clearing_stale_result(
redis,
"process_v2_ingestion_job",
job_payload,
logger=logger,
context=f"v2 ingest doc_id={doc.external_id}",
)
if job is None:
logger.info("V2 ingestion job already queued (doc_id=%s)", doc.external_id)
else:
Expand Down
100 changes: 100 additions & 0 deletions core/tests/unit/test_arq_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import logging

import pytest

from core.utils.arq_jobs import arq_result_key, enqueue_job_clearing_stale_result


class FakeRedis:
    """In-memory stand-in for the Redis/ARQ pool used by the enqueue helper tests.

    Every ``delete`` and ``enqueue_job`` call is recorded so a test can assert
    on what was attempted; the values handed back are fixed at construction.
    """

    def __init__(self, *, delete_result=0, enqueue_result=None, delete_raises=None):
        # Call logs, inspected by the tests after the helper has run.
        self.deleted_keys = []
        self.enqueued = []
        # Canned outcomes for the two operations.
        self.delete_result = delete_result
        self.delete_raises = delete_raises
        self.enqueue_result = enqueue_result

    async def delete(self, key):
        """Record ``key``, then raise the configured error or return the canned count."""
        self.deleted_keys.append(key)
        if not self.delete_raises:
            return self.delete_result
        raise self.delete_raises

    async def enqueue_job(self, function_name, **job_payload):
        """Record the call as ``(function_name, kwargs)`` and return the canned job."""
        call = (function_name, job_payload)
        self.enqueued.append(call)
        return self.enqueue_result


def test_arq_result_key():
    """The result key is the job id with the ``arq:result:`` prefix prepended."""
    expected_key = "arq:result:ingest:doc-1"
    assert arq_result_key("ingest:doc-1") == expected_key


@pytest.mark.asyncio
async def test_enqueue_clears_stale_result_key_before_enqueue():
    """A result key that exists is deleted, and the job is then enqueued as usual."""
    job_kwargs = {"_job_id": "ingest:doc-1", "document_id": "doc-1"}
    # delete_result=1 simulates Redis reporting that one key was removed.
    fake = FakeRedis(delete_result=1, enqueue_result=object())
    log = logging.getLogger(__name__)

    returned = await enqueue_job_clearing_stale_result(
        fake, "process_ingestion_job", job_kwargs, logger=log, context="test"
    )

    assert returned is fake.enqueue_result
    assert fake.deleted_keys == ["arq:result:ingest:doc-1"]
    assert fake.enqueued == [("process_ingestion_job", job_kwargs)]


@pytest.mark.asyncio
async def test_enqueue_returns_none_for_real_duplicate_job():
    """With nothing to delete and an enqueue that yields None, None is passed through."""
    job_kwargs = {"_job_id": "ingest:doc-1", "document_id": "doc-1"}
    # delete_result=0: no result key was present; enqueue_result=None: enqueue refused.
    fake = FakeRedis(delete_result=0, enqueue_result=None)
    log = logging.getLogger(__name__)

    returned = await enqueue_job_clearing_stale_result(
        fake, "process_ingestion_job", job_kwargs, logger=log, context="test"
    )

    assert returned is None
    # The delete is still attempted, and the enqueue is still issued exactly once.
    assert fake.deleted_keys == ["arq:result:ingest:doc-1"]
    assert fake.enqueued == [("process_ingestion_job", job_kwargs)]


@pytest.mark.asyncio
async def test_enqueue_continues_if_stale_result_delete_fails():
    """An error raised by the delete step must not prevent the enqueue."""
    job_kwargs = {"_job_id": "ingest:doc-1", "document_id": "doc-1"}
    delete_error = RuntimeError("redis delete failed")
    fake = FakeRedis(delete_raises=delete_error, enqueue_result=object())
    log = logging.getLogger(__name__)

    returned = await enqueue_job_clearing_stale_result(
        fake, "process_ingestion_job", job_kwargs, logger=log, context="test"
    )

    assert returned is fake.enqueue_result
    # The key is recorded before the fake raises, so the attempt is visible here.
    assert fake.deleted_keys == ["arq:result:ingest:doc-1"]
    assert fake.enqueued == [("process_ingestion_job", job_kwargs)]


@pytest.mark.asyncio
async def test_enqueue_without_job_id_does_not_delete():
    """A payload with no ``_job_id`` skips the delete and goes straight to enqueue."""
    job_kwargs = {"document_id": "doc-1"}
    fake = FakeRedis(enqueue_result=object())
    log = logging.getLogger(__name__)

    returned = await enqueue_job_clearing_stale_result(
        fake, "process_ingestion_job", job_kwargs, logger=log, context="test"
    )

    assert returned is fake.enqueue_result
    assert fake.deleted_keys == []
    assert fake.enqueued == [("process_ingestion_job", job_kwargs)]
35 changes: 35 additions & 0 deletions core/utils/arq_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging
from typing import Any, Dict


def arq_result_key(job_id: str) -> str:
    """Return the Redis key name ``arq:result:<job_id>`` for the given job id."""
    return "arq:result:{}".format(job_id)


async def enqueue_job_clearing_stale_result(
    redis: Any,
    function_name: str,
    job_payload: Dict[str, Any],
    *,
    logger: logging.Logger,
    context: str,
) -> Any:
    """Delete any stored result for the payload's job id, then enqueue the job.

    ARQ treats existing result keys as duplicate job ids. Ingestion uses stable
    per-document job ids for dedupe, so a completed result can otherwise block a
    legitimate later update/requeue and leave the document marked processing.

    Args:
        redis: Object exposing awaitable ``delete(key)`` and
            ``enqueue_job(function_name, **kwargs)``.
        function_name: Name of the job function to enqueue.
        job_payload: Keyword arguments forwarded unchanged to ``enqueue_job``.
            The result key is only cleared when ``_job_id`` is a non-empty string.
        logger: Receives an info record when a key was removed and a warning
            when the delete raised.
        context: Free-form label included in those log records.

    Returns:
        Whatever ``redis.enqueue_job`` returns.
    """
    stable_id = job_payload.get("_job_id")
    if not isinstance(stable_id, str) or not stable_id:
        # No usable job id, so there is no result key to clear.
        return await redis.enqueue_job(function_name, **job_payload)

    stale_key = arq_result_key(stable_id)
    removed = 0
    try:
        removed = await redis.delete(stale_key)
    except Exception as exc:  # noqa: BLE001
        # Best effort: a failed cleanup is logged but must not block the enqueue.
        logger.warning("Failed to clear stale ARQ result key %s before enqueue (%s): %s", stale_key, context, exc)

    if removed:
        logger.info("Cleared stale ARQ result key %s before enqueue (%s)", stale_key, context)

    return await redis.enqueue_job(function_name, **job_payload)
Loading