Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion spec/helpers/mock_queue_list.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
class MockQueueList < Mosquito::Runners::QueueList
getter queues
setter state

def discovered_queues : Array(Mosquito::Queue)
@discovered_queues
end

def stop(wait_group : WaitGroup = WaitGroup.new(1)) : WaitGroup
self.state = Mosquito::Runnable::State::Stopping
spawn do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ describe "Mosquito::ShuffleDequeueAdapter" do

def register(job_class : Mosquito::Job.class)
Mosquito::Base.register_job_mapping job_class.name.underscore, job_class
queue_list.queues << job_class.queue
queue_list.discovered_queues << job_class.queue
end

it "is the default adapter" do
Expand Down Expand Up @@ -61,7 +61,7 @@ describe "Mosquito::ShuffleDequeueAdapter" do
overseer.dequeue_adapter = spy_adapter

register QueuedTestJob
queue_list.queues << Mosquito::Queue.new("extra_queue")
queue_list.discovered_queues << Mosquito::Queue.new("extra_queue")

overseer.dequeue_job?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ describe "Mosquito::WeightedDequeueAdapter" do

def register(job_class : Mosquito::Job.class)
Mosquito::Base.register_job_mapping job_class.name.underscore, job_class
queue_list.queues << job_class.queue
queue_list.discovered_queues << job_class.queue
end

it "dequeues a job from a weighted queue" do
Expand Down
67 changes: 67 additions & 0 deletions spec/mosquito/resource_gate_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
require "../spec_helper"

describe "Mosquito::OpenGate" do
it "always allows" do
gate = Mosquito::OpenGate.new
assert gate.allow?
end
end

describe "Mosquito::ThresholdGate" do
it "allows when metric is below threshold" do
gate = Mosquito::ThresholdGate.new(threshold: 80.0, sample_ttl: 0.seconds) { 50.0 }
assert gate.allow?
end

it "blocks when metric is at or above threshold" do
gate = Mosquito::ThresholdGate.new(threshold: 80.0, sample_ttl: 0.seconds) { 85.0 }
refute gate.allow?
end

it "blocks when metric equals threshold" do
gate = Mosquito::ThresholdGate.new(threshold: 80.0, sample_ttl: 0.seconds) { 80.0 }
refute gate.allow?
end
end

describe "Mosquito::ResourceGate caching" do
it "caches the check result within TTL" do
call_count = 0
gate = Mosquito::ThresholdGate.new(threshold: 80.0, sample_ttl: 5.seconds) do
call_count += 1
50.0
end

now = Time.utc
Timecop.freeze(now) do
gate.allow?
gate.allow?
gate.allow?
assert_equal 1, call_count
end
end

it "re-checks after TTL expires" do
call_count = 0
gate = Mosquito::ThresholdGate.new(threshold: 80.0, sample_ttl: 5.seconds) do
call_count += 1
50.0
end

now = Time.utc
Timecop.freeze(now) do
gate.allow?
assert_equal 1, call_count
end

Timecop.freeze(now + 3.seconds) do
gate.allow?
assert_equal 1, call_count, "Should still be cached at 3s"
end

Timecop.freeze(now + 6.seconds) do
gate.allow?
assert_equal 2, call_count, "Should re-check after 6s (past 5s TTL)"
end
end
end
2 changes: 1 addition & 1 deletion spec/mosquito/runners/coordinator_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ describe "Mosquito::Runners::Coordinator" do
getter(enqueue_time) { Time.utc }

def enqueue_job_run : JobRun
queue_list.queues << queue
queue_list.discovered_queues << queue

job_run = JobRun.new "blah"

Expand Down
2 changes: 1 addition & 1 deletion spec/mosquito/runners/executor_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ describe "Mosquito::Runners::Executor" do

def register(job_class : Mosquito::Job.class)
Mosquito::Base.register_job_mapping job_class.name.underscore, job_class
queue_list.queues << job_class.queue
queue_list.discovered_queues << job_class.queue
end

def run_job(job_class : Mosquito::Job.class)
Expand Down
2 changes: 1 addition & 1 deletion spec/mosquito/runners/overseer_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ describe "Mosquito::Runners::Overseer" do

def register(job_class : Mosquito::Job.class)
Mosquito::Base.register_job_mapping job_class.name.underscore, job_class
queue_list.queues << job_class.queue
queue_list.discovered_queues << job_class.queue
end

def run_job(job_class : Mosquito::Job.class)
Expand Down
80 changes: 80 additions & 0 deletions spec/mosquito/runners/queue_list_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,84 @@ describe "Mosquito::Runners::QueueList" do
end
end
end

describe "resource gate filtering" do
it "excludes queues whose gate blocks" do
clean_slate do
enqueue_jobs
queue_list.each_run

gate = Mosquito::ThresholdGate.new(threshold: 80.0, sample_ttl: 0.seconds) { 90.0 }
queue_list.resource_gates = {"passing_job" => gate.as(Mosquito::ResourceGate)}

refute_includes queue_list.queues.map(&.name), "passing_job"
assert_includes queue_list.queues.map(&.name), "failing_job"
assert_includes queue_list.queues.map(&.name), "io_queue"
end
end

it "includes queues whose gate allows" do
clean_slate do
enqueue_jobs
queue_list.each_run

gate = Mosquito::ThresholdGate.new(threshold: 80.0, sample_ttl: 0.seconds) { 50.0 }
queue_list.resource_gates = {"passing_job" => gate.as(Mosquito::ResourceGate)}

assert_includes queue_list.queues.map(&.name), "passing_job"
end
end

