Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion delayed_job.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Gem::Specification.new do |s|
s.name = "delayed_job"
s.version = "1.7.0"
s.version = "3.0.3"
s.date = "2008-11-28"
s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify"
s.email = "tobi@leetsoft.com"
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails_70.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
delayed_job (1.7.0)
delayed_job (3.0.3)
activerecord (>= 7, < 8.2)
benchmark
railties
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails_71.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
delayed_job (1.7.0)
delayed_job (3.0.3)
activerecord (>= 7, < 8.2)
benchmark
railties
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails_72.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
delayed_job (1.7.0)
delayed_job (3.0.3)
activerecord (>= 7, < 8.2)
benchmark
railties
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails_80.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
delayed_job (1.7.0)
delayed_job (3.0.3)
activerecord (>= 7, < 8.2)
benchmark
railties
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails_81.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
delayed_job (1.7.0)
delayed_job (3.0.3)
activerecord (>= 7, < 8.2)
benchmark
railties
Expand Down
38 changes: 24 additions & 14 deletions lib/delayed/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class Job < JobSuperclass
end

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!
def self.clear_locks!(_worker_name = nil)
# TODO: унести worker_name в worker и передавать
where(locked_by: worker_name).update_all(locked_by: nil, locked_at: nil)
end

Expand Down Expand Up @@ -153,6 +154,7 @@ def run_with_lock(max_run_time, worker_name)
logger.info "* [JOB] #{name} completed after %.4f" % runtime
return true # did work
rescue Exception => e
# TODO: self.class.lifecycle.run_callbacks(:error, self, job){ handle_failed_job(job, error) }
begin
# Log before reschedule to report more params
log_exception(e)
Expand All @@ -172,9 +174,14 @@ def self.enqueue(*args, &block)

priority = args.first || 0
run_at = args[1] || db_time_now
job = Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
logger.info "* [JOB] create job_id: #{job.id} class: #{job.handler}"
job

Job.new(:payload_object => object, :priority => priority.to_i, :run_at => run_at).tap do |job|
Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do
job.hook(:enqueue)
job.save!
end
logger.info "* [JOB] create job_id: #{job.id} class: #{job.handler}"
end
end

def self.cached_min_available_id
Expand Down Expand Up @@ -297,19 +304,20 @@ def self.work_off(num = 100)
# Moved into its own method so that new_relic can trace it.
# add hook https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/backend/base.rb#L90
def invoke_job
begin
hook :before
payload_object.perform
hook :success
rescue Exception => e # rubocop:disable RescueException
hook :error, e
raise e
ensure
hook :after
Delayed::Worker.lifecycle.run_callbacks(:invoke_job, self) do
begin
hook :before
payload_object.perform
hook :success
rescue Exception => e # rubocop:disable RescueException
hook :error, e
raise e
ensure
hook :after
end
end
end

private
# add hook https://github.com/collectiveidea/delayed_job/blob/v4.0.6/lib/delayed/backend/base.rb#L111
def hook(name, *args)
if payload_object.respond_to?(name)
Expand All @@ -319,6 +327,8 @@ def hook(name, *args)
rescue DeserializationError # rubocop:disable HandleExceptions
end

private

def deserialize(source)
handler = (YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(source) : YAML.load(source)) rescue nil

Expand Down
84 changes: 84 additions & 0 deletions lib/delayed/lifecycle.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
module Delayed
class InvalidCallback < Exception; end

class Lifecycle
EVENTS = {
:enqueue => [:job],
:execute => [:worker],
:loop => [:worker],
:perform => [:worker, :job],
:error => [:worker, :job],
:failure => [:worker, :job],
:invoke_job => [:job]
}

def initialize
@callbacks = EVENTS.keys.inject({}) { |hash, e| hash[e] = Callback.new; hash }
end

def before(event, &block)
add(:before, event, &block)
end

def after(event, &block)
add(:after, event, &block)
end

def around(event, &block)
add(:around, event, &block)
end

def run_callbacks(event, *args, &block)
missing_callback(event) unless @callbacks.has_key?(event)

unless EVENTS[event].size == args.size
raise ArgumentError, "Callback #{event} expects #{EVENTS[event].size} parameter(s): #{EVENTS[event].join(', ')}"
end

