Skip to content
Draft

WIP #46

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module Iteration
)

define_callbacks :start
define_callbacks :reenqueue
define_callbacks :shutdown
define_callbacks :complete
end
Expand All @@ -32,6 +33,10 @@ def on_shutdown(*filters, &blk)
set_callback(:shutdown, :after, *filters, &blk)
end

def on_reenqueue(*filters, &blk)
set_callback(:reenqueue, :before, *filters, &blk)
end

def on_complete(*filters, &blk)
set_callback(:complete, :after, *filters, &blk)
end
Expand Down Expand Up @@ -74,6 +79,18 @@ def retry_job(*)
@retried = true
end

def reenqueue_iteration_job(options = {})
self.executions -= 1 if executions > 1
ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")

adjust_total_time
self.times_interrupted += 1

self.already_in_queue = true if respond_to?(:already_in_queue=)
retry_job(options)
end

private

def enumerator_builder
Expand Down Expand Up @@ -123,8 +140,9 @@ def iterate_with_enumerator(enumerator, arguments)
end

next unless job_should_exit?
self.executions -= 1 if executions > 1
reenqueue_iteration_job
run_callbacks(:reenqueue) do
reenqueue_iteration_job
end
return false
end

Expand All @@ -137,17 +155,6 @@ def record_unit_of_work
end
end

def reenqueue_iteration_job
ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")

adjust_total_time
self.times_interrupted += 1

self.already_in_queue = true if respond_to?(:already_in_queue=)
retry_job
end

def adjust_total_time
self.total_time += (Time.now.utc.to_f - start_time.to_f).round(6)
end
Expand Down
4 changes: 3 additions & 1 deletion lib/job-iteration/throttle_enumerator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ def to_enum
@enum.each do |*val|
if should_throttle?
ActiveSupport::Notifications.instrument("throttled.iteration", job_class: @job.class.name)
@job.retry_job(wait: @backoff)
@job.run_callbacks(:reenqueue) do
@job.reenqueue_iteration_job(wait: @backoff)
end
throw(:abort, :skip_complete_callbacks)
end

Expand Down
34 changes: 34 additions & 0 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class SimpleIterationJob < ActiveJob::Base
self.on_complete_called = 0
cattr_accessor :on_shutdown_called, instance_accessor: false
self.on_shutdown_called = 0
cattr_accessor :on_reenqueue_called, instance_accessor: false
self.on_reenqueue_called = 0

on_start do
self.class.on_start_called += 1
Expand All @@ -26,6 +28,10 @@ class SimpleIterationJob < ActiveJob::Base
on_shutdown do
self.class.on_shutdown_called += 1
end

on_reenqueue do
self.class.on_reenqueue_called += 1
end
end

class MultiArgumentIterationJob < SimpleIterationJob
Expand Down Expand Up @@ -61,6 +67,12 @@ def each_iteration(record)
end
end

class ActiveRecordIterationJobHaltReenqueue < ActiveRecordIterationJob
on_reenqueue do |job|
throw(:abort) if job.times_interrupted > 0
end
end

class BatchActiveRecordIterationJob < SimpleIterationJob
def build_enumerator(cursor:)
enumerator_builder.active_record_on_batches(
Expand Down Expand Up @@ -297,6 +309,7 @@ def setup
klass.on_start_called = 0
klass.on_complete_called = 0
klass.on_shutdown_called = 0
klass.on_reenqueue_called = 0
end
JobShouldExitJob.records_performed = []
super
Expand Down Expand Up @@ -329,6 +342,7 @@ def test_works_with_private_methods
assert_equal(1, PrivateIterationJob.on_start_called)
assert_equal(1, PrivateIterationJob.on_complete_called)
assert_equal(1, PrivateIterationJob.on_shutdown_called)
assert_equal(0, PrivateIterationJob.on_reenqueue_called)
end

def test_failing_job
Expand Down Expand Up @@ -379,6 +393,7 @@ def test_active_record_job

assert_equal(0, ActiveRecordIterationJob.on_complete_called)
work_one_job
assert_equal(1, ActiveRecordIterationJob.on_reenqueue_called)

assert_equal(2, ActiveRecordIterationJob.records_performed.size)

Expand All @@ -389,6 +404,7 @@ def test_active_record_job

work_one_job
assert_equal(4, ActiveRecordIterationJob.records_performed.size)
assert_equal(2, ActiveRecordIterationJob.on_reenqueue_called)

job = peek_into_queue
assert_equal(2, job.times_interrupted)
Expand All @@ -401,6 +417,24 @@ def test_active_record_job
assert_equal(2, ActiveRecordIterationJob.on_shutdown_called)
end

def test_active_record_job_halt_reenqueue
iterate_exact_times(3.times)

push(ActiveRecordIterationJobHaltReenqueue)
assert_jobs_in_queue(1)

work_one_job
assert_equal(1, ActiveRecordIterationJob.on_reenqueue_called)
assert_equal(3, ActiveRecordIterationJob.records_performed.size)
assert_jobs_in_queue(1)

work_one_job
assert_equal(2, ActiveRecordIterationJob.on_reenqueue_called)
assert_equal(6, ActiveRecordIterationJob.records_performed.size)
# By throwing abort on the reenqueue callback we halt the iteration and no jobs are reenqueue
assert_jobs_in_queue(0)
end

def test_activerecord_batches_complete
push(BatchActiveRecordIterationJob)
processed_records = Product.order(:id).pluck(:id)
Expand Down
32 changes: 27 additions & 5 deletions test/unit/throttle_enumerator_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class IterationThrottleJob < ActiveJob::Base

cattr_accessor :on_complete_called, instance_accessor: false
self.on_complete_called = 0
cattr_accessor :on_reenqueue_called, instance_accessor: false
self.on_reenqueue_called = 0

cattr_accessor :should_throttle_sequence, instance_accessor: false
self.should_throttle_sequence = []
Expand All @@ -20,6 +22,10 @@ class IterationThrottleJob < ActiveJob::Base
self.class.on_complete_called += 1
end

on_reenqueue do
self.class.on_reenqueue_called += 1
end

def build_enumerator(_params, cursor:)
enumerator_builder.build_throttle_enumerator(
enumerator_builder.build_array_enumerator(
Expand All @@ -36,13 +42,18 @@ def each_iteration(record, _params)
end
end

setup do
IterationThrottleJob.iterations_performed = []
class IterationThrottleJobHaltReenqueue < IterationThrottleJob
on_reenqueue do |_job|
throw(:abort)
end
end

teardown do
IterationThrottleJob.on_complete_called = 0
IterationThrottleJob.should_throttle_sequence = []
setup do
IterationThrottleJob.descendants.each do |klass|
klass.iterations_performed = []
klass.on_complete_called = 0
klass.on_reenqueue_called = 0
end
end

test "throttle enumerator proxies wrapped enumerator" do
Expand Down Expand Up @@ -92,6 +103,17 @@ def each_iteration(record, _params)
assert_equal [1], IterationThrottleJob.iterations_performed
end

test "do not push back to queue if reenqueue callback abort" do
IterationThrottleJobHaltReenqueue.should_throttle_sequence = [false, true, false]

IterationThrottleJobHaltReenqueue.perform_now({})

enqueued = ActiveJob::Base.queue_adapter.enqueued_jobs
assert_equal 0, enqueued.size

assert_equal [1], IterationThrottleJobHaltReenqueue.iterations_performed
end

test "does not pushed back to queue if not throttle" do
assert_predicate ActiveJob::Base.queue_adapter.enqueued_jobs, :empty?

Expand Down