Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
require "../../spec_helper"

describe "Mosquito::ConcurrencyLimitedDequeueAdapter" do
getter(overseer : MockOverseer) { MockOverseer.new }
getter(queue_list : MockQueueList) { overseer.queue_list.as(MockQueueList) }

def register(job_class : Mosquito::Job.class)
Mosquito::Base.register_job_mapping job_class.name.underscore, job_class
queue_list.queues << job_class.queue
end

it "dequeues a job when under the limit" do
clean_slate do
register QueuedTestJob
expected_job_run = QueuedTestJob.new.enqueue

adapter = Mosquito::ConcurrencyLimitedDequeueAdapter.new({
"queued_test_job" => 3,
})

result = adapter.dequeue(queue_list)
refute_nil result
if result
assert_equal expected_job_run, result.job_run
assert_equal QueuedTestJob.queue, result.queue
end
end
end

it "returns nil when no jobs are available" do
clean_slate do
register QueuedTestJob

adapter = Mosquito::ConcurrencyLimitedDequeueAdapter.new({
"queued_test_job" => 3,
})

result = adapter.dequeue(queue_list)
assert_nil result
end
end

it "skips a queue that has reached its concurrency limit" do
clean_slate do
register QueuedTestJob
3.times { QueuedTestJob.new.enqueue }

adapter = Mosquito::ConcurrencyLimitedDequeueAdapter.new({
"queued_test_job" => 2,
})

# Dequeue twice — should succeed and fill the limit.
result1 = adapter.dequeue(queue_list)
refute_nil result1
assert_equal 1, adapter.active_count("queued_test_job")

result2 = adapter.dequeue(queue_list)
refute_nil result2
assert_equal 2, adapter.active_count("queued_test_job")

# Third dequeue should be blocked by the limit.
result3 = adapter.dequeue(queue_list)
assert_nil result3
end
end

it "allows dequeue again after finished_with" do
clean_slate do
register QueuedTestJob
3.times { QueuedTestJob.new.enqueue }

adapter = Mosquito::ConcurrencyLimitedDequeueAdapter.new({
"queued_test_job" => 1,
})

# Fill the single slot.
result1 = adapter.dequeue(queue_list)
refute_nil result1
assert_equal 1, adapter.active_count("queued_test_job")

# Blocked.
result2 = adapter.dequeue(queue_list)
assert_nil result2

# Signal that the job finished.
adapter.finished_with(result1.not_nil!.job_run, result1.not_nil!.queue)
assert_equal 0, adapter.active_count("queued_test_job")

# Now dequeue should work again.
result3 = adapter.dequeue(queue_list)
refute_nil result3
end
end

it "does not limit queues not in the limits table" do
clean_slate do
register QueuedTestJob
5.times { QueuedTestJob.new.enqueue }

# No limit configured for queued_test_job.
adapter = Mosquito::ConcurrencyLimitedDequeueAdapter.new({
"other_queue" => 1,
})

# Should dequeue all 5 without blocking.
5.times do |i|
result = adapter.dequeue(queue_list)
refute_nil result, "Expected dequeue ##{i + 1} to succeed"
end
end
end

it "enforces independent limits across multiple queues" do
clean_slate do
register QueuedTestJob
register EchoJob
3.times { QueuedTestJob.new.enqueue }
3.times { EchoJob.new(text: "hello").enqueue }

adapter = Mosquito::ConcurrencyLimitedDequeueAdapter.new({
"queued_test_job" => 1,
"io_queue" => 2,
})

# Saturate queued_test_job (limit 1).
# Because of shuffle we may get either queue first, so keep
# dequeuing until the counters match the limits.
results = [] of Mosquito::WorkUnit
6.times do
if r = adapter.dequeue(queue_list)
results << r
end
end

assert_equal 1, adapter.active_count("queued_test_job")
assert_equal 2, adapter.active_count("io_queue")
assert_equal 3, results.size
end
end

it "finished_with does not go below zero" do
adapter = Mosquito::ConcurrencyLimitedDequeueAdapter.new({
"queued_test_job" => 3,
})

job_run = Mosquito::JobRun.new("queued_test_job")
queue = Mosquito::Queue.new("queued_test_job")
adapter.finished_with(job_run, queue)
assert_equal 0, adapter.active_count("queued_test_job")
end

it "can be used via the overseer" do
clean_slate do
adapter = Mosquito::ConcurrencyLimitedDequeueAdapter.new({
"queued_test_job" => 5,
})
overseer.dequeue_adapter = adapter

register QueuedTestJob
expected_job_run = QueuedTestJob.new.enqueue

result = overseer.dequeue_job?
refute_nil result
if result
assert_equal expected_job_run, result.job_run
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
require "../dequeue_adapter"

module Mosquito
# A dequeue adapter that enforces per-queue concurrency limits.
#
# Each queue can be assigned a maximum number of jobs that may execute
# concurrently. When a queue has reached its limit, it is skipped during
# dequeue until an in-flight job finishes.
#
# Queues not present in the limits table have no concurrency ceiling and
# are bounded only by the total executor pool size.
#
# Among eligible queues the adapter uses a shuffle to provide rough
# fairness, similar to `ShuffleDequeueAdapter`.
#
# ## Example
#
# ```crystal
# Mosquito.configure do |settings|
# settings.executor_count = 8
#
# settings.dequeue_adapter = Mosquito::ConcurrencyLimitedDequeueAdapter.new({
# "queue_a" => 3,
# "queue_b" => 5,
# })
# end
# ```
#
# In this configuration at most 3 jobs from "queue_a" and 5 from "queue_b"
# will execute at the same time. Other queues are unlimited.
class ConcurrencyLimitedDequeueAdapter < DequeueAdapter
getter limits : Hash(String, Int32)

# Tracks the number of currently in-flight jobs per queue name.
# Access is fiber-safe because Crystal fibers are cooperatively
# scheduled and we never yield between read and write.
@active : Hash(String, Int32)

def initialize(@limits : Hash(String, Int32))
@active = Hash(String, Int32).new(0)
end

def dequeue(queue_list : Runners::QueueList) : WorkUnit?
queue_list.queues.shuffle.each do |q|
if limit = limits[q.name]?
next if @active[q.name] >= limit
end

if job_run = q.dequeue
@active[q.name] = @active[q.name] + 1
return WorkUnit.of(job_run, from: q)
end
end
end

# Called by the Overseer when a job from this queue has finished
# executing. Decrements the in-flight counter so the queue becomes
# eligible for dequeue again.
def finished_with(job_run : JobRun, queue : Queue) : Nil
count = @active[queue.name]
@active[queue.name] = {count - 1, 0}.max
end

# Returns the current number of in-flight jobs for the given queue.
def active_count(queue_name : String) : Int32
@active[queue_name]
end
end
end
1 change: 1 addition & 0 deletions src/mosquito/runners/overseer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ module Mosquito::Runners
return unless work_unit = dead_executor.work_unit?

observer.recovered_job_from_executor work_unit.job_run, dead_executor
dequeue_adapter.finished_with(work_unit.job_run, work_unit.queue)
work_unit.job_run.retry_or_banish work_unit.queue
end

Expand Down
Loading