Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/Queues/DTO/BackoffStrategyDTO.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

namespace Qless\Queues\DTO;

class BackoffStrategyDTO
{
/**
* @var int
*/
private $initialDelay;

/**
* @var int
*/
private $factor;

public function __construct(int $initialDelay, int $factor)
{
$this->initialDelay = $initialDelay;
$this->factor = $factor;
}
Comment thread
YaroslavStryhun marked this conversation as resolved.

/**
* @return int
*/
public function getFactor(): int
{
return $this->factor;
}

/**
* @return int
*/
public function getInitialDelay(): int
{
return $this->initialDelay;
}

public function toArray(): array
{
return [
'factor' => $this->getFactor(),
'initial_delay' => $this->getInitialDelay(),
];
}
}
8 changes: 6 additions & 2 deletions src/Queues/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Qless\Exceptions\UnknownPropertyException;
use Qless\Jobs\BaseJob;
use Qless\Jobs\JobData;
use Qless\Queues\DTO\BackoffStrategyDTO;
use Qless\Support\PropertyAccessor;
use Ramsey\Uuid\Uuid;

Expand Down Expand Up @@ -85,7 +86,8 @@ public function put(
?int $retries = null,
?int $priority = null,
?array $tags = null,
?array $depends = null
?array $depends = null,
?BackoffStrategyDTO $backoffStrategyDTO = null
): string {
try {
$jid = $jid ?: str_replace('-', '', Uuid::uuid4()->toString());
Expand Down Expand Up @@ -120,7 +122,9 @@ public function put(
'retries',
is_null($retries) ? 5 : $retries,
'depends',
json_encode($depends ?: [], JSON_UNESCAPED_SLASHES)
json_encode($depends ?: [], JSON_UNESCAPED_SLASHES),
'backoff',
json_encode($backoffStrategyDTO ? $backoffStrategyDTO->toArray() : [], JSON_UNESCAPED_SLASHES)
);

$this->getEventsManager()->fire(new QueueEvent\AfterEnqueue($this, $jid, $data->toArray(), $className));
Expand Down
66 changes: 51 additions & 15 deletions src/qless-core/qless.lua
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,10 @@ Qless.config.defaults = {
['histogram-history'] = 7,
['jobs-history-count'] = 50000,
['jobs-history'] = 604800,
['jobs-failed-history'] = 604800
['jobs-failed-history'] = 604800,
-- retries logic
['backoff-initial-delay'] = 0, -- Default delay in seconds. 0 means disabled.
['backoff-factor'] = 3 -- Exponential factor.
}

Qless.config.get = function(key, default)
Expand Down Expand Up @@ -1584,19 +1587,25 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...)
redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. self.name, 'failed' , -1)
end

redis.call('hmset', QlessJob.ns .. jid,
'jid' , jid,
'klass' , klass,
'data' , raw_data,
'priority' , priority,
'tags' , cjson.encode(tags),
'state' , ((delay > 0) and 'scheduled') or 'waiting',
'worker' , '',
'expires' , 0,
'queue' , self.name,
'retries' , retries,
'remaining', retries,
'time' , string.format("%.20f", now))
local job_fields = {
'jid' , jid,
'klass' , klass,
'data' , raw_data,
'priority' , priority,
'tags' , cjson.encode(tags),
'state' , ((delay > 0) and 'scheduled') or 'waiting',
'worker' , '',
'expires' , 0,
'queue' , self.name,
'retries' , retries,
'remaining', retries,
'time' , string.format("%.20f", now)
}
if options['backoff'] then
table.insert(job_fields, 'backoff')
table.insert(job_fields, cjson.encode(options['backoff']))
end
Comment thread
coderabbitai[bot] marked this conversation as resolved.
redis.call('hmset', QlessJob.ns .. jid, job_fields)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

for i, j in ipairs(depends) do
local state = redis.call('hget', QlessJob.ns .. j, 'state')
Expand Down Expand Up @@ -1954,7 +1963,34 @@ function QlessQueue:invalidate_locks(now, count)
redis.call('zadd', 'ql:failed-jobs-list', now, jid)
clearOldFailedJobs(now)
else
table.insert(jids, jid)
local backoff_json = redis.call('hget', Qless.ns .. jid, 'backoff')
local backoff_config = {}
if backoff_json then
backoff_config = cjson.decode(backoff_json)
end

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
local initial_delay = tonumber(backoff_config['initial_delay'])
local backoff_factor = tonumber(backoff_config['factor'])
if initial_delay == nil then
initial_delay = tonumber(Qless.config.get('backoff-initial-delay', 0))
end
if backoff_factor == nil then
backoff_factor = tonumber(Qless.config.get('backoff-factor', 3))
end
if initial_delay == 0 then
table.insert(jids, jid)
else
local job = Qless.job(jid)
local job_history = job:history()
local retry_count = #job_history - 1
if retry_count < 0 then retry_count = 0 end

local delay = initial_delay * (backoff_factor ^ retry_count)

self.locks.remove(jid)
self.scheduled.add(now + delay, jid)
redis.call('hset', QlessJob.ns .. jid, 'state', 'scheduled')
end
end
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
end
end
Expand Down
Loading