Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ Task.startPolling();
await Task.schedule('sayHello', new Date(Date.now() + 1000));
```

## Polling Options

You can pass `getCurrentTime()` to `startPolling()` to supply a custom clock for polling,
which is useful for demos or tests that advance time automatically.

```javascript
let currentTime = new Date('2023-06-01T00:00:00.000Z');

Task.startPolling({
getCurrentTime: () => new Date(currentTime)
});

currentTime = new Date(currentTime.valueOf() + 60_000);
```

## Params

The 2nd param to `Task.schedule()` is an object that this framework will call the handler function with.
Expand Down Expand Up @@ -57,4 +72,4 @@ await Task.schedule(
{ name: 'Friend' },
5000
);
```
```
18 changes: 13 additions & 5 deletions src/taskSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,20 @@ taskSchema.methods.sideEffect = async function sideEffect(fn, params) {
taskSchema.statics.startPolling = function startPolling(options) {
const interval = options?.interval ?? 1000;
const workerName = options?.workerName;
const pollOptions = workerName ? { workerName } : null;
const getCurrentTime = options?.getCurrentTime;
const pollOptions = {
...(workerName ? { workerName } : {}),
...(getCurrentTime ? { getCurrentTime } : {})
};
let cancelled = false;
let timeout = null;
const Task = this;
if (!this._cancel) {
doPoll.call(this);
this._cancel = () => {
cancelled = true;
clearTimeout(timeout);
Task._cancel = null;
};
}
return this._cancel;
Expand All @@ -136,7 +142,7 @@ taskSchema.statics.startPolling = function startPolling(options) {
const Task = this;

// Expire tasks that have timed out (refactored to separate function)
await Task.expireTimedOutTasks();
await Task.expireTimedOutTasks({ getCurrentTime });

this._currentPoll = this.poll(pollOptions);
await this._currentPoll.then(
Expand All @@ -152,8 +158,9 @@ taskSchema.statics.startPolling = function startPolling(options) {
};

// Refactor logic for expiring timed out tasks here
taskSchema.statics.expireTimedOutTasks = async function expireTimedOutTasks() {
const now = time.now();
taskSchema.statics.expireTimedOutTasks = async function expireTimedOutTasks(options = {}) {
const getCurrentTime = options.getCurrentTime;
const now = typeof getCurrentTime === 'function' ? getCurrentTime() : time.now();
const Task = this;
while (true) {
const task = await Task.findOneAndUpdate(
Expand Down Expand Up @@ -251,13 +258,14 @@ taskSchema.statics.removeAllHandlers = function removeAllHandlers() {
taskSchema.statics.poll = async function poll(opts) {
const parallel = (opts && opts.parallel) || 1;
const workerName = opts?.workerName;
const getCurrentTime = opts?.getCurrentTime;

const additionalParams = workerName ? { workerName } : {};

while (true) {
const tasksInProgress = [];
for (let i = 0; i < parallel; ++i) {
const now = time.now();
const now = typeof getCurrentTime === 'function' ? getCurrentTime() : time.now();
const task = await this.findOneAndUpdate(
Comment on lines 265 to 269
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep custom clock consistent with execute() timing

When startPolling() supplies getCurrentTime, the poller now computes now from that custom clock, but execute() still uses time.now() for startedRunningAt and the schedulingTimeoutAt check. That creates inconsistent timing if callers advance a fake clock (e.g., for tests or demos): tasks that are overdue under the custom clock can still execute because execute() compares schedulingTimeoutAt against real time. This is a behavioral regression compared to the previous global time.now override; if custom time is meant to control polling, consider threading the same clock into execute() (or at least the scheduling timeout check) so timeouts behave predictably under getCurrentTime.

Useful? React with 👍 / 👎.

{ status: 'pending', scheduledAt: { $lte: now } },
{
Expand Down
31 changes: 31 additions & 0 deletions test/task.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,37 @@ describe('Task', function() {
cancel();
});

it('allows startPolling() to use getCurrentTime()', async function() {
let resolve;
const p = new Promise((_resolve) => {
resolve = _resolve;
});
Task.registerHandler('getAnswer', (params, task) => {
resolve({ task, params });
return 42;
});

await Task.schedule('getAnswer', new Date(time.now().valueOf() + 1000), {
question: 'calculating...'
});

const getCurrentTime = sinon.stub().callsFake(() => new Date(now.valueOf() + 2000));
cancel = Task.startPolling({ interval: 100, getCurrentTime });

const res = await p;
assert.ok(getCurrentTime.called);
assert.deepEqual(res.params, { question: 'calculating...' });

await Task._currentPoll;

const task = await Task.findById(res.task._id);
assert.ok(task);
assert.equal(task.status, 'succeeded');
assert.strictEqual(task.result, 42);

cancel();
});

it('catches errors in task', async function() {
let resolve;
let reject;
Expand Down
Loading