Skip to content

Commit 9a6f62f

Browse files
committed
feat: add integration tests for TopicReader to validate message reading functionality
Signed-off-by: Vladislav Polyakov <[email protected]>
1 parent f576a20 commit 9a6f62f

File tree

2 files changed

+97
-33
lines changed

2 files changed

+97
-33
lines changed

packages/topic/src/reader.ts

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import { nextTick } from "node:process";
44

55
import { create, protoInt64, toJson } from "@bufbuild/protobuf";
66
import { type Duration, DurationSchema, type Timestamp, timestampFromDate, timestampMs } from "@bufbuild/protobuf/wkt";
7+
import { abortable } from "@ydbjs/abortable";
78
import { StatusIds_StatusCode } from "@ydbjs/api/operation";
89
import { Codec, type OffsetsRange, OffsetsRangeSchema, type StreamReadMessage_CommitOffsetRequest_PartitionCommitOffset, StreamReadMessage_CommitOffsetRequest_PartitionCommitOffsetSchema, type StreamReadMessage_FromClient, StreamReadMessage_FromClientSchema, type StreamReadMessage_FromServer, StreamReadMessage_FromServerSchema, type StreamReadMessage_InitRequest_TopicReadSettings, StreamReadMessage_InitRequest_TopicReadSettingsSchema, type StreamReadMessage_ReadResponse, TopicServiceDefinition, TransactionIdentitySchema, UpdateOffsetsInTransactionRequestSchema } from "@ydbjs/api/topic";
910
import type { Driver } from "@ydbjs/core";
11+
import { loggers } from "@ydbjs/debug";
1012
import { YDBError } from "@ydbjs/error";
1113
import { type RetryConfig, retry } from "@ydbjs/retry";
1214
import { backoff, combine, jitter } from "@ydbjs/retry/strategy";
13-
import debug from "debug";
1415
import type { StringValue } from "ms";
1516
import ms from "ms";
1617

@@ -20,7 +21,7 @@ import { TopicMessage } from "./message.js";
2021
import { TopicPartitionSession } from "./partition-session.js";
2122
import type { TX } from "./tx.js";
2223

23-
const dbg = debug('ydbjs').extend('topic').extend('reader')
24+
let dbg = loggers.topic.extend('reader')
2425

2526
type FromClientEmitterMap = {
2627
"message": [StreamReadMessage_FromClient]
@@ -197,23 +198,23 @@ export class TopicReader implements Disposable {
197198

198199
let dbgrpc = dbg.extend('grpc')
199200
this.#fromClientEmitter.on('message', (msg) => {
200-
dbgrpc('%s %o', msg.$typeName, toJson(StreamReadMessage_FromClientSchema, msg))
201+
dbgrpc.log('%s %o', msg.$typeName, toJson(StreamReadMessage_FromClientSchema, msg))
201202
})
202203

203204
// Log all messages from server.
204205
this.#fromServerEmitter.on('message', (msg) => {
205-
dbgrpc('%s %o', msg.$typeName, toJson(StreamReadMessage_FromServerSchema, msg))
206+
dbgrpc.log('%s %o', msg.$typeName, toJson(StreamReadMessage_FromServerSchema, msg))
206207
})
207208

208209
// Handle messages from server.
209210
this.#fromServerEmitter.on('message', async (message) => {
210211
if (this.#disposed) {
211-
dbg('error: receive "%s" after dispose', message.serverMessage.value?.$typeName)
212+
dbg.log('error: receive "%s" after dispose', message.serverMessage.value?.$typeName)
212213
return
213214
}
214215

