Skip to content

Commit 72bee10

Browse files
Merge branch 'master' into cueweb-monitor-hosts-full
2 parents 791ddde + ca5f70f commit 72bee10

26 files changed

Lines changed: 2299 additions & 467 deletions

File tree

cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1243,7 +1243,8 @@ private static final String replaceQueryForFifo(String query) {
12431243

12441244
/**
12451245
* Looks for shows that are under their burst for a particular type of proc. The show has to be
1246-
* at least one whole proc under their burst to be considered for booking.
1246+
* at least one whole proc under their burst to be considered for booking. Scheduler-managed
1247+
* shows are excluded; their dispatch is owned by the standalone Rust scheduler.
12471248
*/
12481249
// spotless:off
12491250
public static final String FIND_SHOWS =

rust/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,13 @@ tracing = "0.1.40"
3535
tracing-appender = "0.2.3"
3636
tracing-rolling-file = "0.1.2"
3737
tracing-subscriber = { version = "0.3.18", features = ["ansi", "env-filter"] }
38+
# Pin `time` to 0.3.47: the 0.3.48 release adds an impl that collides with
39+
# sentry-types' blanket `impl<T> From<T> for LogAttribute`, producing an
40+
# E0119 "conflicting implementations" error when building the scheduler
41+
# (which pulls both `sentry` and, via tracing-appender, `time/formatting`).
42+
# Since Cargo.lock is gitignored, CI resolves fresh and would otherwise float
43+
# to the broken 0.3.48. Remove the pin once sentry-types/time resolve this
44+
# upstream. Referenced by the scheduler crate so the constraint takes effect.
45+
time = "=0.3.47"
3846
structopt = "0.3.26"
3947
home = "=0.5.9"

rust/config/scheduler.yaml

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,15 +158,103 @@ queue:
158158
# # Default: 3
159159
# job_buffer_size: 3
160160

161-
# Host booking strategy configuration
161+
# ---------------------------------------------------------------------------
162+
# Host booking strategy. Two variants:
163+
#
164+
# Saturation (default) — legacy first-fit. Saturation flags control B-tree
165+
# iteration direction (saturated-first vs spread-first per dimension).
162166
# host_booking_strategy:
163-
# # Enable core saturation booking
164-
# # Default: true
167+
# type: saturation
165168
# core_saturation: true
166-
#
167-
# # Enable memory saturation booking
168-
# # Default: false
169169
# memory_saturation: false
170+
#
171+
# Epvm — E-PVM stranding score; picks the lowest-scoring host among up to
172+
# `max_candidates` scanned. Iteration is always saturated-first under Epvm,
173+
# independent of the Saturation flags. `weights` express per-dimension
174+
# importance ratios (W3 normalization, see scheduler design doc).
175+
# host_booking_strategy:
176+
# type: epvm
177+
# max_candidates: 500
178+
# weights:
179+
# cores: 1.0
180+
# mem: 1.0
181+
# gpus: 2.0
182+
# gpu_mem: 1.0
183+
# gpu_count_reservation: 2.0
184+
# gpu_mem_reservation: 2.0
185+
#
186+
# --- gpu_*_reservation (soft-reservation penalty) --------------------------
187+
# Applied ONLY when the layer requests no GPUs (gpus_min == 0). Penalizes GPU
188+
# hosts to nudge non-GPU work onto non-GPU hosts. The penalty is split into
189+
# two independent knobs so operators control the count-vs-memory balance:
190+
#
191+
# penalty = gpu_count_reservation * idle_gpus
192+
# + gpu_mem_reservation * idle_gpu_memory_gb
193+
#
194+
# Unlike the stranding weights, these scale raw capacity (not a normalized
195+
# stranding term). A typical default is 2.0/2.0 — strong enough to push GPU
196+
# hosts behind non-GPU hosts for non-GPU work, weak enough not to swamp the
197+
# cores/mem stranding signal. On hosts with a lot of GPU memory the mem
198+
# penalty will tend to dominate; raise `gpu_count_reservation` (or lower
199+
# `gpu_mem_reservation`) to rebalance.
200+
#
201+
# --- Example: equal-weight baseline ----------------------------------------
202+
# Cores, memory, and GPU-memory stranding count equally; GPU count is weighted
203+
# 2x; non-GPU work mildly prefers non-GPU hosts. Matches the defaults.
204+
# host_booking_strategy:
205+
# type: epvm
206+
# max_candidates: 500
207+
# weights:
208+
# cores: 1.0
209+
# mem: 1.0
210+
# gpus: 2.0
211+
# gpu_mem: 1.0
212+
# gpu_count_reservation: 2.0
213+
# gpu_mem_reservation: 2.0
214+
#
215+
# --- Example: GPU-scarce farm ----------------------------------------------
216+
# Protect GPU hosts aggressively for GPU work and make GPU stranding very
217+
# expensive (don't waste a GPU on a low-GPU layer). Reservation favors count
218+
# heavily since GPUs themselves are the scarce resource.
219+
# host_booking_strategy:
220+
# type: epvm
221+
# max_candidates: 500
222+
# weights:
223+
# cores: 1.0
224+
# mem: 1.0
225+
# gpus: 8.0
226+
# gpu_mem: 2.0
227+
# gpu_count_reservation: 10.0
228+
# gpu_mem_reservation: 1.0
229+
#
230+
# --- Example: memory-tight farm --------------------------------------------
231+
# Penalize memory stranding more than cores so jobs with high mem:core
232+
# ratios go to high-memory hosts first.
233+
# host_booking_strategy:
234+
# type: epvm
235+
# max_candidates: 500
236+
# weights:
237+
# cores: 1.0
238+
# mem: 3.0
239+
# gpus: 2.0
240+
# gpu_mem: 1.0
241+
# gpu_count_reservation: 2.0
242+
# gpu_mem_reservation: 2.0
243+
#
244+
# --- Example: cores-only (debugging sanity check) --------------------------
245+
# Scoring degenerates to "pack cores tightest" — useful for diagnosing
246+
# whether other dimensions are responsible for a placement regression.
247+
# host_booking_strategy:
248+
# type: epvm
249+
# max_candidates: 500
250+
# weights:
251+
# cores: 1.0
252+
# mem: 0.0
253+
# gpus: 0.0
254+
# gpu_mem: 0.0
255+
# gpu_count_reservation: 0.0
256+
# gpu_mem_reservation: 0.0
257+
# ---------------------------------------------------------------------------
170258

171259
# Soft memory limit multiplier for frame memory requirements
172260
# Used as a threshold to determine if a frame can be dispatched based on available memory

rust/crates/scheduler/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ lazy_static = "1.5"
5454
moka = { version = "0.12.10", features = ["future"] }
5555
prometheus = "0.13"
5656
sentry = { version = "0.47", features = ["tracing"] }
57+
# Version constraint only — see the `time` pin note in the workspace Cargo.toml.
58+
# Not used directly; declared so the `=0.3.47` pin applies to resolution.
59+
time = { workspace = true }
5760
axum = "0.7"
5861
tower-http = { version = "0.5", features = ["trace"] }
5962
urlencoding = "2.1"
@@ -69,5 +72,6 @@ tokio-test = "0.4"
6972
tracing-test = "0.2"
7073
serial_test = "3.0"
7174
rand = "0.8"
75+
proptest = "1.5"
7276
testcontainers = "0.23"
7377
testcontainers-modules = { version = "0.11", features = ["redis"] }

rust/crates/scheduler/src/accounting/recompute.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::accounting::redis_client::ReseedOp;
4040
use crate::accounting::AccountingService;
4141
use crate::config::CONFIG;
4242
use crate::dao::ResourceAccountingDao;
43+
use crate::metrics;
4344
use crate::models::CoreSize;
4445

4546
pub fn spawn_loop(service: Arc<AccountingService>) {
@@ -51,11 +52,35 @@ pub fn spawn_loop(service: Arc<AccountingService>) {
5152
return;
5253
}
5354
};
54-
let mut interval = time::interval(CONFIG.accounting.recompute_interval);
55+
let interval_dur = CONFIG.accounting.recompute_interval;
56+
let mut interval = time::interval(interval_dur);
5557
// Skip the immediate first tick - bootstrap reseed already ran at startup.
5658
interval.tick().await;
59+
// Dispatch heartbeat baseline: snapshot the session counters so the first
60+
// logged delta only covers events after this point.
61+
let mut last_dispatched = metrics::frames_dispatched_session();
62+
let mut last_limit_exceeded = metrics::resource_limit_exceeded_session();
5763
loop {
5864
interval.tick().await;
65+
66+
// Dispatch heartbeat: the aggregate INFO that replaces the demoted
67+
// per-frame dispatch logs. Decoupled from the accounting reseed below.
68+
let current_dispatched = metrics::frames_dispatched_session();
69+
let dispatched_delta = current_dispatched.saturating_sub(last_dispatched);
70+
last_dispatched = current_dispatched;
71+
72+
let current_limit_exceeded = metrics::resource_limit_exceeded_session();
73+
let limit_exceeded_delta =
74+
current_limit_exceeded.saturating_sub(last_limit_exceeded);
75+
last_limit_exceeded = current_limit_exceeded;
76+
77+
info!(
78+
"Dispatched {} frames in the last {}ms ({} resource-limit-exceeded)",
79+
dispatched_delta,
80+
interval_dur.as_millis(),
81+
limit_exceeded_delta
82+
);
83+
5984
let result = AssertUnwindSafe(async {
6085
if let Err(err) = run_once(&service, &pg_dao).await {
6186
warn!("Recompute cycle failed: {err}");

rust/crates/scheduler/src/accounting/redis_client.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,14 @@ impl RedisAccounting {
139139
Ok(v.unwrap_or(0))
140140
}
141141

142-
/// Reads the subscription hash's booked cores + burst in one round-trip. Missing
143-
/// keys/fields are treated as `(0, 0)`. Currently used by `AccountingService::
144-
/// subscription_can_book` for diagnostic visibility and reachable future use; the
145-
/// dispatcher hot path relies on the authoritative Lua booking check instead.
146-
#[allow(dead_code)]
142+
/// Reads the subscription hash's booked cores + burst in one round-trip from
143+
/// `acct:sub:{show_id}:{alloc_id}` (fields `int_cores`, `burst`, both in
144+
/// cores — the centicore→core conversion happens once at the reseed write
145+
/// boundary, see the `lua.rs` unit invariant). Missing keys/fields are
146+
/// treated as `(0, 0)`. Non-authoritative:
147+
/// the dispatcher's Lua `BOOK_OR_FORCE` call remains the source of truth for
148+
/// the booking decision; this is a snapshot suitable for optimistic pre-filters
149+
/// and scoring inputs.
147150
pub async fn read_sub_counters(
148151
&self,
149152
show_id: uuid::Uuid,
@@ -156,6 +159,20 @@ impl RedisAccounting {
156159
let burst = values.get(1).copied().flatten().unwrap_or(0);
157160
Ok((booked, burst))
158161
}
162+
163+
/// Reads `acct:job:{job_id}` `int_cores` (live booked cores, in cores — see
164+
/// the `lua.rs` unit invariant; Redis accounting counters are never stored
165+
/// in centicores). Returns 0 when the key/field is missing. Used by the E-PVM placement
166+
/// snapshot in `MatchingService::process_layer` (design Branch 2a).
167+
pub async fn read_job_cores_in_use(
168+
&self,
169+
job_id: uuid::Uuid,
170+
) -> Result<i64, AccountingError> {
171+
let mut conn = self.conn.clone();
172+
let key = format!("acct:job:{}", job_id);
173+
let v: Option<i64> = conn.hget(&key, "int_cores").await?;
174+
Ok(v.unwrap_or(0))
175+
}
159176
}
160177

161178
/// Parses the raw `redis::Value` returned by the `BOOK_OR_FORCE` Lua script (see

rust/crates/scheduler/src/cluster.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,14 +260,19 @@ impl ClusterFeed {
260260
let facility_id = cluster.facility_id;
261261
let show_id = parse_uuid(&cluster.show_id);
262262
match cluster.ttype.as_str() {
263-
// Each alloc tag becomes its own cluster
263+
// Each alloc tag becomes its own cluster. Carry pk_alloc
264+
// through Tag so the matcher can snapshot the
265+
// (show, alloc) subscription burst from Redis before
266+
// host checkout (see `MatchingService::process_layer`).
264267
"ALLOC" => {
268+
let alloc_id = cluster.alloc_id.as_deref().map(parse_uuid);
265269
clusters.push(Cluster::single_tag(
266270
facility_id,
267271
show_id,
268272
Tag {
269273
name: cluster.tag,
270274
ttype: TagType::Alloc,
275+
alloc_id,
271276
},
272277
));
273278
}
@@ -279,6 +284,7 @@ impl ClusterFeed {
279284
.insert(Tag {
280285
name: cluster.tag,
281286
ttype: TagType::Manual,
287+
alloc_id: None,
282288
});
283289
}
284290
"HOSTNAME" => {
@@ -288,6 +294,7 @@ impl ClusterFeed {
288294
.insert(Tag {
289295
name: cluster.tag,
290296
ttype: TagType::HostName,
297+
alloc_id: None,
291298
});
292299
}
293300
"HARDWARE" => {
@@ -297,6 +304,7 @@ impl ClusterFeed {
297304
.insert(Tag {
298305
name: cluster.tag,
299306
ttype: TagType::Hardware,
307+
alloc_id: None,
300308
});
301309
}
302310
_ => (),
@@ -775,6 +783,7 @@ mod tests {
775783
Tag {
776784
name: tag.to_string(),
777785
ttype: TagType::Alloc,
786+
alloc_id: None,
778787
},
779788
)
780789
}

rust/crates/scheduler/src/cluster_key.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,30 @@ pub enum TagType {
2121
Hardware,
2222
}
2323

24+
/// IDENTITY NOTE: the derived `Hash`/`PartialEq`/`Eq`/`Ord` include every field,
25+
/// so `alloc_id` participates in tag identity. A `Tag{name, Alloc, Some(uuid)}`
26+
/// and a `Tag{name, Alloc, None}` with the same `name`/`ttype` are *distinct*
27+
/// keys in `BTreeSet<Tag>` and `HashMap<ClusterKey, _>`. Today the DB-loaded
28+
/// path produces `Some(uuid)` for alloc tags and the CLI override path produces
29+
/// `None`; the two are not mixed within a single set, so this is latent. The
30+
/// CLI override path for alloc tags is being discontinued in the next stage,
31+
/// after which every `TagType::Alloc` tag carries a resolved `alloc_id` and
32+
/// this becomes a non-issue.
2433
#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
2534
pub struct Tag {
2635
pub name: String,
2736
pub ttype: TagType,
37+
/// `pk_alloc` (allocation UUID) when this tag was loaded as a
38+
/// `TagType::Alloc` cluster tag from the database. Populated by
39+
/// `cluster.rs::load_clusters` on the `"ALLOC"` arm and consumed by
40+
/// `MatchingService::process_layer` to read the per-(show, alloc)
41+
/// subscription burst snapshot from Redis before host checkout.
42+
///
43+
/// `None` for non-alloc tags (manual / hostname / hardware) and for
44+
/// CLI-built tags where the str_tag → pk_alloc mapping isn't resolved
45+
/// at startup. Those paths fall back to the burst-unaware behavior.
46+
#[serde(default)]
47+
pub alloc_id: Option<Uuid>,
2848
}
2949

3050
impl std::ops::Deref for Tag {

0 commit comments

Comments
 (0)