it "ungated queues are always included" do
clean_slate do
enqueue_jobs
queue_list.each_run

gate = Mosquito::ThresholdGate.new(threshold: 80.0, sample_ttl: 0.seconds) { 90.0 }
queue_list.resource_gates = {"passing_job" => gate.as(Mosquito::ResourceGate)}

assert_equal 2, queue_list.queues.size
end
end

it "multiple queues can share a gate" do
clean_slate do
enqueue_jobs
queue_list.each_run

gate = Mosquito::ThresholdGate.new(threshold: 80.0, sample_ttl: 0.seconds) { 90.0 }
queue_list.resource_gates = {
"passing_job" => gate.as(Mosquito::ResourceGate),
"failing_job" => gate.as(Mosquito::ResourceGate),
}

assert_equal ["io_queue"], queue_list.queues.map(&.name)
end
end

it "gate state is evaluated on each access" do
clean_slate do
enqueue_jobs
queue_list.each_run

value = 90.0
gate = Mosquito::ThresholdGate.new(threshold: 80.0, sample_ttl: 0.seconds) { value }
queue_list.resource_gates = {"passing_job" => gate.as(Mosquito::ResourceGate)}

refute_includes queue_list.queues.map(&.name), "passing_job"

value = 50.0
assert_includes queue_list.queues.map(&.name), "passing_job"
end
end

it "returns all queues when no gates are configured" do
clean_slate do
enqueue_jobs
queue_list.each_run

assert_equal 3, queue_list.queues.size
end
end
end
end
4 changes: 4 additions & 0 deletions src/mosquito/configuration.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ module Mosquito

property dequeue_adapter : Mosquito::DequeueAdapter = Mosquito::ShuffleDequeueAdapter.new

# Maps queue names to resource gates. Queues whose gate returns
# `false` from `#allow?` are excluded from dequeuing.
property resource_gates : Hash(String, Mosquito::ResourceGate) = {} of String => Mosquito::ResourceGate

property publish_metrics : Bool = false

# How often a mosquito runner should emit a heartbeat metric.
Expand Down
15 changes: 15 additions & 0 deletions src/mosquito/gates/open_gate.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
require "../resource_gate"

module Mosquito
# A gate that always allows dequeuing. This is the default when no
# resource constraint is configured.
class OpenGate < ResourceGate
def initialize
super(sample_ttl: 0.seconds)
end

protected def check : Bool
true
end
end
end
29 changes: 29 additions & 0 deletions src/mosquito/gates/threshold_gate.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
require "../resource_gate"

module Mosquito
# A gate that samples a metric via a callback and compares it against
# a threshold.
#
# ## Example
#
# ```crystal
# gate = Mosquito::ThresholdGate.new(
# threshold: 85.0,
# sample_ttl: 2.seconds
# ) { `nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader,nounits`.strip.to_f }
# ```
class ThresholdGate < ResourceGate
getter threshold : Float64

@sampler : -> Float64

def initialize(@threshold : Float64, sample_ttl : Time::Span = 2.seconds, &sampler : -> Float64)
super(sample_ttl: sample_ttl)
@sampler = sampler
end

protected def check : Bool
@sampler.call < @threshold
end
end
end
53 changes: 53 additions & 0 deletions src/mosquito/resource_gate.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
module Mosquito
# A ResourceGate controls whether work should be dequeued based on
# external resource availability (GPU utilization, CPU load, network
# bandwidth, etc.).
#
# Subclass `ResourceGate` and implement `#check` to test the resource.
# The result is cached for `sample_ttl` so expensive checks (shelling
# out to nvidia-smi, reading /sys, etc.) aren't repeated on every
# dequeue spin.
#
# ## Example
#
# ```crystal
# class GpuUtilizationGate < Mosquito::ResourceGate
# def initialize(@threshold : Float64 = 85.0)
# super(sample_ttl: 2.seconds)
# end
#
# protected def check : Bool
# current_gpu_utilization < @threshold
# end
# end
# ```
abstract class ResourceGate
getter sample_ttl : Time::Span

@last_result : Bool = true
@last_check_at : Time = Time::UNIX_EPOCH

def initialize(@sample_ttl : Time::Span = 2.seconds)
end

# Returns the cached result of `#check`, re-evaluating only after
# `sample_ttl` has elapsed since the last check.
def allow? : Bool
now = Time.utc
if now - @last_check_at >= @sample_ttl
@last_result = check
@last_check_at = now
end
@last_result
end

# Subclasses implement the actual resource check. Called at most
# once per `sample_ttl` interval.
protected abstract def check : Bool

# Called after a job finishes, in case the gate needs to update
# internal bookkeeping (e.g. decrement an in-flight counter).
def released(job_run : JobRun, queue : Queue) : Nil
end
end
end
2 changes: 2 additions & 0 deletions src/mosquito/runners/overseer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ module Mosquito::Runners
@finished_notifier = Channel(WorkUnit?).new

@queue_list = QueueList.new
@queue_list.resource_gates = Mosquito.configuration.resource_gates
@coordinator = Coordinator.new queue_list
@dequeue_adapter = Mosquito.configuration.dequeue_adapter
@executors = [] of Executor
Expand Down Expand Up @@ -145,6 +146,7 @@ module Mosquito::Runners
all_executors_busy = false
if finished_job
dequeue_adapter.finished_with(finished_job.job_run, finished_job.queue)
queue_list.notify_released(finished_job.job_run, finished_job.queue)
end
when timeout(idle_wait)
log.trace { "Idled for #{idle_wait.total_seconds}s" }
Expand Down
Loading
Loading