Skip to content

Commit 5bd5958

Browse files
committed
refactor: add transaction support to TopicReader with readInTx method and offset management
Signed-off-by: Vladislav Polyakov <[email protected]>
1 parent 8981b33 commit 5bd5958

File tree

1 file changed

+133
-1
lines changed

1 file changed

+133
-1
lines changed

packages/topic/src/reader.ts

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { nextTick } from "node:process";
55
import { create, protoInt64, toJson } from "@bufbuild/protobuf";
66
import { type Duration, DurationSchema, type Timestamp, timestampFromDate, timestampMs } from "@bufbuild/protobuf/wkt";
77
import { StatusIds_StatusCode } from "@ydbjs/api/operation";
8-
import { Codec, type OffsetsRange, OffsetsRangeSchema, type StreamReadMessage_CommitOffsetRequest_PartitionCommitOffset, StreamReadMessage_CommitOffsetRequest_PartitionCommitOffsetSchema, type StreamReadMessage_FromClient, StreamReadMessage_FromClientSchema, type StreamReadMessage_FromServer, StreamReadMessage_FromServerSchema, type StreamReadMessage_InitRequest_TopicReadSettings, StreamReadMessage_InitRequest_TopicReadSettingsSchema, type StreamReadMessage_ReadResponse, TopicServiceDefinition } from "@ydbjs/api/topic";
8+
import { Codec, type OffsetsRange, OffsetsRangeSchema, type StreamReadMessage_CommitOffsetRequest_PartitionCommitOffset, StreamReadMessage_CommitOffsetRequest_PartitionCommitOffsetSchema, type StreamReadMessage_FromClient, StreamReadMessage_FromClientSchema, type StreamReadMessage_FromServer, StreamReadMessage_FromServerSchema, type StreamReadMessage_InitRequest_TopicReadSettings, StreamReadMessage_InitRequest_TopicReadSettingsSchema, type StreamReadMessage_ReadResponse, TopicServiceDefinition, UpdateOffsetsInTransactionRequestSchema, TransactionIdentitySchema } from "@ydbjs/api/topic";
99
import type { Driver } from "@ydbjs/core";
1010
import { YDBError } from "@ydbjs/error";
1111
import { type RetryConfig, retry } from "@ydbjs/retry";
@@ -18,6 +18,7 @@ import { AsyncEventEmitter } from "./aee.js";
1818
import { type CodecMap, defaultCodecMap } from "./codec.js";
1919
import { TopicMessage } from "./message.js";
2020
import { TopicPartitionSession } from "./partition-session.js";
21+
import type { TX } from "./tx.js";
2122

2223
const dbg = debug('ydbjs').extend('topic').extend('reader')
2324

