Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions src/services/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,56 @@ def __init__(self, document_service=None, models_service=None, ingestion_timeout
# TaskService is a singleton, so this limits concurrency system-wide.
self._worker_count = get_worker_count()
self._processing_semaphore = asyncio.Semaphore(self._worker_count)
self._worker_stats_lock = asyncio.Lock()
self._active_workers = 0
self._queued_workers = 0

async def _acquire_processing_slot(self, task_id: str, item_key: str) -> float:
    """Acquire a global worker slot, logging if the work had to queue.

    Args:
        task_id: Identifier of the upload task requesting the slot.
        item_key: Path/key of the file being processed (log context only).

    Returns:
        Seconds spent waiting for the semaphore (0.0-ish when uncontended).
    """
    queued_at = time.monotonic()

    async with self._worker_stats_lock:
        self._queued_workers += 1
        queued_workers = self._queued_workers
        active_workers = self._active_workers

    logger.debug(
        "File processing task queued for worker slot",
        task_id=task_id,
        file_path=item_key,
        active_workers=active_workers,
        queued_workers=queued_workers,
        worker_count=self._worker_count,
    )

    try:
        await self._processing_semaphore.acquire()
    except BaseException:
        # The wait was cancelled (task cancellation raises CancelledError,
        # a BaseException) or failed: roll back the queued-worker increment
        # so the gauge does not drift upward forever, then re-raise.
        async with self._worker_stats_lock:
            self._queued_workers -= 1
        raise
    wait_seconds = time.monotonic() - queued_at

    async with self._worker_stats_lock:
        self._queued_workers -= 1
        self._active_workers += 1
        queued_workers = self._queued_workers
        active_workers = self._active_workers

    # Ignore scheduler jitter and only log meaningful contention.
    if wait_seconds >= 0.01:
        logger.info(
            "File processing task waited for worker slot",
            task_id=task_id,
            file_path=item_key,
            wait_seconds=round(wait_seconds, 3),
            active_workers=active_workers,
            queued_workers=queued_workers,
            worker_count=self._worker_count,
        )

    return wait_seconds

async def _release_processing_slot(self) -> None:
"""Release a worker slot and update queue accounting."""
self._processing_semaphore.release()
async with self._worker_stats_lock:
self._active_workers = max(0, self._active_workers - 1)

def _get_task_lock(self, task_id: str) -> asyncio.Lock:
"""Get or create a lock for a specific task's counter updates"""
Expand Down Expand Up @@ -318,7 +368,11 @@ async def background_custom_processor(
# - Limits concurrency across all tasks, not just within this one
# - Potential bottlenecks related to downstream Langflow / Docling capacity rather than backend I/O
async def process_with_semaphore(item, item_key: str):
async with self._processing_semaphore:
slot_acquired = False
wait_seconds = 0.0
try:
wait_seconds = await self._acquire_processing_slot(task_id, item_key)
slot_acquired = True
file_task = upload_task.file_tasks[item_key]
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
Expand All @@ -328,6 +382,7 @@ async def process_with_semaphore(item, item_key: str):
task_number=upload_task.sequence_number,
task_id=task_id,
file_path=file_task.file_path,
wait_seconds=round(wait_seconds, 3),
)

try:
Expand Down Expand Up @@ -408,6 +463,9 @@ async def process_with_semaphore(item, item_key: str):
async with self._get_task_lock(task_id):
upload_task.processed_files += 1
upload_task.updated_at = time.time()
finally:
if slot_acquired:
await self._release_processing_slot()

tasks = [process_with_semaphore(item, str(item)) for item in items]

Expand Down Expand Up @@ -793,4 +851,3 @@ async def shutdown(self):
for i, result in enumerate(results):
if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
logger.warning("Background task raised exception during shutdown", error=str(result))

26 changes: 26 additions & 0 deletions tests/unit/test_task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,29 @@ async def update_counter(file_id: str, should_fail: bool):
assert task.successful_files == 13
assert task.processed_files == task.successful_files + task.failed_files


@pytest.mark.asyncio
async def test_logs_when_waiting_for_worker_slot(task_service):
    """A task that blocks on the shared worker semaphore should be logged."""
    # Shrink the pool to a single slot so the second acquire must queue.
    task_service._worker_count = 1
    task_service._processing_semaphore = asyncio.Semaphore(1)

    # Occupy the only slot, then start a second acquire that has to wait.
    await task_service._acquire_processing_slot("task-1", "file-1")
    waiter = asyncio.create_task(
        task_service._acquire_processing_slot("task-2", "file-2")
    )
    await asyncio.sleep(0.02)

    # Free the slot while capturing info logs; the waiter can now finish.
    with patch("services.task_service.logger.info") as mock_info:
        await task_service._release_processing_slot()
        wait_seconds = await waiter

    await task_service._release_processing_slot()

    assert wait_seconds >= 0.01
    logged_messages = [call.args[0] for call in mock_info.call_args_list if call.args]
    assert "File processing task waited for worker slot" in logged_messages
Loading