Add remote configuration for concurrency limits and executor count#206
Conversation
There was a problem hiding this comment.
Pull request overview
Adds remote-configurable runtime controls to Mosquito so workers can adjust queue concurrency limits and executor pool size without restarts, using backend-stored overrides (global and per-overseer).
Changes:
- Introduces
RemoteConfigDequeueAdapterto apply backend-stored concurrency limit overrides (with optional per-overseer precedence). - Adds
Mosquito::Api::ExecutorConfig/Mosquito::Api::ConcurrencyConfigplusMosquito::Apiconvenience methods for reading/writing remote settings. - Integrates remote executor-count polling into
Runners::Overseer#each_runand addsConfiguration#overseer_idfor stable worker identification, with accompanying specs.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| src/mosquito/runners/overseer.cr | Polls backend for executor-count overrides (rate-limited by heartbeat interval) and applies changes. |
| src/mosquito/dequeue_adapters/remote_config_dequeue_adapter.cr | New dequeue adapter that refreshes/merges remote concurrency limits over defaults, plus backend storage helpers. |
| src/mosquito/configuration.cr | Adds overseer_id to support per-overseer remote config resolution. |
| src/mosquito/api/executor_config.cr | New API for storing/resolving executor-count overrides (global + per-overseer). |
| src/mosquito/api/concurrency_config.cr | New API wrapper around remote concurrency limit storage helpers. |
| src/mosquito/api.cr | Adds convenience methods for reading/writing executor count and concurrency limits via Mosquito::Api. |
| spec/mosquito/runners/overseer_spec.cr | Tests remote executor-count application and precedence behavior. |
| spec/mosquito/dequeue_adapters/remote_config_dequeue_adapter_spec.cr | Comprehensive tests for remote concurrency limits, merge precedence, and API integration. |
| spec/mosquito/api/executor_config_spec.cr | Tests executor-count storage helpers, resolution logic, and API convenience methods. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def refresh_limits : Nil | ||
| remote = load_remote_limits | ||
| merged = defaults.merge(remote) | ||
|
|
||
| if merged != inner.limits | ||
| @inner = ConcurrencyLimitedDequeueAdapter.new(merged) | ||
| end |
There was a problem hiding this comment.
refresh_limits replaces @inner with a new ConcurrencyLimitedDequeueAdapter when limits change. This resets the inner adapter’s @active in-flight counters, so any jobs already dequeued are no longer counted against concurrency limits (and subsequent finished_with calls will decrement the new, empty counter). Consider updating the inner adapter’s limits in-place (preserving active counts) or otherwise carrying over the in-flight counts when applying new limits.
| # If the backend is unreachable or the data is corrupt, fall back | ||
| # to whatever limits are already in use. |
There was a problem hiding this comment.
The rescue path in load_remote_limits returns an empty hash, which causes refresh_limits to revert to defaults (potentially discarding previously applied remote overrides). The comment says it falls back to “whatever limits are already in use”, which doesn’t match the current behavior. Please align the implementation and documentation (either keep the last known-good limits on errors, or update the comment to reflect reverting to defaults).
| # If the backend is unreachable or the data is corrupt, fall back | |
| # to whatever limits are already in use. | |
| # If the backend is unreachable or the data is corrupt, return an | |
| # empty set of remote limits so the adapter falls back to defaults. |
| if remote_count != executor_count | ||
| log.info { "Remote executor count changed: #{executor_count} → #{remote_count}" } | ||
| self.executor_count = remote_count |
There was a problem hiding this comment.
executor_count= clamps values to at least 1. If the remote value is 0 or negative, this code will (a) log a change to the invalid value and (b) re-apply it every heartbeat because remote_count will never equal the clamped executor_count, resulting in repeated log spam and unnecessary pool adjustments. Consider validating/clamping the remote value before comparing/logging, or ignoring invalid remote counts.
| if remote_count != executor_count | |
| log.info { "Remote executor count changed: #{executor_count} → #{remote_count}" } | |
| self.executor_count = remote_count | |
| clamped_remote_count = remote_count < 1 ? 1 : remote_count | |
| if clamped_remote_count != executor_count | |
| log.info { "Remote executor count changed: #{executor_count} → #{clamped_remote_count}" } | |
| self.executor_count = clamped_remote_count |
Wraps ConcurrencyLimitedDequeueAdapter with backend-backed config that can be updated at runtime via Mosquito::Api. Limits are polled on a configurable refresh interval and merged on top of defaults, allowing safe fallback when no remote config is present. New Api surface: - Api.concurrency_limits / Api.set_concurrency_limits - Api::ConcurrencyConfig for direct access - RemoteConfigDequeueAdapter.store_limits / .stored_limits / .clear_limits https://claude.ai/code/session_01PrikEg4dJya2KYjLkkKAmL
The adapter now accepts an optional `overseer_id` parameter that scopes remote config reads to a per-overseer backend key. Merge order is: defaults → global remote → per-overseer remote. This enables asymmetric hardware setups where each overseer needs independently tuned concurrency limits (e.g. GPU vs CPU workers). The Api convenience methods and ConcurrencyConfig class also accept an optional `overseer_id` for targeted reads and writes. https://claude.ai/code/session_01PrikEg4dJya2KYjLkkKAmL
b55a2b1 to
0ce158f
Compare
Adds Api::ExecutorConfig for reading and writing executor count overrides stored in the backend (global and per-overseer). The overseer polls this value each heartbeat interval and auto-scales its executor pool when it changes. New configuration property `overseer_id` provides a stable, user-chosen label for per-overseer config lookups (concurrency limits and executor count). New Api surface: - Api.executor_count / Api.set_executor_count (global) - Api.executor_count(overseer_id:) / Api.set_executor_count(count, overseer_id:) - Api::ExecutorConfig with .resolve for precedence-aware lookups - Configuration#overseer_id for stable per-overseer identity https://claude.ai/code/session_01PrikEg4dJya2KYjLkkKAmL
0ce158f to
08b196d
Compare
Summary
This PR introduces remote configuration capabilities to Mosquito, allowing dynamic adjustment of concurrency limits and executor counts without restarting workers. Two new adapter/config classes enable reading and writing these settings to the backend (e.g., Redis) with support for both global and per-overseer configurations.
Key Changes
RemoteConfigDequeueAdapter: A new dequeue adapter that wraps
ConcurrencyLimitedDequeueAdapterwith remotely configurable concurrency limits. Features include:ExecutorConfig API: New class for managing remotely stored executor count overrides:
ConcurrencyConfig API: New class providing a clean interface to
RemoteConfigDequeueAdapterstorage:Overseer integration: Updated
Overseer#each_runto poll for and apply remote executor count overrides at most once per heartbeat intervalApi convenience methods: Added to
Mosquito::Apifor easy access to:concurrency_limits/set_concurrency_limitsexecutor_count/set_executor_countoverseer_idparameter for per-overseer configurationConfiguration: Added
overseer_idproperty toMosquito::Configurationfor stable worker identificationImplementation Details
https://claude.ai/code/session_01PrikEg4dJya2KYjLkkKAmL