Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,103 @@ jobs:
echo "$OUT2" | grep -q "If all of these points are met, we give a Green label." || exit 1
echo ::endgroup::

- name: Wait for files queue to drain to zero
run: |
echo "::group::Waiting up to 5 minutes for documents queue to reach zero"
for i in {1..30}; do
stats=$(./occ context_chat:stats --json)
scheduled=$(echo "$stats" | jq '[.queued_documents_counts | to_entries[].value] | add // 0')
echo "Attempt $i: queued_documents=$scheduled"
if [ "$scheduled" = "0" ]; then
echo "Queue is empty"
break
fi
if [ "$i" = "30" ]; then
echo "Timeout: queue did not drain to zero (queued_documents=$scheduled)"
exit 1
fi
sleep 10
done
echo "::endgroup::"

- name: Check if the empty files get identified, not indexed and don't cause any issues with the batch
run: |
NEW_FILES_COUNT=5

echo "::group::Stats before the empty files are added"
prev_stats=$(./occ context_chat:stats --json)
echo "$prev_stats"
prev_scheduled=$(echo "$prev_stats" | jq '.queued_documents_counts.files__default // 0')
prev_eligible=$(echo "$prev_stats" | jq '.eligible_files_count')
prev_indexed=$(echo "$prev_stats" | jq '.vectordb_document_counts.files__default // 0')
echo "queued_documents_counts.files__default before scan: $prev_scheduled"
echo "::endgroup::"

# create empty markdown files in admin's home folder
for i in $(seq 1 $NEW_FILES_COUNT); do
touch "data/admin/files/test_empty_$i.md"
done

# create one new file with content
echo "hello world" > "data/admin/files/test_filled.md"

# run files scan so Nextcloud registers the new files
./occ files:scan admin

echo "::group::Confirming new files appear in the queue after scan"
stats=$(./occ context_chat:stats --json)
echo "$stats"
scheduled=$(echo "$stats" | jq '.queued_documents_counts.files__default // 0')
echo "queued_documents_counts.files__default after scan: $scheduled"
expected_scheduled=$((prev_scheduled + NEW_FILES_COUNT + 1)) # +1 non-empty file
if [ "$scheduled" -ne "$expected_scheduled" ]; then
echo "Error: expected exactly $expected_scheduled files in the queue (prev=$prev_scheduled + new=$NEW_FILES_COUNT), got $scheduled"
exit 1
fi
echo "::endgroup::"

echo "::group::Waiting up to 5 minutes for queue to drain to zero again"
for i in {1..30}; do
stats=$(./occ context_chat:stats --json)
scheduled=$(echo "$stats" | jq '[.queued_documents_counts | to_entries[].value] | add // 0')
locked=$(echo "$stats" | jq '[.queued_documents_locked_counts | to_entries[].value] | add // 0')
echo "Attempt $i: queued_documents=$scheduled locked=$locked"
if [ "$scheduled" = "0" ]; then
echo "Queue drained"
break
fi
if [ "$i" = "30" ]; then
echo "Timeout: queue did not drain to zero (queued_documents=$scheduled)"
exit 1
fi
sleep 10
done
echo "::endgroup::"

# locked must be zero
if [ "$locked" != "0" ]; then
echo "Error: expected locked count to be 0, got $locked"
exit 1
fi
echo "Locked count is 0 as expected"

# StorageService#countFiles filters size > 0, so empty files are NOT counted as eligible.
# Only the 1 non-empty file is both eligible and indexed, so the gap must stay the same (diff=0).
eligible=$(echo "$stats" | jq '.eligible_files_count')
indexed=$(echo "$stats" | jq '.vectordb_document_counts.files__default // 0')
echo "prev: eligible=$prev_eligible indexed=$prev_indexed"
echo "eligible_files_count=$eligible vectordb_document_counts.files__default=$indexed"

prev_gap=$((prev_eligible - prev_indexed))
curr_gap=$((eligible - indexed))
gap_diff=$((curr_gap - prev_gap))
echo "gap before=$prev_gap gap after=$curr_gap difference=$gap_diff expected=0"
if [ "$gap_diff" -ne "0" ]; then
echo "Error: expected the eligible-indexed gap to stay the same (empty files are excluded from eligible_files_count by the size > 0 filter), got diff=$gap_diff"
exit 1
fi
echo "PASS: empty files excluded from eligible_files_count (size > 0 filter) and absent from vectordb_document_counts"

- name: Check python memory usage
run: |
ps -p $(cat pid.txt) -o pid,cmd,%mem,rss --sort=-%mem
Expand Down
51 changes: 45 additions & 6 deletions context_chat_backend/chain/ingest/injest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import asyncio
import json
import logging
import re
from collections.abc import Mapping
Expand All @@ -12,6 +13,8 @@
import niquests
from langchain.schema import Document
from nc_py_api import AsyncNextcloudApp
from nc_py_api._exceptions import NextcloudException
from pydantic import ValidationError

