Skip to content

Commit ca5f70f

Browse files
DiegoTavares and claude authored
[scheduler] Optimization pass (#2411)
Apply a series of optimizations suggested by Claude Fable. All have been reviewed line by line and tested on a dev environment.

[[scheduler] Fix layer dispatch order lost when grouping query rows](e9e2479)
[[scheduler] Fix tag priority sort that was not a total order](0e32585)
[[scheduler] Only fetch host groups from DB when absent or expired](b31e9c1)
[[scheduler] Fix ghost entries and stale re-index in the host cache index](a7f8dbc)
[[scheduler] Don't return stale hosts as successful checkout candidates](f0c5e34)
[[scheduler] Fix GPU memory unit mismatch in host resource updates](afacfd4)
[[scheduler] Use a transaction-scoped advisory lock for host dispatch](bca100c)
[[scheduler] Don't stop the whole feed on a transient job query error](87c8117)
[[scheduler] Back off clusters when a full pass dispatches zero frames](a51e45e)
[[scheduler] Refresh host reservations on re-reserve](2ab87ce)
[[scheduler] Apply rustfmt to recently touched lines](3e54401)

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **Bug Fixes**
  * Pinned dependency to avoid a build break; prevent CI from resolving the problematic release
  * Avoid full scheduler shutdown on transient job-fetch errors
* **New Features**
  * Session-level metrics for dispatched frames and resource-limit events with periodic logging
* **Configuration**
  * Added configurable sleep when a cluster is saturated but makes no progress
* **Improvements**
  * Host cache and reservation consistency improvements; safer transaction-scoped locking
  * Reduced logging verbosity for some dispatch events
* **Tests**
  * Adjusted and added tests to cover cache/regression scenarios

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: Diego Tavares <dtavares@imageworks.com>
Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
1 parent f9acdd0 commit ca5f70f

16 files changed

Lines changed: 550 additions & 205 deletions

File tree

rust/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,5 +35,13 @@ tracing = "0.1.40"
3535
tracing-appender = "0.2.3"
3636
tracing-rolling-file = "0.1.2"
3737
tracing-subscriber = { version = "0.3.18", features = ["ansi", "env-filter"] }
38+
# Pin time to 0.3.47: the 0.3.48 release adds an impl that collides with
39+
# sentry-types' blanket `impl<T> From<T> for LogAttribute`, producing an
40+
# E0119 "conflicting implementations" error when building the scheduler
41+
# (which pulls both `sentry` and, via tracing-appender, `time/formatting`).
42+
# Since Cargo.lock is gitignored, CI resolves fresh and would otherwise float
43+
# to the broken 0.3.48. Remove the pin once sentry-types/time resolve this
44+
# upstream. Referenced by the scheduler crate so the constraint takes effect.
45+
time = "=0.3.47"
3846
structopt = "0.3.26"
3947
home = "=0.5.9"

rust/crates/scheduler/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ lazy_static = "1.5"
5454
moka = { version = "0.12.10", features = ["future"] }
5555
prometheus = "0.13"
5656
sentry = { version = "0.47", features = ["tracing"] }
57+
# Version constraint only — see the `time` pin note in the workspace Cargo.toml.
58+
# Not used directly; declared so the `=0.3.47` pin applies to resolution.
59+
time = { workspace = true }
5760
axum = "0.7"
5861
tower-http = { version = "0.5", features = ["trace"] }
5962
urlencoding = "2.1"

rust/crates/scheduler/src/accounting/recompute.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::accounting::redis_client::ReseedOp;
4040
use crate::accounting::AccountingService;
4141
use crate::config::CONFIG;
4242
use crate::dao::ResourceAccountingDao;
43+
use crate::metrics;
4344
use crate::models::CoreSize;
4445

4546
pub fn spawn_loop(service: Arc<AccountingService>) {
@@ -51,11 +52,35 @@ pub fn spawn_loop(service: Arc<AccountingService>) {
5152
return;
5253
}
5354
};
54-
let mut interval = time::interval(CONFIG.accounting.recompute_interval);
55+
let interval_dur = CONFIG.accounting.recompute_interval;
56+
let mut interval = time::interval(interval_dur);
5557
// Skip the immediate first tick - bootstrap reseed already ran at startup.
5658
interval.tick().await;
59+
// Dispatch heartbeat baseline: snapshot the session counters so the first
60+
// logged delta only covers events after this point.
61+
let mut last_dispatched = metrics::frames_dispatched_session();
62+
let mut last_limit_exceeded = metrics::resource_limit_exceeded_session();
5763
loop {
5864
interval.tick().await;
65+
66+
// Dispatch heartbeat: the aggregate INFO that replaces the demoted
67+
// per-frame dispatch logs. Decoupled from the accounting reseed below.
68+
let current_dispatched = metrics::frames_dispatched_session();
69+
let dispatched_delta = current_dispatched.saturating_sub(last_dispatched);
70+
last_dispatched = current_dispatched;
71+
72+
let current_limit_exceeded = metrics::resource_limit_exceeded_session();
73+
let limit_exceeded_delta =
74+
current_limit_exceeded.saturating_sub(last_limit_exceeded);
75+
last_limit_exceeded = current_limit_exceeded;
76+
77+
info!(
78+
"Dispatched {} frames in the last {}ms ({} resource-limit-exceeded)",
79+
dispatched_delta,
80+
interval_dur.as_millis(),
81+
limit_exceeded_delta
82+
);
83+
5984
let result = AssertUnwindSafe(async {
6085
if let Err(err) = run_once(&service, &pg_dao).await {
6186
warn!("Recompute cycle failed: {err}");

rust/crates/scheduler/src/accounting/redis_client.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@ impl RedisAccounting {
141141

142142
/// Reads the subscription hash's booked cores + burst in one round-trip from
143143
/// `acct:sub:{show_id}:{alloc_id}` (fields `int_cores`, `burst`, both in
144-
/// centicores). Missing keys/fields are treated as `(0, 0)`. Non-authoritative:
144+
/// cores — the centicore→core conversion happens once at the reseed write
145+
/// boundary, see the `lua.rs` unit invariant). Missing keys/fields are
146+
/// treated as `(0, 0)`. Non-authoritative:
145147
/// the dispatcher's Lua `BOOK_OR_FORCE` call remains the source of truth for
146148
/// the booking decision; this is a snapshot suitable for optimistic pre-filters
147149
/// and scoring inputs.
@@ -158,8 +160,9 @@ impl RedisAccounting {
158160
Ok((booked, burst))
159161
}
160162

161-
/// Reads `acct:job:{job_id}` `int_cores` (live booked cores, in centicores).
162-
/// Returns 0 when the key/field is missing. Used by the E-PVM placement
163+
/// Reads `acct:job:{job_id}` `int_cores` (live booked cores, in cores — see
164+
/// the `lua.rs` unit invariant; Redis accounting counters are never stored
165+
/// in centicores). Returns 0 when the key/field is missing. Used by the E-PVM placement
163166
/// snapshot in `MatchingService::process_layer` (design Branch 2a).
164167
pub async fn read_job_cores_in_use(
165168
&self,

rust/crates/scheduler/src/config/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,13 @@ pub struct QueueConfig {
146146
/// The reload only swaps the live set when it actually changed.
147147
#[serde(with = "humantime_serde")]
148148
pub cluster_reload_interval: Duration,
149+
/// Duration a cluster sleeps after a pass found jobs but dispatched zero
150+
/// frames (saturated farm: no host candidate fits any pending layer).
151+
/// Keeps the loop from re-querying jobs and layers continuously while
152+
/// nothing can be placed. Should stay in the same order of magnitude as
153+
/// the host cache refresh interval so freed hosts are picked up promptly.
154+
#[serde(with = "humantime_serde")]
155+
pub cluster_saturated_sleep: Duration,
149156
pub stream: StreamConfig,
150157
/// Maximum number of jobs returned per cluster pass. Caps the per-pass
151158
/// dispatch cost so a big-show cluster doesn't iterate thousands of jobs
@@ -176,6 +183,7 @@ impl Default for QueueConfig {
176183
job_back_off_duration: Duration::from_secs(300),
177184
cluster_empty_sleep: Duration::from_secs(30),
178185
cluster_reload_interval: Duration::from_secs(120),
186+
cluster_saturated_sleep: Duration::from_secs(5),
179187
stream: StreamConfig::default(),
180188
max_jobs_per_cluster_pass: 20,
181189
manual_tags_chunk_size: 50,

rust/crates/scheduler/src/dao/host_dao.rs

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -272,9 +272,11 @@ impl HostDao {
272272

273273
/// Acquires an advisory lock on a host to prevent concurrent dispatch.
274274
///
275-
/// Uses PostgreSQL's advisory lock mechanism to ensure only one dispatcher
276-
/// can modify a host's resources at a time. The lock is based on a hash
277-
/// of the host ID string.
275+
/// Uses PostgreSQL's transaction-scoped advisory lock mechanism to ensure
276+
/// only one dispatcher can modify a host's resources at a time. The lock is
277+
/// based on a hash of the host ID string and is released automatically when
278+
/// the surrounding transaction commits or rolls back — there is no unlock
279+
/// call, so the lock can never leak onto a pooled connection.
278280
///
279281
/// # Arguments
280282
/// * `host_id` - The UUID of the host to lock
@@ -289,40 +291,14 @@ impl HostDao {
289291
host_id: &Uuid,
290292
) -> Result<bool> {
291293
trace!("Locking {}", host_id);
292-
sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock(hashtext($1))")
294+
sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_xact_lock(hashtext($1))")
293295
.bind(host_id.to_string())
294296
.fetch_one(&mut **transaction)
295297
.await
296298
.into_diagnostic()
297299
.wrap_err("Failed to acquire advisory lock")
298300
}
299301

300-
/// Releases an advisory lock on a host after dispatch completion.
301-
///
302-
/// Releases the PostgreSQL advisory lock that was acquired during
303-
/// the dispatch process, allowing other dispatchers to access the host.
304-
///
305-
/// # Arguments
306-
/// * `host_id` - The UUID of the host to unlock
307-
///
308-
/// # Returns
309-
/// * `Ok(true)` - Lock successfully released
310-
/// * `Ok(false)` - Lock was not held by this process
311-
/// * `Err(miette::Error)` - Database operation failed
312-
pub async fn unlock(
313-
&self,
314-
transaction: &mut Transaction<'_, Postgres>,
315-
host_id: &Uuid,
316-
) -> Result<bool> {
317-
trace!("Unlocking {}", host_id);
318-
sqlx::query_scalar::<_, bool>("SELECT pg_advisory_unlock(hashtext($1))")
319-
.bind(host_id.to_string())
320-
.fetch_one(&mut **transaction)
321-
.await
322-
.into_diagnostic()
323-
.wrap_err("Failed to release advisory lock")
324-
}
325-
326302
/// Updates a host's available resource counts after frame dispatch.
327303
///
328304
/// Modifies the host's idle resource counters in the database to reflect
@@ -348,7 +324,8 @@ impl HostDao {
348324
.bind(virtual_proc.cores_reserved.value())
349325
.bind((virtual_proc.memory_reserved.as_u64() / KB) as i64)
350326
.bind(virtual_proc.gpus_reserved as i32)
351-
.bind(virtual_proc.gpu_memory_reserved.as_u64() as i64)
327+
// GPU memory is stored in KB on the database, like main memory
328+
.bind((virtual_proc.gpu_memory_reserved.as_u64() / KB) as i64)
352329
.bind(host_id.to_string())
353330
.fetch_optional(&mut **transaction)
354331
.await
@@ -362,7 +339,7 @@ impl HostDao {
362339
if CONFIG.host_cache.update_stat_on_book {
363340
sqlx::query(UPDATE_HOST_STAT)
364341
.bind((virtual_proc.memory_reserved.as_u64() / KB) as i64)
365-
.bind(virtual_proc.gpu_memory_reserved.as_u64() as i64)
342+
.bind((virtual_proc.gpu_memory_reserved.as_u64() / KB) as i64)
366343
.bind(host_id.to_string())
367344
.execute(&mut **transaction)
368345
.await
@@ -395,7 +372,8 @@ impl HostDao {
395372
.bind(virtual_proc.cores_reserved.value())
396373
.bind((virtual_proc.memory_reserved.as_u64() / KB) as i64)
397374
.bind(virtual_proc.gpus_reserved as i32)
398-
.bind(virtual_proc.gpu_memory_reserved.as_u64() as i64)
375+
// GPU memory is stored in KB on the database, like main memory
376+
.bind((virtual_proc.gpu_memory_reserved.as_u64() / KB) as i64)
399377
.bind(host_id.to_string())
400378
.execute(&mut **transaction)
401379
.await
@@ -404,7 +382,7 @@ impl HostDao {
404382
if CONFIG.host_cache.update_stat_on_book {
405383
sqlx::query(RESTORE_HOST_STAT)
406384
.bind((virtual_proc.memory_reserved.as_u64() / KB) as i64)
407-
.bind(virtual_proc.gpu_memory_reserved.as_u64() as i64)
385+
.bind((virtual_proc.gpu_memory_reserved.as_u64() / KB) as i64)
408386
.bind(host_id.to_string())
409387
.execute(&mut **transaction)
410388
.await

rust/crates/scheduler/src/dao/layer_dao.rs

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ SELECT DISTINCT
243243
l.int_gpus_min,
244244
l.int_gpu_mem_min,
245245
l.str_tags,
246-
jr.int_max_cores AS int_job_max_cores,
246+
jr.int_max_cores::bigint AS int_job_max_cores,
247247
je.job_env,
248248
le.layer_env,
249249
l.int_dispatch_order,
@@ -364,33 +364,45 @@ impl LayerDao {
364364
///
365365
/// * `Vec<DispatchLayer>` - Structured layers with grouped frames
366366
fn group_layers_and_frames(&self, models: Vec<LayerWithFramesModel>) -> Vec<DispatchLayer> {
367-
let mut layers_map: HashMap<String, (DispatchLayerModel, Vec<DispatchFrameModel>)> =
368-
HashMap::new();
367+
// Rows arrive ordered by (layer.int_dispatch_order, frame order) from the SQL
368+
// query. Group by layer while preserving that ordering: a HashMap maps the
369+
// layer key to its slot in the output Vec, so layers keep their dispatch
370+
// priority and frames keep their dispatch order within each layer.
371+
let mut layer_slots: HashMap<String, usize> = HashMap::new();
372+
let mut layers: Vec<(DispatchLayerModel, Vec<DispatchFrameModel>)> = Vec::new();
369373

370374
for model in models {
371-
// Extract layer data
372-
let layer_model = DispatchLayerModel {
373-
pk_layer: model.pk_layer.clone(),
374-
pk_job: model.pk_job.clone(),
375-
pk_facility: model.pk_facility.clone(),
376-
pk_show: model.pk_show.clone(),
377-
pk_folder: model.pk_folder.clone(),
378-
pk_dept: model.pk_dept.clone(),
379-
str_name: model.layer_name.clone(),
380-
str_job_name: model.job_name.clone(),
381-
str_os: model.str_os.clone(),
382-
int_cores_min: model.int_cores_min,
383-
int_mem_min: model.int_mem_min,
384-
b_threadable: model.b_threadable,
385-
int_gpus_min: model.int_gpus_min,
386-
int_gpu_mem_min: model.int_gpu_mem_min,
387-
str_tags: model.str_tags.clone(),
388-
int_job_max_cores: model.int_job_max_cores,
375+
let slot = match layer_slots.get(&model.pk_layer) {
376+
Some(slot) => *slot,
377+
None => {
378+
let layer_model = DispatchLayerModel {
379+
pk_layer: model.pk_layer.clone(),
380+
pk_job: model.pk_job.clone(),
381+
pk_facility: model.pk_facility.clone(),
382+
pk_show: model.pk_show.clone(),
383+
pk_folder: model.pk_folder.clone(),
384+
pk_dept: model.pk_dept.clone(),
385+
str_name: model.layer_name.clone(),
386+
str_job_name: model.job_name.clone(),
387+
str_os: model.str_os.clone(),
388+
int_cores_min: model.int_cores_min,
389+
int_mem_min: model.int_mem_min,
390+
b_threadable: model.b_threadable,
391+
int_gpus_min: model.int_gpus_min,
392+
int_gpu_mem_min: model.int_gpu_mem_min,
393+
str_tags: model.str_tags.clone(),
394+
int_job_max_cores: model.int_job_max_cores,
395+
};
396+
layers.push((layer_model, vec![]));
397+
let slot = layers.len() - 1;
398+
layer_slots.insert(model.pk_layer.clone(), slot);
399+
slot
400+
}
389401
};
390402

391403
// Extract frame data (if present)
392-
let frame_model = if let Some(pk_frame) = model.pk_frame {
393-
Some(DispatchFrameModel {
404+
if let Some(pk_frame) = model.pk_frame {
405+
let frame_model = DispatchFrameModel {
394406
pk_frame,
395407
str_frame_name: model.str_frame_name.unwrap_or_default(),
396408
pk_show: model.pk_show.clone(),
@@ -418,26 +430,16 @@ impl LayerDao {
418430
int_version: model.int_version.unwrap_or(1),
419431
str_loki_url: model.str_loki_url,
420432
ts_updated: model.ts_updated,
421-
job_env: model.job_env.0.clone(),
422-
layer_env: model.layer_env.0.clone(),
423-
})
424-
} else {
425-
None
426-
};
427-
428-
// Group by layer_id
429-
let layer_entry = layers_map
430-
.entry(model.pk_layer.clone())
431-
.or_insert((layer_model, vec![]));
432-
433-
if let Some(frame) = frame_model {
434-
layer_entry.1.push(frame);
433+
job_env: model.job_env.0,
434+
layer_env: model.layer_env.0,
435+
};
436+
layers[slot].1.push(frame_model);
435437
}
436438
}
437439

438440
// Convert to DispatchLayer objects
439-
layers_map
440-
.into_values()
441+
layers
442+
.into_iter()
441443
.map(|(layer_model, frame_models)| DispatchLayer::new(layer_model, frame_models))
442444
.collect()
443445
}

0 commit comments

Comments
 (0)