Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 6 additions & 23 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ embedder:

# --- Vector Database (Milvus) ---
# Env: VDB_HOST, VDB_PORT, VDB_CONNECTOR_NAME, VDB_COLLECTION_NAME, VDB_HYBRID_SEARCH,
# VDB_ENABLE_INSERTION
# VDB_ENABLE_INSERTION, VDB_TIMEOUT
vectordb:
host: milvus
port: 19530
connector_name: milvus
collection_name: vdb_test
hybrid_search: true
enable: true
timeout: 120.0 # per-request timeout (s) for the Milvus sync/async clients
schema_version: 1 # Increment when the collection schema changes and a migration is required

# --- Relational Database (PostgreSQL) ---
Expand Down Expand Up @@ -257,29 +258,11 @@ loader:

# --- Ray ---
ray:
# Env: RAY_NUM_GPUS, RAY_POOL_SIZE, RAY_MAX_TASKS_PER_WORKER
num_gpus: 0.01
pool_size: 1
max_tasks_per_worker: 8

indexer:
# Env: RAY_MAX_TASK_RETRIES, INDEXER_SERIALIZE_TIMEOUT, VECTORDB_TIMEOUT
max_task_retries: 2
serialize_timeout: 3600
vectordb_timeout: 30
concurrency_groups:
# Env: INDEXER_DEFAULT_CONCURRENCY, INDEXER_UPDATE_CONCURRENCY, etc.
default: 1000
update: 100
search: 100
delete: 100
serialize: 50
chunk: 1000
insert: 100

semaphore:
# Env: RAY_SEMAPHORE_CONCURRENCY
concurrency: 100000
# Env: RAY_POOL_SIZE, RAY_MAX_TASKS_PER_WORKER
# Indexing capacity = pool_size (worker actors) × max_tasks_per_worker (files per worker).
pool_size: 1
max_tasks_per_worker: 50

serve:
# Env: ENABLE_RAY_SERVE, RAY_SERVE_NUM_REPLICAS, RAY_SERVE_HOST,
Expand Down
8 changes: 2 additions & 6 deletions docs/content/docs/documentation/deploy_ray_cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ Ensure your `.env` file includes the standard app variables **plus Ray-specific
// .env
# Ray
# Resources for all files
RAY_NUM_GPUS=0.1
RAY_POOL_SIZE=1
RAY_MAX_TASKS_PER_WORKER=5

