Skip to content

Commit a640175

Browse files
committed
feat: enhance PQueue to maintain unconsumed messages after consumer restart
Signed-off-by: Vladislav Polyakov <[email protected]>
1 parent a18efc8 commit a640175

File tree

2 files changed

+88
-37
lines changed

2 files changed

+88
-37
lines changed

packages/topic/src/queue.test.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,20 @@ describe('PQueue', () => {
5252

5353
await expect(queue.shift()).rejects.toThrow('Queue closed');
5454
});
55+
56+
it('should keep unconsumed messages after consumer restart', async () => {
57+
const queue = new PQueue<number>();
58+
59+
const shiftPromise = queue.shift();
60+
queue.restartConsumer();
61+
queue.push(1, 1);
62+
queue.push(2, 2);
63+
64+
await expect(shiftPromise).rejects.toThrow('Consumer restarted');
65+
66+
expect(queue.size).toBe(2);
67+
expect(await queue.shift()).toBe(2);
68+
expect(await queue.shift()).toBe(1);
69+
expect(queue.size).toBe(0);
70+
});
5571
});

packages/topic/src/queue.ts

Lines changed: 72 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,99 @@
1-
export class PQueue<T> implements AsyncIterable<T>, Disposable {
2-
private queue: { value: T; priority: number }[] = [];
1+
export class PQueue<T> {
2+
private heap: { value: T; priority: number }[] = [];
33
private closed = false;
4-
private resolvers: ((value: T) => void)[] = [];
4+
private pendingShifts: ((value: IteratorResult<T>) => void)[] = [];
5+
private pendingRejects: ((reason?: any) => void)[] = [];
56

6-
push(value: T, priority: number = 0) {
7-
if (this.closed) throw new Error("Queue closed");
8-
9-
let item = { value, priority };
10-
let inserted = false;
7+
get size(): number {
8+
return this.heap.length;
9+
}
1110

12-
for (let i = 0; i < this.queue.length; i++) {
13-
if (this.queue[i].priority < priority) {
14-
this.queue.splice(i, 0, item);
15-
inserted = true;
16-
break;
17-
}
11+
push(value: T, priority: number = 0) {
12+
if (this.closed) {
13+
throw new Error('Queue closed');
1814
}
1915

20-
if (!inserted) {
21-
this.queue.push(item);
16+
let left = 0;
17+
let right = this.heap.length;
18+
while (left < right) {
19+
let mid = (left + right) >> 1;
20+
if (this.heap[mid].priority < priority) {
21+
right = mid;
22+
} else {
23+
left = mid + 1;
24+
}
2225
}
26+
this.heap.splice(left, 0, { value, priority });
2327

24-
if (this.resolvers.length) {
25-
const nextItem = this.queue.shift()!;
26-
this.resolvers.shift()?.(nextItem.value);
28+
if (this.pendingShifts.length > 0) {
29+
this.pendingRejects.shift()!;
30+
let next = this.heap.shift()!;
31+
let resolve = this.pendingShifts.shift()!;
32+
resolve({ value: next.value, done: false });
2733
}
2834
}
2935

3036
async shift(): Promise<T> {
31-
if (this.queue.length) {
32-
return this.queue.shift()!.value;
37+
if (this.closed && this.heap.length === 0) {
38+
throw new Error('Queue closed');
39+
}
40+
41+
if (this.heap.length > 0) {
42+
let next = this.heap.shift()!;
43+
return next.value;
3344
}
34-
if (this.closed) throw new Error("Queue closed");
35-
return new Promise<T>((resolve) => {
36-
this.resolvers.push(resolve);
45+
46+
return new Promise<T>((resolve, reject) => {
47+
this.pendingRejects.push(reject);
48+
this.pendingShifts.push(({ value, done }) => {
49+
if (done) {
50+
reject(new Error('Queue closed'));
51+
} else {
52+
resolve(value);
53+
}
54+
});
3755
});
3856
}
3957

4058
close() {
4159
this.closed = true;
42-
while (this.resolvers.length) {
43-
this.resolvers.shift()?.(undefined as any);
60+
while (this.pendingShifts.length > 0) {
61+
this.pendingRejects.shift()!;
62+
let resolve = this.pendingShifts.shift()!;
63+
resolve({ value: undefined as any, done: true });
4464
}
4565
}
4666

47-
async next(): Promise<IteratorResult<T>> {
48-
try {
49-
return { value: await this.shift(), done: false };
50-
} catch {
51-
return { value: undefined, done: true };
67+
restartConsumer() {
68+
while (this.pendingShifts.length > 0) {
69+
this.pendingShifts.shift()!;
70+
let reject = this.pendingRejects.shift()!;
71+
reject(new Error('Consumer restarted'));
5272
}
5373
}
5474

55-
[Symbol.asyncIterator]() {
56-
return this;
75+
async next(): Promise<IteratorResult<T>> {
76+
if (this.closed && this.heap.length === 0) {
77+
return { value: undefined as any, done: true };
78+
}
79+
80+
if (this.heap.length > 0) {
81+
let next = this.heap.shift()!;
82+
return { value: next.value, done: false };
83+
}
84+
85+
return new Promise<IteratorResult<T>>((resolve, reject) => {
86+
this.pendingShifts.push(resolve);
87+
this.pendingRejects.push(reject);
88+
});
5789
}
5890

59-
[Symbol.dispose]() {
60-
this.close();
61-
this.queue.length = 0;
62-
this.resolvers.length = 0;
91+
async *[Symbol.asyncIterator]() {
92+
while (true) {
93+
// eslint-disable-next-line no-await-in-loop
94+
let { value, done } = await this.next();
95+
if (done) break;
96+
yield value;
97+
}
6398
}
6499
}

0 commit comments

Comments
 (0)