Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 6 additions & 23 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ embedder:

# --- Vector Database (Milvus) ---
# Env: VDB_HOST, VDB_PORT, VDB_CONNECTOR_NAME, VDB_COLLECTION_NAME, VDB_HYBRID_SEARCH,
# VDB_ENABLE_INSERTION
# VDB_ENABLE_INSERTION, VDB_TIMEOUT
vectordb:
host: milvus
port: 19530
connector_name: milvus
collection_name: vdb_test
hybrid_search: true
enable: true
timeout: 120.0 # per-request timeout (s) for the Milvus sync/async clients
schema_version: 1 # Increment when the collection schema changes and a migration is required

# --- Relational Database (PostgreSQL) ---
Expand Down Expand Up @@ -257,29 +258,11 @@ loader:

# --- Ray ---
ray:
# Env: RAY_NUM_GPUS, RAY_POOL_SIZE, RAY_MAX_TASKS_PER_WORKER
num_gpus: 0.01
pool_size: 1
max_tasks_per_worker: 8

indexer:
# Env: RAY_MAX_TASK_RETRIES, INDEXER_SERIALIZE_TIMEOUT, VECTORDB_TIMEOUT
max_task_retries: 2
serialize_timeout: 3600
vectordb_timeout: 30
concurrency_groups:
# Env: INDEXER_DEFAULT_CONCURRENCY, INDEXER_UPDATE_CONCURRENCY, etc.
default: 1000
update: 100
search: 100
delete: 100
serialize: 50
chunk: 1000
insert: 100

semaphore:
# Env: RAY_SEMAPHORE_CONCURRENCY
concurrency: 100000
# Env: RAY_POOL_SIZE, RAY_MAX_TASKS_PER_WORKER
# Indexing capacity = pool_size (worker actors) × max_tasks_per_worker (files per worker).
pool_size: 1
max_tasks_per_worker: 50

serve:
# Env: ENABLE_RAY_SERVE, RAY_SERVE_NUM_REPLICAS, RAY_SERVE_HOST,
Expand Down
10 changes: 3 additions & 7 deletions docs/content/docs/documentation/deploy_ray_cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ Ensure your `.env` file includes the standard app variables **plus Ray-specific
// .env
# Ray
# Resources for all files
RAY_NUM_GPUS=0.1
RAY_POOL_SIZE=1
RAY_MAX_TASKS_PER_WORKER=5