Expand Down Expand Up @@ -54,11 +53,8 @@ UV_CACHE_DIR=/tmp/uv-cache
```

:::tip[**Tips**]
- `RAY_NUM_GPUS` defines **per-actor resource requirements**. Ray will not start a task until these resources are available on one of the nodes.
For example, if one indexation consumes ~1GB of VRAM and your GPU has 4GB, setting `RAY_NUM_GPUS=0.25` allows you to run **4 indexers per node**. In a 2-node cluster, that means up to **8 concurrent indexation tasks**.

- `RAY_POOL_SIZE` defines the number of worker actors that will be created to handle indexation tasks. It acts like a **maximum concurrency limit**.
Using the previous example, you can set `POOL_SIZE=8` to fully utilize your cluster capacity.
- `RAY_POOL_SIZE` defines the number of indexer worker actors created to handle indexation tasks, and `RAY_MAX_TASKS_PER_WORKER` the number of files each worker processes concurrently.
Total indexing concurrency is `RAY_POOL_SIZE × RAY_MAX_TASKS_PER_WORKER` — e.g. `RAY_POOL_SIZE=8` with `RAY_MAX_TASKS_PER_WORKER=5` allows up to **40 concurrent indexation tasks**. Size these to your cluster capacity.
Comment thread
Ahmath-Gadji marked this conversation as resolved.
:::

:::caution
Expand Down
23 changes: 5 additions & 18 deletions docs/content/docs/documentation/env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ The vector database stores embeddings and is configured using the following envi
| `VDB_COLLECTION_NAME` | str | vdb_test | Name of the collection storing embeddings |
|`VDB_HYBRID_SEARCH`| `bool` | true |To activate hybrid search (semantic similarity + Keyword search)|
| `VDB_ENABLE_INSERTION` | bool | true | Enable or disable vector database insertion. When disabled, documents are processed but not inserted into Milvus. Useful for testing. |
| `VDB_TIMEOUT` | float | 120.0 | Per-request timeout (seconds) applied to the Milvus sync and async clients |

These variables can be overridden when using an external vector database service.

Expand Down Expand Up @@ -375,8 +376,8 @@ Ray is used for distributed task processing and parallel execution in the RAG pi

| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `RAY_POOL_SIZE` | `int` | 1 | Number of serializer actor instances (typically 1 actor per cluster node) |
| `RAY_MAX_TASKS_PER_WORKER` | `int` | 8 | Maximum number of concurrent tasks (serialization tasks) per serializer actor instance |
| `RAY_POOL_SIZE` | `int` | 1 | Number of indexer worker actors in the pool. Total indexing capacity = `RAY_POOL_SIZE` × `RAY_MAX_TASKS_PER_WORKER`. |
| `RAY_MAX_TASKS_PER_WORKER` | `int` | 8 | Maximum number of files processed concurrently per indexer worker actor |
| `RAY_DASHBOARD_PORT` | `int` | 8265 | Ray Dashboard port used for monitoring. In production, [comment out this line](https://github.com/linagora/openrag/blob/ee732ea8e080dcde0107d62d12703a7525f810cd/docker-compose.yaml#L21C1-L22C1) to avoid exposing the port, as it may introduce security vulnerabilities. |

:::danger[Attention]
Expand All @@ -395,22 +396,8 @@ The following environment variables control Ray's logging behavior, task retry s

| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `RAY_MAX_TASK_RETRIES` | int | 2 | Number of retry attempts for failed tasks |
| `INDEXER_SERIALIZE_TIMEOUT` | int | 36000 | Timeout in seconds for serialization operations (10 hours) |

#### Indexer Concurrency Groups

Controls the maximum number of concurrent operations for different indexer tasks:

| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `INDEXER_DEFAULT_CONCURRENCY` | int | 1000 | Default concurrency limit for general operations |
| `INDEXER_UPDATE_CONCURRENCY` | int | 100 | Maximum concurrent document update operations |
| `INDEXER_SERIALIZE_CONCURRENCY` | int | 50 | Maximum concurrent serialization operations |
| `INDEXER_SEARCH_CONCURRENCY` | int | 100 | Maximum concurrent search/retrieval operations |
| `INDEXER_DELETE_CONCURRENCY` | int | 100 | Maximum concurrent document deletion operations |
| `INDEXER_CHUNK_CONCURRENCY` | int | 1000 | Maximum concurrent document chunking operations |
| `INDEXER_INSERT_CONCURRENCY` | int | 10 | Maximum concurrent document insertion operations |
| `RAY_POOL_SIZE` | int | 1 | Number of indexer worker actors created to handle indexation tasks |
| `RAY_MAX_TASKS_PER_WORKER` | int | 50 | Files each indexer worker processes concurrently (total concurrency = `RAY_POOL_SIZE × RAY_MAX_TASKS_PER_WORKER`) |

#### Semaphore Configuration

Expand Down
1 change: 0 additions & 1 deletion infra/charts/openrag-stack/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ env:
TRANSCRIBER_BASE_URL: "http://{{ .Release.Name }}-whisper-engine-service/v1"

# Ray
RAY_NUM_GPUS: "0.1"
RAY_POOL_SIZE: "3"
RAY_MAX_TASKS_PER_WORKER: "50"
RAY_DASHBOARD_PORT: "8265"
Expand Down
1 change: 0 additions & 1 deletion infra/compose/.env.ollama
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ CHAINLIT_AUTH_SECRET=openrag_local_dev_secret_2026

# RAY & System
RAY_ENABLE_UV_RUN_RUNTIME_ENV=0
RAY_NUM_GPUS=0
RAY_DASHBOARD_PORT=8265
RAY_memory_usage_threshold=0.99
RAY_memory_monitor_refresh_ms=0
Expand Down
31 changes: 6 additions & 25 deletions openrag/core/config/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class VectorDBConfig(ConfigMixin):
collection_name: str = "vdb_test"
hybrid_search: bool = True
enable: bool = True
# Per-request timeout (s) applied to the Milvus sync and async clients.
timeout: float = 120.0
Comment thread
Ahmath-Gadji marked this conversation as resolved.
Outdated
schema_version: int = 1


Expand Down Expand Up @@ -47,31 +49,14 @@ class RDBConfig(ConfigMixin):


# ---------------------------------------------------------------------------
# Ray — concurrency groups, serve config
# Ray — worker pool, serve config
# ---------------------------------------------------------------------------


class IndexerConcurrencyGroupsConfig(ConfigMixin):
default: int = 1000
update: int = 100
search: int = 100
delete: int = 100
serialize: int = 50
chunk: int = 1000
insert: int = 100


class RayIndexerConfig(ConfigMixin):
max_task_retries: int = 2
serialize_timeout: int = 3600
vectordb_timeout: int = 30
concurrency_groups: IndexerConcurrencyGroupsConfig = Field(
default_factory=IndexerConcurrencyGroupsConfig,
)


class RaySemaphoreConfig(ConfigMixin):
concurrency: int = 100000
# Indexing capacity = pool_size (worker actors) × max_tasks_per_worker (files per worker).
pool_size: int = Field(default=1, ge=1)
max_tasks_per_worker: int = Field(default=50, ge=1)


class RayServeConfig(ConfigMixin):
Expand All @@ -83,11 +68,7 @@ class RayServeConfig(ConfigMixin):


class RayConfig(ConfigMixin):
num_gpus: float = 0.01
pool_size: int = 1
max_tasks_per_worker: int = 8
indexer: RayIndexerConfig = Field(default_factory=RayIndexerConfig)
semaphore: RaySemaphoreConfig = Field(default_factory=RaySemaphoreConfig)
serve: RayServeConfig = Field(default_factory=RayServeConfig)


Expand Down
17 changes: 3 additions & 14 deletions openrag/core/config/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
("VDB_COLLECTION_NAME", "vectordb.collection_name", str),
("VDB_HYBRID_SEARCH", "vectordb.hybrid_search", bool),
("VDB_ENABLE_INSERTION", "vectordb.enable", bool),
("VDB_TIMEOUT", "vectordb.timeout", float),
# RDB (Postgres)
("POSTGRES_HOST", "rdb.host", str),
("POSTGRES_PORT", "rdb.port", int),
Expand Down Expand Up @@ -127,20 +128,8 @@
("OPENAI_LOADER_CONCURRENCY_LIMIT", "loader.openai.concurrency_limit", int),
("OPENAI_LOADER_ENABLE_THINKING", "loader.openai.enable_thinking", bool),
# Ray
("RAY_NUM_GPUS", "ray.num_gpus", float),
("RAY_POOL_SIZE", "ray.pool_size", int),
("RAY_MAX_TASKS_PER_WORKER", "ray.max_tasks_per_worker", int),
("RAY_MAX_TASK_RETRIES", "ray.indexer.max_task_retries", int),
("INDEXER_SERIALIZE_TIMEOUT", "ray.indexer.serialize_timeout", int),
("VECTORDB_TIMEOUT", "ray.indexer.vectordb_timeout", int),
("INDEXER_DEFAULT_CONCURRENCY", "ray.indexer.concurrency_groups.default", int),
("INDEXER_UPDATE_CONCURRENCY", "ray.indexer.concurrency_groups.update", int),
("INDEXER_SEARCH_CONCURRENCY", "ray.indexer.concurrency_groups.search", int),
("INDEXER_DELETE_CONCURRENCY", "ray.indexer.concurrency_groups.delete", int),
("INDEXER_SERIALIZE_CONCURRENCY", "ray.indexer.concurrency_groups.serialize", int),
("INDEXER_CHUNK_CONCURRENCY", "ray.indexer.concurrency_groups.chunk", int),
("INDEXER_INSERT_CONCURRENCY", "ray.indexer.concurrency_groups.insert", int),
("RAY_SEMAPHORE_CONCURRENCY", "ray.semaphore.concurrency", int),
("RAY_POOL_SIZE", "ray.indexer.pool_size", int),
("RAY_MAX_TASKS_PER_WORKER", "ray.indexer.max_tasks_per_worker", int),
("ENABLE_RAY_SERVE", "ray.serve.enable", bool),
("RAY_SERVE_NUM_REPLICAS", "ray.serve.num_replicas", int),
("RAY_SERVE_HOST", "ray.serve.host", str),
Expand Down
31 changes: 14 additions & 17 deletions openrag/services/orchestrators/model_endpoint_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,36 +96,36 @@ async def seed_defaults(self) -> None:
logger.info(f"Seeded default {model_type} endpoint '{row.name}'.")

def _build_default_seeds(self) -> dict[str, dict[str, Any]]:
"""Build seed data from env overrides + existing Settings fallbacks."""
"""Build seed data from env overrides + existing Settings fallbacks.

The ``*_ENDPOINT`` / ``*_MODEL`` env vars below are seed-specific names
the config loader does NOT map onto ``Settings``, so they are read here
directly. The api-key env vars (``API_KEY``, ``EMBEDDER_API_KEY``, ...)
ARE mapped by the loader (loader.py), so ``s.<type>.api_key`` already
reflects any env override — reading them via ``os.getenv`` again would
be redundant double-handling (and non-deterministic when a local .env is
loaded into the process via ``load_dotenv``).
"""
s = self._config
return {
"embedder": {
"endpoint": os.getenv("EMBEDDER_ENDPOINT", s.embedder.base_url),
"model_name": os.getenv("EMBEDDING_MODEL", s.embedder.model_name),
"extra": _with_api_key(
{"implementation": "vllm"},
os.getenv("EMBEDDER_API_KEY", s.embedder.api_key),
),
"extra": _with_api_key({"implementation": "vllm"}, s.embedder.api_key),
},
"llm": {
"endpoint": os.getenv("LLM_ENDPOINT", s.llm.base_url),
"model_name": os.getenv("LLM_MODEL", s.llm.model),
"extra": _with_enable_thinking(
_with_api_key(
{"implementation": "vllm"},
os.getenv("API_KEY", s.llm.api_key),
),
_with_api_key({"implementation": "vllm"}, s.llm.api_key),
s.llm.enable_thinking,
),
},
"vlm": {
"endpoint": os.getenv("VLM_ENDPOINT", s.vlm.base_url),
"model_name": os.getenv("VLM_MODEL", s.vlm.model),
"extra": _with_enable_thinking(
_with_api_key(
{"implementation": "vllm"},
os.getenv("VLM_API_KEY", s.vlm.api_key),
),
_with_api_key({"implementation": "vllm"}, s.vlm.api_key),
s.vlm.enable_thinking,
),
},
Expand All @@ -138,10 +138,7 @@ def _build_default_seeds(self) -> dict[str, dict[str, Any]]:
# remains available for per-partition opt-in.
"endpoint": os.getenv("RERANKER_ENDPOINT", s.reranker.base_url),
"model_name": os.getenv("RERANKER_MODEL", s.reranker.model_name),
"extra": _with_api_key(
{"implementation": s.reranker.provider},
os.getenv("RERANKER_API_KEY", s.reranker.api_key),
),
"extra": _with_api_key({"implementation": s.reranker.provider}, s.reranker.api_key),
},
}

Expand Down
2 changes: 1 addition & 1 deletion openrag/services/storage/milvus_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def __init__(self, config: VectorDBConfig) -> None:
self._collection_name = config.collection_name
self._hybrid = config.hybrid_search
self._uri = f"http://{config.host}:{config.port}"
self._timeout = 60
self._timeout = config.timeout
try:
self._client = MilvusClient(uri=self._uri, timeout=self._timeout)
self._async_client = AsyncMilvusClient(uri=self._uri, timeout=self._timeout)
Expand Down
2 changes: 1 addition & 1 deletion openrag/services/workers/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async def dispatch_indexing(
task_description=f"set_details({task_id})",
)

task = self._pool.process_file.remote(
task = self._pool.submit(
task_id=task_id,
path=path,
metadata=metadata,
Expand Down
Loading
Loading