Skip to content

Commit 6b43f4d

Browse files
committed
refactor: streamline message handling and abort logic in TopicReader
Signed-off-by: Vladislav Polyakov <[email protected]>
1 parent b57706e commit 6b43f4d

File tree

1 file changed

+4
-12
lines changed

1 file changed

+4
-12
lines changed

packages/topic/src/reader.ts

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -647,17 +647,12 @@ export class TopicReader implements Disposable {
647647
throw new Error('Read aborted', { cause: signal.reason });
648648
}
649649

650-
let ready = false;
651650
let active = true;
652651
let messageHandler = (message: StreamReadMessage_FromServer) => {
653652
if (signal.aborted) {
654653
return;
655654
}
656655

657-
if (message.serverMessage.case === 'initResponse' && message.status === StatusIds_StatusCode.SUCCESS) {
658-
ready = true;
659-
}
660-
661656
if (message.serverMessage.case != 'readResponse') {
662657
return;
663658
}
@@ -734,13 +729,10 @@ export class TopicReader implements Disposable {
734729
this.#fromServerEmitter.once('message', waiter.resolve)
735730

736731
// TODO: process cases then waitMs aborted earlier when read session is ready
737-
await Promise.race([
738-
waiter.promise,
739-
once(signal, 'abort'),
740-
once(AbortSignal.timeout(waitMs), 'abort'),
741-
])
742-
743-
this.#fromServerEmitter.removeListener('message', waiter.resolve)
732+
await abortable(AbortSignal.any([signal, AbortSignal.timeout(waitMs)]), waiter.promise)
733+
.finally(() => {
734+
this.#fromServerEmitter.removeListener('message', waiter.resolve)
735+
})
744736

745737
// If the signal is already aborted, throw an error immediately.
746738
if (signal.aborted) {

0 commit comments

Comments
 (0)