Expand Down Expand Up @@ -54,15 +53,12 @@ UV_CACHE_DIR=/tmp/uv-cache
```

:::tip[**Tips**]
- `RAY_NUM_GPUS` defines **per-actor resource requirements**. Ray will not start a task until these resources are available on one of the nodes.
For example, if one indexation consumes ~1GB of VRAM and your GPU has 4GB, setting `RAY_NUM_GPUS=0.25` allows you to run **4 indexers per node**. In a 2-node cluster, that means up to **8 concurrent indexation tasks**.

- `RAY_POOL_SIZE` defines the number of worker actors that will be created to handle indexation tasks. It acts like a **maximum concurrency limit**.
Using the previous example, you can set `POOL_SIZE=8` to fully utilize your cluster capacity.
- `RAY_POOL_SIZE` defines the number of indexer worker actors created to handle indexation tasks, and `RAY_MAX_TASKS_PER_WORKER` the number of files each worker processes concurrently.
Total indexing concurrency is `RAY_POOL_SIZE × RAY_MAX_TASKS_PER_WORKER` — e.g. `RAY_POOL_SIZE=8` with `RAY_MAX_TASKS_PER_WORKER=5` allows up to **40 concurrent indexation tasks**. Size these to your cluster capacity.
Comment thread
Ahmath-Gadji marked this conversation as resolved.
:::

:::caution
If other GPU-intensive services are running on your nodes (e.g. vLLM, the RAG API), make sure to **reserve enough GPU memory** for them and subtract that from your total when calculating the safe pool size.
`RAY_POOL_SIZE` and `RAY_MAX_TASKS_PER_WORKER` size **indexing throughput**, not GPU reservation — the indexer workers don't claim GPU memory. GPU on each node is consumed by the model servers (e.g. vLLM) and by the GPU-accelerated parsers, so budget GPU memory through **`MARKER_NUM_GPUS`** / **`MARKER_POOL_SIZE`** (and the Docling equivalents) and the model-server settings rather than through the indexer pool size.
:::

---
Expand Down
32 changes: 3 additions & 29 deletions docs/content/docs/documentation/env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ The vector database stores embeddings and is configured using the following envi
| `VDB_COLLECTION_NAME` | str | vdb_test | Name of the collection storing embeddings |
|`VDB_HYBRID_SEARCH`| `bool` | true |To activate hybrid search (semantic similarity + Keyword search)|
| `VDB_ENABLE_INSERTION` | bool | true | Enable or disable vector database insertion. When disabled, documents are processed but not inserted into Milvus. Useful for testing. |
| `VDB_TIMEOUT` | float | 120.0 | Per-request timeout (seconds) applied to the Milvus sync and async clients |

These variables can be overridden when using an external vector database service.

Expand Down Expand Up @@ -375,8 +376,8 @@ Ray is used for distributed task processing and parallel execution in the RAG pi

| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `RAY_POOL_SIZE` | `int` | 1 | Number of serializer actor instances (typically 1 actor per cluster node) |
| `RAY_MAX_TASKS_PER_WORKER` | `int` | 8 | Maximum number of concurrent tasks (serialization tasks) per serializer actor instance |
| `RAY_POOL_SIZE` | `int` | 1 | Number of indexer worker actors in the pool. Total indexing capacity = `RAY_POOL_SIZE` × `RAY_MAX_TASKS_PER_WORKER`. |
| `RAY_MAX_TASKS_PER_WORKER` | `int` | 50 | Maximum number of files processed concurrently per indexer worker actor |
| `RAY_DASHBOARD_PORT` | `int` | 8265 | Ray Dashboard port used for monitoring. In production, [comment out this line](https://github.com/linagora/openrag/blob/ee732ea8e080dcde0107d62d12703a7525f810cd/docker-compose.yaml#L21C1-L22C1) to avoid exposing the port, as it may introduce security vulnerabilities. |

:::danger[Attention]
Expand All @@ -391,33 +392,6 @@ The following environment variables control Ray's logging behavior, task retry s
| `RAY_ENABLE_UV_RUN_RUNTIME_ENV` | `number` | `0` | Controls UV runtime environment integration. **Critical**: Must be set to `0` when using the newest version of UV to avoid compatibility issues. |
|`RAY_memory_monitor_refresh_ms`| `number` | 250 ms | To control the frequency of memory usage checks and task or actor termination if needed. If you set this value to 0, task killing is disabled. |

#### Indexer Configuration

| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `RAY_MAX_TASK_RETRIES` | int | 2 | Number of retry attempts for failed tasks |
| `INDEXER_SERIALIZE_TIMEOUT` | int | 36000 | Timeout in seconds for serialization operations (10 hours) |

#### Indexer Concurrency Groups

Controls the maximum number of concurrent operations for different indexer tasks:

| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `INDEXER_DEFAULT_CONCURRENCY` | int | 1000 | Default concurrency limit for general operations |
| `INDEXER_UPDATE_CONCURRENCY` | int | 100 | Maximum concurrent document update operations |
| `INDEXER_SERIALIZE_CONCURRENCY` | int | 50 | Maximum concurrent serialization operations |
| `INDEXER_SEARCH_CONCURRENCY` | int | 100 | Maximum concurrent search/retrieval operations |
| `INDEXER_DELETE_CONCURRENCY` | int | 100 | Maximum concurrent document deletion operations |
| `INDEXER_CHUNK_CONCURRENCY` | int | 1000 | Maximum concurrent document chunking operations |
| `INDEXER_INSERT_CONCURRENCY` | int | 10 | Maximum concurrent document insertion operations |

#### Semaphore Configuration

| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `RAY_SEMAPHORE_CONCURRENCY` | int | 100000 | Global concurrency limit for Ray semaphore operations |

#### Ray Serve Configuration
Ray Serve enables deployment of the FastAPI as a scalable service. For simple deployment, without the intend to scale, one can usage the [uvicorn deployment mode](/openrag/documentation/env_vars/#ray-serve-configuration)

Expand Down
1 change: 0 additions & 1 deletion infra/charts/openrag-stack/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ env:
TRANSCRIBER_BASE_URL: "http://{{ .Release.Name }}-whisper-engine-service/v1"

# Ray
RAY_NUM_GPUS: "0.1"
RAY_POOL_SIZE: "3"
RAY_MAX_TASKS_PER_WORKER: "50"
RAY_DASHBOARD_PORT: "8265"
Expand Down
1 change: 0 additions & 1 deletion infra/compose/.env.ollama
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ CHAINLIT_AUTH_SECRET=openrag_local_dev_secret_2026

# RAY & System
RAY_ENABLE_UV_RUN_RUNTIME_ENV=0
RAY_NUM_GPUS=0
RAY_DASHBOARD_PORT=8265
RAY_memory_usage_threshold=0.99
RAY_memory_monitor_refresh_ms=0
Expand Down
31 changes: 6 additions & 25 deletions openrag/core/config/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class VectorDBConfig(ConfigMixin):
collection_name: str = "vdb_test"
hybrid_search: bool = True
enable: bool = True
# Per-request timeout (s) applied to the Milvus sync and async clients.
timeout: float = Field(default=120.0, gt=0)
schema_version: int = 1


Expand Down Expand Up @@ -47,31 +49,14 @@ class RDBConfig(ConfigMixin):


# ---------------------------------------------------------------------------
# Ray — concurrency groups, serve config
# Ray — worker pool, serve config
# ---------------------------------------------------------------------------


class IndexerConcurrencyGroupsConfig(ConfigMixin):
default: int = 1000
update: int = 100
search: int = 100
delete: int = 100
serialize: int = 50
chunk: int = 1000
insert: int = 100


class RayIndexerConfig(ConfigMixin):
max_task_retries: int = 2
serialize_timeout: int = 3600
vectordb_timeout: int = 30
concurrency_groups: IndexerConcurrencyGroupsConfig = Field(
default_factory=IndexerConcurrencyGroupsConfig,
)


class RaySemaphoreConfig(ConfigMixin):
concurrency: int = 100000
# Indexing capacity = pool_size (worker actors) × max_tasks_per_worker (files per worker).
pool_size: int = Field(default=1, ge=1)
max_tasks_per_worker: int = Field(default=50, ge=1)


class RayServeConfig(ConfigMixin):
Expand All @@ -83,11 +68,7 @@ class RayServeConfig(ConfigMixin):


class RayConfig(ConfigMixin):
num_gpus: float = 0.01
pool_size: int = 1
max_tasks_per_worker: int = 8
indexer: RayIndexerConfig = Field(default_factory=RayIndexerConfig)
semaphore: RaySemaphoreConfig = Field(default_factory=RaySemaphoreConfig)
serve: RayServeConfig = Field(default_factory=RayServeConfig)


Expand Down
17 changes: 3 additions & 14 deletions openrag/core/config/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
("VDB_COLLECTION_NAME", "vectordb.collection_name", str),
("VDB_HYBRID_SEARCH", "vectordb.hybrid_search", bool),
("VDB_ENABLE_INSERTION", "vectordb.enable", bool),
("VDB_TIMEOUT", "vectordb.timeout", float),
# RDB (Postgres)
("POSTGRES_HOST", "rdb.host", str),
("POSTGRES_PORT", "rdb.port", int),
Expand Down Expand Up @@ -127,20 +128,8 @@
("OPENAI_LOADER_CONCURRENCY_LIMIT", "loader.openai.concurrency_limit", int),
("OPENAI_LOADER_ENABLE_THINKING", "loader.openai.enable_thinking", bool),
# Ray
("RAY_NUM_GPUS", "ray.num_gpus", float),
("RAY_POOL_SIZE", "ray.pool_size", int),
("RAY_MAX_TASKS_PER_WORKER", "ray.max_tasks_per_worker", int),
("RAY_MAX_TASK_RETRIES", "ray.indexer.max_task_retries", int),
("INDEXER_SERIALIZE_TIMEOUT", "ray.indexer.serialize_timeout", int),
("VECTORDB_TIMEOUT", "ray.indexer.vectordb_timeout", int),
("INDEXER_DEFAULT_CONCURRENCY", "ray.indexer.concurrency_groups.default", int),
("INDEXER_UPDATE_CONCURRENCY", "ray.indexer.concurrency_groups.update", int),
("INDEXER_SEARCH_CONCURRENCY", "ray.indexer.concurrency_groups.search", int),
("INDEXER_DELETE_CONCURRENCY", "ray.indexer.concurrency_groups.delete", int),
("INDEXER_SERIALIZE_CONCURRENCY", "ray.indexer.concurrency_groups.serialize", int),
("INDEXER_CHUNK_CONCURRENCY", "ray.indexer.concurrency_groups.chunk", int),
("INDEXER_INSERT_CONCURRENCY", "ray.indexer.concurrency_groups.insert", int),
("RAY_SEMAPHORE_CONCURRENCY", "ray.semaphore.concurrency", int),
("RAY_POOL_SIZE", "ray.indexer.pool_size", int),
("RAY_MAX_TASKS_PER_WORKER", "ray.indexer.max_tasks_per_worker", int),
("ENABLE_RAY_SERVE", "ray.serve.enable", bool),
("RAY_SERVE_NUM_REPLICAS", "ray.serve.num_replicas", int),
("RAY_SERVE_HOST", "ray.serve.host", str),
Expand Down
31 changes: 14 additions & 17 deletions openrag/services/orchestrators/model_endpoint_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,28 +98,31 @@ async def seed_defaults(self) -> None:
logger.info(f"Seeded default {model_type} endpoint '{row.name}'.")

def _build_default_seeds(self) -> dict[str, dict[str, Any]]:
"""Build seed data from env overrides + existing Settings fallbacks."""
"""Build seed data from env overrides + existing Settings fallbacks.

The ``*_ENDPOINT`` / ``*_MODEL`` env vars below are seed-specific names
the config loader does NOT map onto ``Settings``, so they are read here
directly. The api-key env vars (``API_KEY``, ``EMBEDDER_API_KEY``, ...)
ARE mapped by the loader (loader.py), so ``s.<type>.api_key`` already
reflects any env override — reading them via ``os.getenv`` again would
be redundant double-handling (and non-deterministic when a local .env is
loaded into the process via ``load_dotenv``).
"""
s = self._config
return {
"embedder": {
"endpoint": os.getenv("EMBEDDER_ENDPOINT", s.embedder.base_url),
"model_name": os.getenv("EMBEDDING_MODEL", s.embedder.model_name),
"batch_size": s.embedder.batch_size,
"timeout": s.embedder.timeout,
"extra": _with_api_key(
{"implementation": "vllm"},
os.getenv("EMBEDDER_API_KEY", s.embedder.api_key),
),
"extra": _with_api_key({"implementation": "vllm"}, s.embedder.api_key),
},
"llm": {
"endpoint": os.getenv("LLM_ENDPOINT", s.llm.base_url),
"model_name": os.getenv("LLM_MODEL", s.llm.model),
"timeout": s.llm.timeout,
"extra": _with_enable_thinking(
_with_api_key(
{"implementation": "vllm"},
os.getenv("API_KEY", s.llm.api_key),
),
_with_api_key({"implementation": "vllm"}, s.llm.api_key),
s.llm.enable_thinking,
),
},
Expand All @@ -128,10 +131,7 @@ def _build_default_seeds(self) -> dict[str, dict[str, Any]]:
"model_name": os.getenv("VLM_MODEL", s.vlm.model),
"timeout": s.vlm.timeout,
"extra": _with_enable_thinking(
_with_api_key(
{"implementation": "vllm"},
os.getenv("VLM_API_KEY", s.vlm.api_key),
),
_with_api_key({"implementation": "vllm"}, s.vlm.api_key),
s.vlm.enable_thinking,
),
},
Expand All @@ -145,10 +145,7 @@ def _build_default_seeds(self) -> dict[str, dict[str, Any]]:
"endpoint": os.getenv("RERANKER_ENDPOINT", s.reranker.base_url),
"model_name": os.getenv("RERANKER_MODEL", s.reranker.model_name),
"timeout": s.reranker.timeout,
"extra": _with_api_key(
{"implementation": s.reranker.provider},
os.getenv("RERANKER_API_KEY", s.reranker.api_key),
),
"extra": _with_api_key({"implementation": s.reranker.provider}, s.reranker.api_key),
},
}

Expand Down
2 changes: 1 addition & 1 deletion openrag/services/storage/milvus_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def __init__(self, config: VectorDBConfig) -> None:
self._collection_name = config.collection_name
self._hybrid = config.hybrid_search
self._uri = f"http://{config.host}:{config.port}"
self._timeout = 60
self._timeout = config.timeout
try:
self._client = MilvusClient(uri=self._uri, timeout=self._timeout)
self._async_client = AsyncMilvusClient(uri=self._uri, timeout=self._timeout)
Expand Down
28 changes: 18 additions & 10 deletions openrag/services/workers/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,25 @@ async def dispatch_indexing(
task_description=f"set_details({task_id})",
)

task = self._pool.process_file.remote(
task_id=task_id,
path=path,
metadata=metadata,
partition=partition,
user=user,
workspace_ids=workspace_ids,
replace=replace,
indexation_config=indexation_config,
embedder_name=embedder_name,
# ``IndexerPool`` is a Ray actor; ``submit`` returns ``[worker_ref]``
# (wrapped so Ray doesn't auto-dereference and block on the worker task).
# Awaiting the submit call yields that list; element 0 is the worker ref
# that ``cancel_task``/``ray.cancel`` must target.
submitted = await self._call(
self._pool.submit.remote(
task_id=task_id,
path=path,
metadata=metadata,
partition=partition,
user=user,
workspace_ids=workspace_ids,
replace=replace,
indexation_config=indexation_config,
embedder_name=embedder_name,
),
task_description=f"submit({task_id})",
)
task = submitted[0]

await self._call(
self._tsm.set_object_ref.remote(task_id, {"ref": task}),
Expand Down
Loading
Loading