Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion spec/helpers/bare_base_class.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ module Mosquito
end
end
end

3 changes: 2 additions & 1 deletion spec/helpers/mock_overseer.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
class MockOverseer < Mosquito::Runners::Overseer
property queue_list, coordinator, executors, work_handout, finished_notifier, dequeue_adapter
property queue_list, coordinator, perpetual_job_runner, executors, work_handout, finished_notifier, dequeue_adapter

def initialize
@executor_count = Mosquito.configuration.executor_count
Expand All @@ -8,6 +8,7 @@ class MockOverseer < Mosquito::Runners::Overseer

@queue_list = MockQueueList.new
@coordinator = MockCoordinator.new queue_list
@perpetual_job_runner = Mosquito::Runners::PerpetualJobRunner.new @coordinator
@dequeue_adapter = Mosquito.configuration.dequeue_adapter
@executors = [] of Mosquito::Runners::Executor
@work_handout = Channel(Mosquito::WorkUnit).new
Expand Down
26 changes: 26 additions & 0 deletions spec/helpers/mocks.cr
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ class QueuedTestJob < Mosquito::QueuedJob
include PerformanceCounter
end

class PerpetualTestJob < Mosquito::QueuedJob
include PerformanceCounter

param value : String

class_property next_batch_items = [] of Mosquito::Job

def next_batch : Array(Mosquito::Job)
self.class.next_batch_items
end
end

class PerpetualPollTestJob < Mosquito::QueuedJob
include PerformanceCounter

poll_every 10.seconds

param item_id : Int64

class_property next_batch_items = [] of Mosquito::Job

def next_batch : Array(Mosquito::Job)
self.class.next_batch_items
end
end

class QueueHookedTestJob < Mosquito::QueuedJob
include PerformanceCounter

Expand Down
89 changes: 89 additions & 0 deletions spec/mosquito/next_batch_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
require "../spec_helper"

describe "Job#next_batch hook" do
getter(queue_list) { MockQueueList.new }
getter(overseer) { MockOverseer.new }
getter(executor) { MockExecutor.new overseer.as(Mosquito::Runners::Overseer) }

def register(job_class : Mosquito::Job.class)
Mosquito::Base.register_job_mapping job_class.name.underscore, job_class
queue_list.discovered_queues << job_class.queue
end

# Build and execute a job without adding the trigger to the queue,
# so that queue.size afterward reflects only the next_batch items.
def run_perpetual_job(value = "trigger")
register PerpetualTestJob
PerpetualTestJob.reset_performance_counter!
job = PerpetualTestJob.new(value: value)
job_run = job.build_job_run
job_run.store
executor.work_unit = Mosquito::WorkUnit.of(job_run, from: PerpetualTestJob.queue)
executor.execute
end

it "returns an empty array by default" do
job = JobWithPerformanceCounter.new
assert_equal [] of Mosquito::Job, job.next_batch
end

it "enqueues next_batch jobs after a successful run" do
clean_slate do
PerpetualTestJob.next_batch_items = [
PerpetualTestJob.new(value: "followup").as(Mosquito::Job),
]

run_perpetual_job

assert_equal 1, PerpetualTestJob.performances
queue_size = PerpetualTestJob.queue.size(include_dead: false)
assert_equal 1, queue_size
end
ensure
PerpetualTestJob.next_batch_items = [] of Mosquito::Job
end

it "does not enqueue next_batch jobs after a failed run" do
clean_slate do
register FailingJob

job = FailingJob.new
job_run = job.build_job_run
job_run.store
executor.work_unit = Mosquito::WorkUnit.of(job_run, from: FailingJob.queue)
executor.execute

# FailingJob has no next_batch override — nothing extra enqueued
queue_size = PerpetualTestJob.queue.size(include_dead: false)
assert_equal 0, queue_size
end
end

it "enqueues multiple jobs from next_batch" do
clean_slate do
PerpetualTestJob.next_batch_items = [
PerpetualTestJob.new(value: "one").as(Mosquito::Job),
PerpetualTestJob.new(value: "two").as(Mosquito::Job),
PerpetualTestJob.new(value: "three").as(Mosquito::Job),
]

run_perpetual_job