@@ -140,6 +141,8 @@ export class TopicReader implements Disposable {
140141
// pending commits that are not yet resolved.
141142
#pendingCommits: Map<bigint, TopicCommitPromise[]> = new Map(); // partitionSessionId -> TopicCommitPromise[]
142143

144+
#txReadMessages = new Map(); // partitionSessionId -> TopicMessage[]
145+
143146
/**
144147
* Creates a new TopicReader instance.
145148
* @param driver - The YDB driver instance to use for communication with the YDB server.
@@ -890,6 +893,135 @@ export class TopicReader implements Disposable {
890893
}
891894
}
892895

896+
/**
 * Reads messages from the topic as part of a transaction.
 *
 * Behaves like {@link read}, but every yielded message is also buffered
 * per partition session in `#txReadMessages`. A precommit hook registered
 * on `tx` drains that buffer and sends the collected offsets to the server
 * (via `#commitTxOffsets`) so they are committed atomically with the
 * transaction.
 *
 * @param tx - The transaction whose commit should include the read offsets.
 * @param options - Same options as {@link read}: `limit`, `waitMs`, `signal`.
 * @returns An async iterable of message batches, identical to {@link read}.
 */
readInTx(
	tx: TX,
	options: { limit?: number, waitMs?: number, signal?: AbortSignal } = {}
): AsyncIterable<TopicMessage[]> {
	let base = this.read(options);

	tx.registerPrecommitHook(async () => {
		let messages = this.#consumeTxReadMessages();
		if (messages.length === 0) return;
		await this.#commitTxOffsets(messages, { id: tx.transactionId, session: tx.sessionId });
	});

	return {
		[Symbol.asyncIterator]: () => {
			let it = base[Symbol.asyncIterator]();
			return {
				next: async (): Promise<IteratorResult<TopicMessage[]>> => {
					let res = await it.next();
					if (!res.done && res.value && res.value.length > 0) {
						for (let msg of res.value) {
							let partitionSession = msg.partitionSession.deref();
							// NOTE(review): if the partition session has been GC'd the
							// message is still yielded to the consumer but its offset is
							// NOT buffered for the tx commit — confirm this best-effort
							// behavior is intended.
							if (!partitionSession) continue;
							let id = partitionSession.partitionSessionId;
							if (!this.#txReadMessages.has(id)) {
								this.#txReadMessages.set(id, []);
							}
							this.#txReadMessages.get(id)!.push(msg);
						}
					}
					return res;
				},
				// Delegate early termination so the underlying iterator can release
				// its resources, then report completion with the caller's value.
				return: async (value?: unknown): Promise<IteratorResult<TopicMessage[], unknown>> => {
					if (typeof it.return === 'function') {
						await it.return(value);
					}
					return { value, done: true };
				},
				// Forward injected errors to the underlying iterator as well;
				// without this, throw() on the wrapper was silently unsupported.
				throw: async (err?: unknown): Promise<IteratorResult<TopicMessage[]>> => {
					if (typeof it.throw === 'function') {
						return it.throw(err);
					}
					throw err;
				}
			};
		}
	};
}
937+
938+
// Drains every buffered tx message across all partition sessions and resets
// the buffer, returning the drained messages as a single flat array.
#consumeTxReadMessages() {
	let drained: TopicMessage[] = [...this.#txReadMessages.values()].flat();
	this.#txReadMessages = new Map();
	return drained;
}
944+
945+
async #commitTxOffsets(
946+
messages: TopicMessage[],
947+
tx: { id: string, session: string }
948+
): Promise<void> {
949+
// Check if tx is valid
950+
if (!tx.id || !tx.session) return;
951+
952+
// Map to group and organize offsets by partition session ID
953+
let offsets: Map<bigint, OffsetsRange[]> = new Map();
954+
// Map to store topic/partition info for each partition session
955+
let topicPartitionInfo: Map<bigint, { topicPath: string, partitionId: bigint }> = new Map();
956+
957+
// Process each message to be committed
958+
for (let msg of messages) {
959+
// Each message must be alive
960+
if (!msg.alive) continue;
961+
962+
let partitionSession = msg.partitionSession.deref();
963+
if (!partitionSession) continue;
964+
965+
let id = partitionSession.partitionSessionId;
966+
let topicPath = partitionSession.topicPath;
967+
let partitionId = partitionSession.partitionId;
968+
topicPartitionInfo.set(id, { topicPath, partitionId });
969+
let offset = msg.offset!;
970+
971+
// Initialize empty array for this partition if it doesn't exist yet
972+
if (!offsets.has(id)) {
973+
offsets.set(id, []);
974+
}
975+
976+
let partOffsets = offsets.get(id)!;
977+
978+
// Optimize storage by merging consecutive offsets into ranges
979+
if (partOffsets.length > 0) {
980+
let last = partOffsets[partOffsets.length - 1];
981+
if (offset === last.end) {
982+
// If the new offset is consecutive to the last range, extend the range
983+
last.end = offset + 1n;
984+
} else if (offset > last.end) {
985+
// If there's a gap between offsets, create a new range
986+
partOffsets.push(create(OffsetsRangeSchema, { start: offset, end: offset + 1n }));
987+
} else {
988+
// If offset <= last.end, it's either out of order or a duplicate.
989+
throw new Error(`Message with offset ${offset} is out of order or duplicate for partition session ${id}`);
990+
}
991+
} else {
992+
// First offset for this partition, create initial range
993+
partOffsets.push(create(OffsetsRangeSchema, { start: offset, end: offset + 1n }));
994+
}
995+
}
996+
997+
// Convert our optimized Map structure into the API's expected format in a single pass
998+
let topics: { path: string, partitions: { partitionId: bigint, partitionOffsets: OffsetsRange[] }[] }[] = [];
999+
let topicMap = new Map<string, typeof topics[number]>();
1000+
1001+
for (let [id, partOffsets] of offsets.entries()) {
1002+
let { topicPath, partitionId } = topicPartitionInfo.get(id)!;
1003+
let topicEntry = topicMap.get(topicPath);
1004+
if (!topicEntry) {
1005+
topicEntry = { path: topicPath, partitions: [] };
1006+
topicMap.set(topicPath, topicEntry);
1007+
topics.push(topicEntry);
1008+
}
1009+
topicEntry.partitions.push({ partitionId, partitionOffsets: partOffsets });
1010+
}
1011+
1012+
// Build and send the request
1013+
let req = create(UpdateOffsetsInTransactionRequestSchema, {
1014+
tx: create(TransactionIdentitySchema, tx),
1015+
topics,
1016+
consumer: this.#options.consumer,
1017+
});
1018+
let client = this.#driver.createClient(TopicServiceDefinition);
1019+
let resp = await client.updateOffsetsInTransaction(req);
1020+
if (resp.operation!.status !== StatusIds_StatusCode.SUCCESS) {
1021+
throw new YDBError(resp.operation!.status, resp.operation!.issues);
1022+
}
1023+
}
1024+
8931025
/**
8941026
* Commits offsets for the provided messages.
8951027
*

0 commit comments

Comments
 (0)