@callbacks[event].execute(*args, &block)
end

private

def add(type, event, &block)
missing_callback(event) unless @callbacks.has_key?(event)

@callbacks[event].add(type, &block)
end

def missing_callback(event)
raise InvalidCallback, "Unknown callback event: #{event}"
end
end

class Callback
def initialize
@before = []
@after = []

# Identity proc. Avoids special cases when there is no existing around chain.
@around = lambda { |*args, &block| block.call(*args) }
end

def execute(*args, &block)
@before.each { |c| c.call(*args) }
result = @around.call(*args, &block)
@after.each { |c| c.call(*args) }
result
end

def add(type, &callback)
case type
when :before
@before << callback
when :after
@after << callback
when :around
chain = @around # use a local variable so that the current chain is closed over in the following lambda
@around = lambda { |*a, &block| chain.call(*a) { |*b| callback.call(*b, &block) } }
else
raise InvalidCallback, "Invalid callback type: #{type}"
end
end
end
end
15 changes: 15 additions & 0 deletions lib/delayed/plugin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
require 'active_support/core_ext/class/attribute'

module Delayed
class Plugin
class_attribute :callback_block

def self.callbacks(&block)
self.callback_block = block
end

def initialize
self.class.callback_block.call(Delayed::Worker.lifecycle) if self.class.callback_block
end
end
end
15 changes: 15 additions & 0 deletions lib/delayed/plugins/clear_locks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module Delayed
module Plugins
class ClearLocks < Plugin
callbacks do |lifecycle|
lifecycle.around(:execute) do |worker, &block|
begin
block.call(worker)
ensure
Delayed::Job.clear_locks!(worker.name)
end
end
end
end
end
end
73 changes: 55 additions & 18 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,85 @@ module Delayed
class Worker
SLEEP = 5

cattr_accessor :plugins

# Add or remove plugins in this list before the worker is instantiated
self.plugins = [Delayed::Plugins::ClearLocks]

cattr_accessor :logger
self.logger = if defined?(Merb::Logger)
Merb.logger
elsif defined?(RAILS_DEFAULT_LOGGER)
RAILS_DEFAULT_LOGGER
end

def self.lifecycle
@lifecycle ||= Delayed::Lifecycle.new
end

def initialize(options={})
@quiet = options[:quiet]
@quiet = options.has_key?(:quiet) ? options[:quiet] : true
Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)

self.plugins.each { |klass| klass.new }
end

# Every worker has a unique name which by default is the pid of the process. There are some
# advantages to overriding this with something which survives worker retarts: Workers can#
# safely resume working on tasks which are locked by themselves. The worker will assume that
# it crashed before.
def name
return @name unless @name.nil?
"#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
end

# Sets the name of the worker.
# Setting the name to nil will reset the default worker name
def name=(val)
@name = val
end

def start
say "*** Starting job worker #{Delayed::Job.worker_name}"

$exit = false
trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }
@exit = false
trap('TERM') { say 'Exiting...'; stop }
trap('INT') { say 'Exiting...'; stop }

loop do
result = nil
self.class.lifecycle.run_callbacks(:execute, self) do
loop do
self.class.lifecycle.run_callbacks(:loop, self) do
result = nil

realtime = Benchmark.realtime do
result = Delayed::Job.work_off
end
realtime = Benchmark.realtime do
result = Delayed::Job.work_off
end

count = result.sum
count = result.sum

break if $exit
break if stop?

if count.zero?
sleep(SLEEP)
else
say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
end
if count.zero?
sleep(SLEEP)
else
say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
end

break if $exit
break if stop?
end
end
end
end

def stop
$exit = true
@exit = true
end

ensure
Delayed::Job.clear_locks!
def stop?
!!@exit || !!$exit
end

def say(text)
Expand Down
3 changes: 3 additions & 0 deletions lib/delayed_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

require 'delayed/message_sending'
require 'delayed/performable_method'
require 'delayed/lifecycle'
require 'delayed/plugin'
require 'delayed/plugins/clear_locks'
require 'delayed/job'
require 'delayed/worker'

Expand Down
Loading