queue_size = PerpetualTestJob.queue.size(include_dead: false)
assert_equal 3, queue_size
end
ensure
PerpetualTestJob.next_batch_items = [] of Mosquito::Job
end

it "does nothing when next_batch returns empty" do
clean_slate do
PerpetualTestJob.next_batch_items = [] of Mosquito::Job

run_perpetual_job

queue_size = PerpetualTestJob.queue.size(include_dead: false)
assert_equal 0, queue_size
end
end
end
94 changes: 94 additions & 0 deletions spec/mosquito/perpetual_job_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
require "../spec_helper"

describe "PerpetualJob polling" do
getter(queue_list) { MockQueueList.new }
getter(coordinator) { MockCoordinator.new(queue_list) }
getter(runner) { Mosquito::Runners::PerpetualJobRunner.new(coordinator) }

def register(job_class : Mosquito::Job.class)
Mosquito::Base.register_job_mapping job_class.name.underscore, job_class
queue_list.discovered_queues << job_class.queue
end

it "auto-registers QueuedJobs that define next_batch and poll_every" do
match = Mosquito::Base.perpetual_job_runs.find { |r| r.class == PerpetualPollTestJob }
refute_nil match, "Expected PerpetualPollTestJob to be auto-registered"
assert_equal 10.seconds, match.not_nil!.interval
end

it "PerpetualJobRun#try_to_poll enqueues next_batch results" do
clean_slate do
register PerpetualPollTestJob
PerpetualPollTestJob.next_batch_items = [
PerpetualPollTestJob.new(item_id: 1_i64).as(Mosquito::Job),
PerpetualPollTestJob.new(item_id: 2_i64).as(Mosquito::Job),
]

run = Mosquito::PerpetualJobRun.new(PerpetualPollTestJob, 0.seconds)
result = run.try_to_poll

assert result, "Expected try_to_poll to return true"
queue_size = PerpetualPollTestJob.queue.size(include_dead: false)
assert_equal 2, queue_size
end
ensure
PerpetualPollTestJob.next_batch_items = [] of Mosquito::Job
end

it "PerpetualJobRun#try_to_poll skips when interval has not elapsed" do
clean_slate do
register PerpetualPollTestJob
PerpetualPollTestJob.next_batch_items = [
PerpetualPollTestJob.new(item_id: 1_i64).as(Mosquito::Job),
]

run = Mosquito::PerpetualJobRun.new(PerpetualPollTestJob, 1.hour)
# First poll succeeds and sets last_polled_at
run.try_to_poll

# Second poll should skip because 1 hour hasn't elapsed
PerpetualPollTestJob.next_batch_items = [
PerpetualPollTestJob.new(item_id: 99_i64).as(Mosquito::Job),
]
result = run.try_to_poll

refute result, "Expected try_to_poll to return false (interval not elapsed)"
# Only the first batch should have been enqueued
queue_size = PerpetualPollTestJob.queue.size(include_dead: false)
assert_equal 1, queue_size
end
ensure
PerpetualPollTestJob.next_batch_items = [] of Mosquito::Job
end

it "PerpetualJobRun#try_to_poll does nothing when next_batch is empty" do
clean_slate do
register PerpetualPollTestJob
PerpetualPollTestJob.next_batch_items = [] of Mosquito::Job

run = Mosquito::PerpetualJobRun.new(PerpetualPollTestJob, 0.seconds)
result = run.try_to_poll

assert result, "Expected try_to_poll to return true (interval elapsed)"
queue_size = PerpetualPollTestJob.queue.size(include_dead: false)
assert_equal 0, queue_size
end
end

it "PerpetualJobRunner#poll calls try_to_poll on auto-registered jobs" do
clean_slate do
register PerpetualPollTestJob
PerpetualPollTestJob.next_batch_items = [
PerpetualPollTestJob.new(item_id: 42_i64).as(Mosquito::Job),
]

coordinator.always_coordinator!
runner.poll

queue_size = PerpetualPollTestJob.queue.size(include_dead: false)
assert_equal 1, queue_size
end
ensure
PerpetualPollTestJob.next_batch_items = [] of Mosquito::Job
end
end
5 changes: 5 additions & 0 deletions src/mosquito/base.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Mosquito
class_getter mapping = {} of String => Mosquito::Job.class
class_getter scheduled_job_runs = [] of PeriodicJobRun
class_getter timetable = [] of PeriodicJobRun
class_getter perpetual_job_runs = [] of PerpetualJobRun