215216
if (message.serverMessage.case === 'initResponse') {
216-
dbg(`read session identifier: %s`, message.serverMessage.value.sessionId);
217+
dbg.log('read session identifier: %s', message.serverMessage.value.sessionId)
217218

218219
this.#readMore(this.#freeBufferSize)
219220
}
@@ -222,7 +223,7 @@ export class TopicReader implements Disposable {
222223
assert.ok(message.serverMessage.value.partitionSession, 'startPartitionSessionRequest must have partitionSession');
223224
assert.ok(message.serverMessage.value.partitionOffsets, 'startPartitionSessionRequest must have partitionOffsets');
224225

225-
dbg('receive partition with id %s', message.serverMessage.value.partitionSession.partitionId);
226+
dbg.log('receive partition with id %s', message.serverMessage.value.partitionSession.partitionId);
226227

227228
// Create a new partition session.
228229
let partitionSession: TopicPartitionSession = new TopicPartitionSession(
@@ -244,7 +245,7 @@ export class TopicReader implements Disposable {
244245
let partitionOffsets = message.serverMessage.value.partitionOffsets;
245246

246247
let response = await this.#options.onPartitionSessionStart(partitionSession, committedOffset, partitionOffsets).catch((error) => {
247-
dbg('error: onPartitionSessionStart error: %O', error);
248+
dbg.log('error: onPartitionSessionStart error: %O', error);
248249
this.#fromClientEmitter.emit('error', error);
249250

250251
return undefined;
@@ -273,22 +274,22 @@ export class TopicReader implements Disposable {
273274

274275
let partitionSession = this.#partitionSessions.get(message.serverMessage.value.partitionSessionId);
275276
if (!partitionSession) {
276-
dbg('error: stopPartitionSessionRequest for unknown partitionSessionId=%s', message.serverMessage.value.partitionSessionId);
277+
dbg.log('error: stopPartitionSessionRequest for unknown partitionSessionId=%s', message.serverMessage.value.partitionSessionId);
277278
return;
278279
}
279280

280281
if (this.#options.onPartitionSessionStop) {
281282
let committedOffset = message.serverMessage.value.committedOffset || 0n;
282283

283284
await this.#options.onPartitionSessionStop(partitionSession, committedOffset).catch((err) => {
284-
dbg('error: onPartitionSessionStop error: %O', err);
285+
dbg.log('error: onPartitionSessionStop error: %O', err);
285286
this.#fromClientEmitter.emit('error', err);
286287
});
287288
}
288289

289290
// If graceful stop is not requested, we can stop the partition session immediately.
290291
if (!message.serverMessage.value.graceful) {
291-
dbg('stop partition session %s without graceful stop', partitionSession.partitionSessionId);
292+
dbg.log('stop partition session %s without graceful stop', partitionSession.partitionSessionId);
292293
partitionSession.stop();
293294

294295
// Remove all messages from the buffer that belong to this partition session.
@@ -354,7 +355,7 @@ export class TopicReader implements Disposable {
354355

355356
let partitionSession = this.#partitionSessions.get(message.serverMessage.value.partitionSessionId);
356357
if (!partitionSession) {
357-
dbg('error: endPartitionSession for unknown partitionSessionId=%s', message.serverMessage.value.partitionSessionId);
358+
dbg.log('error: endPartitionSession for unknown partitionSessionId=%s', message.serverMessage.value.partitionSessionId);
358359
return;
359360
}
360361

@@ -368,7 +369,7 @@ export class TopicReader implements Disposable {
368369
for (let part of message.serverMessage.value.partitionsCommittedOffsets) {
369370
let partitionSession = this.#partitionSessions.get(part.partitionSessionId);
370371
if (!partitionSession) {
371-
dbg('error: commitOffsetResponse for unknown partitionSessionId=%s', part.partitionSessionId);
372+
dbg.log('error: commitOffsetResponse for unknown partitionSessionId=%s', part.partitionSessionId);
372373
continue;
373374
}
374375

@@ -427,32 +428,32 @@ export class TopicReader implements Disposable {
427428
budget: Infinity,
428429
strategy: combine(jitter(50), backoff(50, 5000)),
429430
retry(error) {
430-
dbg('retrying stream read due to %O', error);
431+
dbg.log('retrying stream read due to %O', error);
431432
return true;
432433
},
433434
}
434435

435436
try {
436437
// TODO: handle user errors (for example tx errors). Ex: use abort signal
437-
await retry(retryConfig, async () => {
438+
await retry(retryConfig, async (signal) => {
438439
using outgoing = new AsyncEventEmitter<StreamReadMessage_FromClient>(this.#fromClientEmitter, 'message')
439440

440-
dbg('connecting to the stream with consumer %s', this.#options.consumer);
441+
dbg.log('connecting to the stream with consumer %s', this.#options.consumer);
441442

442443
let stream = this.#driver
443444
.createClient(TopicServiceDefinition)
444445
.streamRead(outgoing, { signal });
445446

446447
// If we have buffered messages, we need to clear them before connecting to the stream.
447448
if (this.#buffer.length) {
448-
dbg('has %d messages in the buffer before connecting to the stream, clearing it', this.#buffer.length);
449+
dbg.log('has %d messages in the buffer before connecting to the stream, clearing it', this.#buffer.length);
449450
this.#buffer.length = 0; // Clear the buffer before connecting to the stream
450451
this.#freeBufferSize = this.#maxBufferSize; // Reset free buffer size
451452
}
452453

453454
// Stop all partition sessions before connecting to the stream
454455
if (this.#partitionSessions.size) {
455-
dbg('has %d partition sessions before connecting to the stream, stopping them', this.#partitionSessions.size);
456+
dbg.log('has %d partition sessions before connecting to the stream, stopping them', this.#partitionSessions.size);
456457

457458
for (let partitionSession of this.#partitionSessions.values()) {
458459
partitionSession.stop();
@@ -463,7 +464,7 @@ export class TopicReader implements Disposable {
463464

464465
// If we have pending commits, we need to reject and drop them before connecting to the stream.
465466
if (this.#pendingCommits.size) {
466-
dbg('has pending commits, before connecting to the stream, rejecting them');
467+
dbg.log('has pending commits, before connecting to the stream, rejecting them');
467468

468469
for (let [partitionSessionId, pendingCommits] of this.#pendingCommits) {
469470
for (let commit of pendingCommits) {
@@ -492,16 +493,19 @@ export class TopicReader implements Disposable {
492493

493494
if (event.status !== StatusIds_StatusCode.SUCCESS) {
494495
let error = new YDBError(event.status, event.issues)
495-
dbg('received error from server: %s', error.message);
496+
dbg.log('received error from server: %s', error.message);
496497
throw error;
497498
}
498499

499500
this.#fromServerEmitter.emit('message', event);
500501
}
501502
});
502503
} catch (error) {
503-
dbg('error: %O', error);
504+
if (error instanceof Error && error.name === 'AbortError') {
505+
return
506+
}
504507

508+
dbg.log('error: %O', error);
505509
this.#fromServerEmitter.emit('error', error);
506510
} finally {
507511
this.#fromServerEmitter.emit('end');
@@ -583,11 +587,11 @@ export class TopicReader implements Disposable {
583587
*/
584588
#readMore(bytes: bigint): void {
585589
if (this.#disposed) {
586-
dbg('error: readMore called after dispose');
590+
dbg.log('error: readMore called after dispose');
587591
return;
588592
}
589593

590-
dbg('request read next %d bytes', bytes);
594+
dbg.log('request read next %d bytes', bytes);
591595
this.#fromClientEmitter.emit("message", create(StreamReadMessage_FromClientSchema, {
592596
clientMessage: {
593597
case: 'readRequest',
@@ -643,17 +647,22 @@ export class TopicReader implements Disposable {
643647
throw new Error('Read aborted', { cause: signal.reason });
644648
}
645649

650+
let ready = false;
646651
let active = true;
647652
let messageHandler = (message: StreamReadMessage_FromServer) => {
648653
if (signal.aborted) {
649654
return;
650655
}
651656

657+
if (message.serverMessage.case === 'initResponse' && message.status === StatusIds_StatusCode.SUCCESS) {
658+
ready = true;
659+
}
660+
652661
if (message.serverMessage.case != 'readResponse') {
653662
return;
654663
}
655664

656-
dbg('reader received %d bytes', message.serverMessage.value.bytesSize);
665+
dbg.log('reader received %d bytes', message.serverMessage.value.bytesSize);
657666

658667
this.#buffer.push(message.serverMessage.value);
659668
this.#freeBufferSize -= message.serverMessage.value.bytesSize;
@@ -724,6 +733,7 @@ export class TopicReader implements Disposable {
724733
let waiter = Promise.withResolvers()
725734
this.#fromServerEmitter.once('message', waiter.resolve)
726735

736+
// TODO: process cases then waitMs aborted earlier when read session is ready
727737
await Promise.race([
728738
waiter.promise,
729739
once(signal, 'abort'),
@@ -787,12 +797,12 @@ export class TopicReader implements Disposable {
787797

788798
let partitionSession = this.#partitionSessions.get(pd.partitionSessionId);
789799
if (!partitionSession) {
790-
dbg('error: readResponse for unknown partitionSessionId=%s', pd.partitionSessionId);
800+
dbg.log('error: readResponse for unknown partitionSessionId=%s', pd.partitionSessionId);
791801
continue;
792802
}
793803

794804
if (partitionSession.isStopped) {
795-
dbg('error: readResponse for stopped partitionSessionId=%s', pd.partitionSessionId);
805+
dbg.log('error: readResponse for stopped partitionSessionId=%s', pd.partitionSessionId);
796806
continue;
797807
}
798808

@@ -809,16 +819,16 @@ export class TopicReader implements Disposable {
809819
let payload = msg.data;
810820
if (batch.codec !== Codec.UNSPECIFIED) {
811821
if (!this.#codecs.has(batch.codec)) {
812-
dbg('error: codec %s is not supported', batch.codec);
822+
dbg.log('error: codec %s is not supported', batch.codec);
813823
throw new Error(`Codec ${batch.codec} is not supported`);
814824
}
815825

816826
// Decompress the message data using the provided decompress function
817827
try {
818828
// eslint-disable-next-line no-await-in-loop
819-
payload = await this.#codecs.get(batch.codec)!.decompress(msg.data);
829+
payload = this.#codecs.get(batch.codec)!.decompress(msg.data);
820830
} catch (error) {
821-
dbg('error: decompression failed for message with codec %s: %O', batch.codec, error);
831+
dbg.log('error: decompression failed for message with codec %s: %O', batch.codec, error);
822832

823833
throw new Error(`Decompression failed for message with codec ${batch.codec}`, { cause: error });
824834
}
@@ -864,10 +874,10 @@ export class TopicReader implements Disposable {
864874
}
865875
}
866876

867-
dbg('return %d messages, buffer size is %d bytes, free buffer size is %d bytes', messages.length, this.#maxBufferSize - this.#freeBufferSize, this.#freeBufferSize);
877+
dbg.log('return %d messages, buffer size is %d bytes, free buffer size is %d bytes', messages.length, this.#maxBufferSize - this.#freeBufferSize, this.#freeBufferSize);
868878

869879
if (releasableBufferBytes > 0n) {
870-
dbg('releasing %d bytes from buffer', releasableBufferBytes);
880+
dbg.log('releasing %d bytes from buffer', releasableBufferBytes);
871881
this.#freeBufferSize += releasableBufferBytes;
872882
this.#readMore(releasableBufferBytes);
873883
}
@@ -1119,7 +1129,7 @@ export class TopicReader implements Disposable {
11191129

11201130
// Convert our optimized Map structure into the API's expected format
11211131
for (let [partitionSessionId, partOffsets] of offsets.entries()) {
1122-
dbg('committing offsets for partition session %s: %o', partitionSessionId, partOffsets);
1132+
dbg.log('committing offsets for partition session %s: %o', partitionSessionId, partOffsets);
11231133

11241134
commitOffsets.push(create(StreamReadMessage_CommitOffsetRequest_PartitionCommitOffsetSchema, {
11251135
partitionSessionId,
@@ -1168,7 +1178,7 @@ export class TopicReader implements Disposable {
11681178
return; // Already disposed, nothing to do
11691179
}
11701180
this.#disposed = true;
1171-
dbg('disposing TopicReader for consumer %s', this.#options.consumer);
1181+
dbg.log('disposing TopicReader for consumer %s', this.#options.consumer);
11721182

11731183
this.#buffer.length = 0 // Clear the buffer to release memory
11741184
this.#freeBufferSize = this.#maxBufferSize; // Reset free buffer size to max buffer size

packages/topic/tests/reader.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { afterEach, beforeEach, expect, inject, test } from 'vitest'
2+
3+
import { create } from '@bufbuild/protobuf'
4+
import { CreateTopicRequestSchema, DropTopicRequestSchema, TopicServiceDefinition } from '@ydbjs/api/topic'
5+
import { Driver } from '@ydbjs/core'
6+
7+
import { TopicReader } from '../src/reader.js'
8+
9+
let driver = new Driver(inject('connectionString'), {
10+
'ydb.sdk.enable_discovery': false
11+
})
12+
await driver.ready()
13+
14+
let topicService = driver.createClient(TopicServiceDefinition)
15+
16+
let testTopicName: string
17+
let testConsumerName: string
18+
19+
beforeEach(async () => {
20+
testTopicName = `test-topic-integration-${Date.now()}`
21+
testConsumerName = `test-consumer-${Date.now()}`
22+
23+
await topicService.createTopic(
24+
create(CreateTopicRequestSchema, {
25+
path: testTopicName,
26+
partitioningSettings: {
27+
minActivePartitions: 1n,
28+
maxActivePartitions: 100n,
29+
},
30+
consumers: [
31+
{
32+
name: testConsumerName,
33+
},
34+
],
35+
})
36+
)
37+
})
38+
39+
afterEach(async () => {
40+
await topicService.dropTopic(create(DropTopicRequestSchema, {
41+
path: testTopicName,
42+
}))
43+
})
44+
45+
test('reads single message from topic', async () => {
46+
await using reader = new TopicReader(driver, { topic: testTopicName, consumer: testConsumerName })
47+
48+
for await (let batch of reader.read({ limit: 1, waitMs: 100 })) {
49+
// Process each batch of messages
50+
expect(Array.isArray(batch)).toBe(true)
51+
52+
return
53+
}
54+
})

0 commit comments

Comments
 (0)