Skip to content

Commit 98f35bc

Browse files
authored
perf(scheduler): save offset value when every is provided (#3142)
1 parent c302854 commit 98f35bc

File tree

8 files changed

+96
-60
lines changed

8 files changed

+96
-60
lines changed

src/classes/job-scheduler.ts

Lines changed: 11 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -93,19 +93,17 @@ export class JobScheduler extends QueueBase {
9393
}
9494

9595
let nextMillis: number;
96-
let newOffset = offset || 0;
97-
96+
let newOffset: number | null = null;
9897
if (every) {
9998
const prevSlot = Math.floor(startMillis / every) * every;
99+
100+
newOffset = typeof offset === 'number' ? offset : startMillis - prevSlot;
101+
100102
const nextSlot = prevSlot + every;
101103
if (prevMillis || offset) {
102104
nextMillis = nextSlot;
103105
} else {
104106
nextMillis = prevSlot;
105-
newOffset = startMillis - prevSlot;
106-
107-
// newOffset should always be positive, but we do an extra safety check
108-
newOffset = newOffset < 0 ? 0 : newOffset;
109107
}
110108
} else if (pattern) {
111109
nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
@@ -162,6 +160,7 @@ export class JobScheduler extends QueueBase {
162160
pattern,
163161
every,
164162
limit,
163+
offset: newOffset,
165164
},
166165
Job.optsAsJSON(mergedOpts),
167166
producerId,
@@ -245,8 +244,8 @@ export class JobScheduler extends QueueBase {
245244

246245
mergedOpts.repeat = {
247246
...opts.repeat,
248-
count: currentCount,
249247
offset,
248+
count: currentCount,
250249
endDate: opts.repeat?.endDate
251250
? new Date(opts.repeat.endDate).getTime()
252251
: undefined,
@@ -255,51 +254,6 @@ export class JobScheduler extends QueueBase {
255254
return mergedOpts;
256255
}
257256

258-
private createNextJob<T = any, R = any, N extends string = string>(
259-
client: RedisClient,
260-
name: N,
261-
nextMillis: number,
262-
offset: number,
263-
jobSchedulerId: string,
264-
opts: JobsOptions,
265-
data: T,
266-
currentCount: number,
267-
// The job id of the job that produced this next iteration
268-
producerId?: string,
269-
) {
270-
//
271-
// Generate unique job id for this iteration.
272-
//
273-
const jobId = this.getSchedulerNextJobId({
274-
jobSchedulerId,
275-
nextMillis,
276-
});
277-
278-
const now = Date.now();
279-
const delay = nextMillis + offset - now;
280-
281-
const mergedOpts = {
282-
...opts,
283-
jobId,
284-
delay: delay < 0 ? 0 : delay,
285-
timestamp: now,
286-
prevMillis: nextMillis,
287-
repeatJobKey: jobSchedulerId,
288-
};
289-
290-
mergedOpts.repeat = { ...opts.repeat, count: currentCount };
291-
292-
const job = new this.Job<T, R, N>(this, name, data, mergedOpts, jobId);
293-
job.addJob(client);
294-
295-
if (producerId) {
296-
const producerJobKey = this.toKey(producerId);
297-
client.hset(producerJobKey, 'nrjid', job.id);
298-
}
299-
300-
return job;
301-
}
302-
303257
async removeJobScheduler(jobSchedulerId: string): Promise<number> {
304258
return this.scripts.removeJobScheduler(jobSchedulerId);
305259
}
@@ -347,7 +301,11 @@ export class JobScheduler extends QueueBase {
347301
}
348302

349303
if (jobData.every) {
350-
jobSchedulerData.every = jobData.every;
304+
jobSchedulerData.every = parseInt(jobData.every);
305+
}
306+
307+
if (jobData.offset) {
308+
jobSchedulerData.offset = parseInt(jobData.offset);
351309
}
352310

353311
if (jobData.data || jobData.opts) {

src/classes/scripts.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ export class Scripts {
419419
nextMillis: number,
420420
templateData: string,
421421
delayedJobOpts: JobsOptions,
422-
// The job id of the job that produced this next iteration
422+
// The job id of the job that produced this next iteration - TODO: remove in next breaking change
423423
producerId?: string,
424424
): Promise<string | null> {
425425
const client = await this.queue.client;

src/commands/includes/storeJobScheduler.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ local function storeJobScheduler(schedulerId, schedulerKey, repeatKey, nextMilli
3131
table.insert(optionalValues, opts['every'])
3232
end
3333

34+
if opts['offset'] then
35+
table.insert(optionalValues, "offset")
36+
table.insert(optionalValues, opts['offset'])
37+
end
38+
3439
local jsonTemplateOpts = cjson.encode(templateOpts)
3540
if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then
3641
table.insert(optionalValues, "opts")

src/interfaces/job-scheduler-json.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ export interface JobSchedulerJson<D = any> {
1414
endDate?: number;
1515
tz?: string;
1616
pattern?: string;
17-
every?: string;
17+
every?: number;
1818
next?: number;
19+
offset?: number;
1920
template?: JobSchedulerTemplateJson<D>;
2021
}

src/interfaces/repeat-options.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ export interface RepeatOptions extends Omit<ParserOptions, 'iterator'> {
4343

4444
/**
4545
* Offset in milliseconds to affect the next iteration time
46-
*
4746
* */
4847
offset?: number;
4948

src/interfaces/repeatable-options.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ export type RepeatableOptions = {
55
limit?: number;
66
pattern?: string;
77
every?: number;
8+
offset?: number;
89
};

tests/test_job_scheduler.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ describe('Job Scheduler', function () {
288288
name: 'test',
289289
next: 1486439520000,
290290
iterationCount: 1,
291-
every: '240000',
291+
every: 240000,
292+
offset: 120003,
292293
});
293294

294295
await this.clock.tickAsync(ONE_MINUTE);
@@ -334,7 +335,8 @@ describe('Job Scheduler', function () {
334335
name: 'test',
335336
next: 1486439700000,
336337
iterationCount: 2,
337-
every: '60000',
338+
every: 60000,
339+
offset: 0,
338340
});
339341

340342
const count = await queue.getJobCountByTypes('delayed');
@@ -1103,9 +1105,16 @@ describe('Job Scheduler', function () {
11031105
this.clock.setSystemTime(date);
11041106
const nextTick = 2 * ONE_SECOND;
11051107

1108+
let iterationCount = 0;
11061109
const worker = new Worker(
11071110
queueName,
1108-
async () => {
1111+
async job => {
1112+
if (iterationCount === 0) {
1113+
expect(job.opts.delay).to.be.eq(0);
1114+
} else {
1115+
expect(job.opts.delay).to.be.eq(2000);
1116+
}
1117+
iterationCount++;
11091118
this.clock.tick(nextTick);
11101119
},
11111120
{ autorun: false, connection, prefix },

tests/test_job_scheduler_stress.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { default as IORedis } from 'ioredis';
44
import { beforeEach, describe, it, before, after as afterAll } from 'mocha';
55

66
import { v4 } from 'uuid';
7-
import { Queue, QueueEvents, Repeat, Worker } from '../src/classes';
7+
import { Job, Queue, QueueEvents, Repeat, Worker } from '../src/classes';
88
import { delay, removeAllQueueData } from '../src/utils';
99

1010
const ONE_SECOND = 1000;
@@ -215,4 +215,67 @@ describe('Job Scheduler Stress', function () {
215215
expect(completedJobs).to.be.eql(1);
216216
await queue.close();
217217
});
218+
219+
describe("when using 'every' option and jobs are moved to active some time after delay", function () {
220+
it('should repeat every 2 seconds and start immediately', async function () {
221+
let iterationCount = 0;
222+
const worker = new Worker(
223+
queueName,
224+
async job => {
225+
if (iterationCount === 0) {
226+
expect(job.opts.delay).to.be.eq(0);
227+
} else {
228+
expect(job.opts.delay).to.be.gte(1850);
229+
}
230+
iterationCount++;
231+
},
232+
{ autorun: false, connection, prefix },
233+
);
234+
235+
let prev: Job;
236+
let counter = 0;
237+
238+
const completing = new Promise<void>((resolve, reject) => {
239+
worker.on('completed', async job => {
240+
try {
241+
if (prev) {
242+
expect(prev.timestamp).to.be.lte(job.timestamp);
243+
expect(job.processedOn! - prev.processedOn!).to.be.gte(1900);
244+
}
245+
prev = job;
246+
counter++;
247+
if (counter === 5) {
248+
resolve();
249+
}
250+
} catch (err) {
251+
console.log(err);
252+
reject(err);
253+
}
254+
});
255+
});
256+
257+
await queue.upsertJobScheduler(
258+
'repeat',
259+
{
260+
every: 2000,
261+
},
262+
{ data: { foo: 'bar' } },
263+
);
264+
265+
const waitingCountBefore = await queue.getWaitingCount();
266+
expect(waitingCountBefore).to.be.eq(1);
267+
268+
worker.run();
269+
270+
await completing;
271+
272+
const waitingCount = await queue.getWaitingCount();
273+
expect(waitingCount).to.be.eq(0);
274+
275+
const delayedCountAfter = await queue.getDelayedCount();
276+
expect(delayedCountAfter).to.be.eq(1);
277+
278+
await worker.close();
279+
});
280+
});
218281
});

0 commit comments

Comments
 (0)