def self.register_job_mapping(string, klass)
@@mapping[string] = klass
Expand Down Expand Up @@ -40,6 +41,10 @@ module Mosquito
@@scheduled_job_runs << PeriodicJobRun.new(klass, interval)
end

def self.register_perpetual_job(klass, interval : Time::Span)
@@perpetual_job_runs << PerpetualJobRun.new(klass, interval)
end

def self.register_job(klass, *, to_run_at scheduled_time : Time)
position = @@timetable.index do
end
Expand Down
24 changes: 24 additions & 0 deletions src/mosquito/job.cr
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,30 @@ module Mosquito
fail
end

# Override to return an Array of QueuedJob instances that should be
# enqueued after this job succeeds. Each returned instance will be
# enqueued via its normal `#enqueue` method.
#
# This hook turns any job into a perpetual job: one that knows how
# to produce follow-up work for itself (or other job types).
#
# ```
# class FanOutJob < Mosquito::QueuedJob
# param batch_id : Int64
#
# def perform
# # process batch_id
# end
#
# def next_batch : Array(Mosquito::Job)
# next_ids.map { |id| FanOutJob.new(batch_id: id).as(Mosquito::Job) }
# end
# end
# ```
def next_batch : Array(Mosquito::Job)
[] of Mosquito::Job
end

# To be called from inside a #perform
# Marks this job as a failure. By default, if the job is a candidate for
# re-scheduling, it will be run again at a later time.
Expand Down
61 changes: 61 additions & 0 deletions src/mosquito/perpetual_job_run.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
module Mosquito
# Wraps a PerpetualJob class with its polling interval and tracks
# when the last poll occurred. Used by the PerpetualJobRunner to
# decide when to call `next_batch` and enqueue the results.
class PerpetualJobRun
Log = ::Log.for self

property class : Mosquito::QueuedJob.class
property interval : Time::Span
getter metadata : Metadata { Metadata.new(Mosquito.backend.build_key("perpetual_jobs", @class.name)) }

def initialize(@class, @interval)
end

# The last time this perpetual job was polled, or epoch-zero if never.
def last_polled_at? : Time?
if timestamp = metadata["last_polled_at"]?
Time.unix(timestamp.to_i)
end
end

def last_polled_at : Time
last_polled_at? || Time.unix(0)
end

def last_polled_at=(time : Time)
metadata["last_polled_at"] = time.to_unix.to_s
metadata.delete(in: interval * 3)
end

# Check whether the polling interval has elapsed and, if so,
# instantiate a blank job and enqueue whatever `next_batch` returns.
def try_to_poll : Bool
now = Time.utc

if last_polled_at + interval <= now
poll
self.last_polled_at = now
true
else
false
end
end

# Create a fresh instance of the job class and call `next_batch`.
# Each returned job is enqueued via its normal `#enqueue` method.
def poll
job = @class.new
batch = job.next_batch
return if batch.empty?

Log.info { "#{@class.name}: next_batch returned #{batch.size} job(s)" }

batch.each do |next_job|
if queued = next_job.as?(QueuedJob)
queued.enqueue
end
end
end
end
end
11 changes: 11 additions & 0 deletions src/mosquito/queued_job.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ module Mosquito

PARAMETERS = [] of Nil

macro poll_every(interval)
PERPETUAL_POLL_INTERVAL = \{{ interval }}
end

macro param(parameter)
{% verbatim do %}
{%
Expand Down Expand Up @@ -114,6 +118,13 @@ module Mosquito
job_run
end
{% end %}

# Auto-register for perpetual polling when the class defines
# next_batch AND specifies a poll_every interval.
{% if @type.methods.map(&.name).includes?(:next_batch.id) &&
@type.has_constant?(:PERPETUAL_POLL_INTERVAL) %}
Mosquito::Base.register_perpetual_job({{ @type.id }}, {{ @type.constant(:PERPETUAL_POLL_INTERVAL) }})
{% end %}
{% end %}
end
end
Expand Down
Loading
Loading