from ...dyn_loader import VectorDBLoader
from ...types import IndexingError, IndexingException, ReceivedFileItem, SourceItem, TConfig
Expand All @@ -26,6 +29,21 @@
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB, all loaded in RAM at once


async def __try_parse_ocs_response(response: niquests.AsyncResponse | None, e: Exception) -> dict | str: # pyright: ignore[reportArgumentType]
if response is None or (await response.text) is None:
return str(e)

res_text = await response.text or str(e)
try:
ocs_response = json.loads(res_text)
ocs_data = ocs_response.get("ocs", {}).get("data")
if not ocs_data:
return res_text
return ocs_data
except json.JSONDecodeError:
return str(e)


async def __fetch_file_content(
semaphore: asyncio.Semaphore,
file_id: int,
Expand All @@ -52,6 +70,7 @@ async def __fetch_file_content(
fp.seek(0)
return fp
except niquests.exceptions.RequestException as e:
# this path is probably moot since the exceptions would be re-raised as NextcloudException
if e.response is None:
raise

Expand All @@ -75,6 +94,16 @@ async def __fetch_file_content(
raise
except IndexingException:
raise
except NextcloudException as e:
err_msg = await __try_parse_ocs_response(e.response, e) # pyright: ignore[reportArgumentType]
if e.status_code == 404:
# see context_chat's `/files/{fileId}` endpoint
raise IndexingException(f'File fetch from Nextcloud server failed: {err_msg}') from e

raise IndexingException(
f'Network error encountered fetching content for file id {file_id}, user id {user_id}: {err_msg}',
retryable=True,
) from e
except Exception as e:
logger.error(f'Error fetching content for file id {file_id}, user id {user_id}: {e}', exc_info=e)
raise IndexingException(f'Error fetching content for file id {file_id}, user id {user_id}: {e}') from e
Expand Down Expand Up @@ -131,12 +160,22 @@ async def __fetch_files_content(
results = await asyncio.gather(*tasks, return_exceptions=True)
for (db_id, file), result in zip(task_sources.items(), results, strict=True):
if isinstance(result, str) or isinstance(result, BytesIO):
source_items[db_id] = SourceItem(
**{
**file.model_dump(),
'content': result,
}
)
try:
source_items[db_id] = SourceItem(
**{
**file.model_dump(),
'content': result,
}
)
except ValidationError as e:
logger.info(
f'Found empty content for db id {db_id}, file id {file.file_id}, reference {file.reference}'
f': {e}',
)
error_items[db_id] = IndexingError(
error=f'Found empty content: {e}',
retryable=False,
)
elif isinstance(result, IndexingException):
logger.error(
f'Error fetching content for db id {db_id}, file id {file.file_id}, reference {file.reference}'
Expand Down
39 changes: 28 additions & 11 deletions context_chat_backend/task_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
EmbeddingException,
FilesQueueItems,
IndexingError,
ItemValidationError,
LoaderException,
ReceivedFileItem,
SourceItem,
Expand Down Expand Up @@ -121,6 +122,7 @@ def _load_sources(source_items: Mapping[int, SourceItem | ReceivedFileItem]) ->
error=f'{e.__class__.__name__}: {e}',
retryable=True,
)
# todo: to retry or not to retry?
LOGGER.error(
'embed_sources subprocess raised a %s error for %d sources, marking all as retryable',
e.__class__.__name__, len(source_refs), exc_info=e, extra={
Expand Down Expand Up @@ -169,22 +171,37 @@ def _load_sources(source_items: Mapping[int, SourceItem | ReceivedFileItem]) ->
sleep(POLLING_COOLDOWN)
continue

files_result = {}
providers_result = {}
# separate valid items from per-item pydantic validation errors
valid_files: dict[int, ReceivedFileItem] = {}
valid_providers: dict[int, SourceItem] = {}
files_result: dict[int, IndexingError | None] = {}
providers_result: dict[int, IndexingError | None] = {}

for db_id, item in q_items.files.items():
if isinstance(item, ItemValidationError):
files_result[db_id] = IndexingError(error=item.error, retryable=False)
else:
valid_files[db_id] = item
for db_id, item in q_items.content_providers.items():
if isinstance(item, ItemValidationError):
providers_result[db_id] = IndexingError(error=item.error, retryable=False)
else:
valid_providers[db_id] = item

if not valid_files and not valid_providers:
LOGGER.warning('No valid files or providers found in the current batch.')
# do not short the loop here since the invalid items need to be deleted at the end

# chunk file parsing for better file operation parallelism
file_chunk_size = max(MIN_FILES_PER_CPU, math.ceil(len(q_items.files) / file_parsing_cpu_count))
file_chunk_size = max(MIN_FILES_PER_CPU, math.ceil(len(valid_files) / file_parsing_cpu_count))
file_chunks = [
dict(list(q_items.files.items())[i:i+file_chunk_size])
for i in range(0, len(q_items.files), file_chunk_size)
dict(list(valid_files.items())[i:i+file_chunk_size])
for i in range(0, len(valid_files), file_chunk_size)
]
provider_chunk_size = max(
MIN_FILES_PER_CPU,
math.ceil(len(q_items.content_providers) / file_parsing_cpu_count),
)
provider_chunk_size = max(MIN_FILES_PER_CPU, math.ceil(len(valid_providers) / file_parsing_cpu_count))
provider_chunks = [
dict(list(q_items.content_providers.items())[i:i+provider_chunk_size])
for i in range(0, len(q_items.content_providers), provider_chunk_size)
dict(list(valid_providers.items())[i:i+provider_chunk_size])
for i in range(0, len(valid_providers), provider_chunk_size)
]

with ThreadPoolExecutor(
Expand Down
44 changes: 40 additions & 4 deletions context_chat_backend/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@
from collections.abc import Mapping
from enum import Enum
from io import BytesIO
from typing import Annotated, Literal, Self
from typing import Annotated, Any, Literal, Self

import niquests
from pydantic import AfterValidator, BaseModel, Discriminator, computed_field, field_validator, model_validator
from pydantic import (
AfterValidator,
BaseModel,
Discriminator,
ValidationError,
computed_field,
field_validator,
model_validator,
)

from .mimetype_list import SUPPORTED_MIMETYPES
from .vectordb.types import UpdateAccessOp
Expand Down Expand Up @@ -236,9 +244,37 @@ class Config:
arbitrary_types_allowed = True


class ItemValidationError(BaseModel):
'''Wraps a per-item Pydantic validation failure so the rest of the batch can still be processed.'''
error: str
raw: Any


class FilesQueueItems(BaseModel):
files: Mapping[int, ReceivedFileItem] # [db id]: FileItem
content_providers: Mapping[int, SourceItem] # [db id]: SourceItem
files: Mapping[int, ReceivedFileItem | ItemValidationError] # db id as the key
content_providers: Mapping[int, SourceItem | ItemValidationError] # db id as the key

# so the FilesQueueItems validation does not fail altogether only for one item being invalid
@model_validator(mode='before')
@classmethod
def validate_items_individually(cls, data: Any) -> Any:
if not isinstance(data, dict):
return data
for collection_key, item_cls in (
('files', ReceivedFileItem),
('content_providers', SourceItem),
):
if not isinstance(data.get(collection_key), dict):
continue
result: dict = {}
for k, v in data[collection_key].items():
try:
item_cls.model_validate(v)
result[k] = v
except (ValidationError, ValueError) as e:
result[k] = {'error': str(e), 'raw': v}
data[collection_key] = result
return data


class IndexingException(Exception):
Expand Down
7 changes: 7 additions & 0 deletions context_chat_backend/vectordb/pgvector.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ def __init__(self, embedding: Embeddings | None = None, **kwargs):
# tried to create the tables but it was already created in another process
# init the client again to detect it already exists, and continue from there
self.client = PGVector(embedding, collection_name=COLLECTION_NAME, **kwargs)
except sa.exc.ProgrammingError as pe: # pyright: ignore[reportAttributeAccessIssue]
if not isinstance(pe.orig, psycopg.errors.DuplicateTable):
raise

# multiple processes raced to CREATE TABLE, all but one lost so tables now exist, retry
# may not be necessary now since initial tables creation happens in main.py now
self.client = PGVector(embedding, collection_name=COLLECTION_NAME, **kwargs)

def get_instance(self) -> VectorStore:
return self.client
Expand Down
5 changes: 5 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from context_chat_backend.controller import app # isort: skip
from context_chat_backend.logger import get_logging_config, setup_logging # isort: skip
from context_chat_backend.utils import is_k8s_env, redact_config # isort: skip
from context_chat_backend.dyn_loader import VectorDBLoader # isort: skip

LOGGER_CONFIG_NAME = 'logger_config.yaml'
LOGGER_K8S_CONFIG_NAME = 'logger_config.k8s.yaml'
Expand Down Expand Up @@ -64,6 +65,10 @@ def _setup_log_levels(debug: bool):
'sqlalchemy',
])

# init the vectordb in the main process before workers are forked so that
# no two worker processes race to CREATE TABLE at the same time
VectorDBLoader(app_config).load()

print(f'CPU count: {cpu_count()}, Memory: {psutil.virtual_memory()}')
print('App config:\n' + redact_config(app_config).model_dump_json(indent=2), flush=True)

Expand Down
Loading