Skip to content

Commit 452ba1f

Browse files
committed
refactor: add transaction support to writer functions and introduce TX type
Signed-off-by: Vladislav Polyakov <[email protected]>
1 parent b6bf6de commit 452ba1f

File tree

7 files changed

+58
-3
lines changed

7 files changed

+58
-3
lines changed

packages/topic/src/writer/_flush.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import type { PQueue } from "../queue.js";
44
import { _batch_messages } from "./_batch_messages.js";
55
import { _emit_write_request } from "./_write_request.js";
66
import { MAX_INFLIGHT_COUNT } from "./constants.js";
7+
import type { TX } from "./tx.js";
78

89
export function _flush(ctx: {
10+
readonly tx?: TX
911
readonly queue: PQueue<StreamWriteMessage_FromClient>,
1012
readonly codec?: CompressionCodec, // Codec to use for compression
1113
readonly buffer: Map<bigint, StreamWriteMessage_WriteRequest_MessageData>; // Map of sequence numbers to messages in the buffer

packages/topic/src/writer/_init_reponse.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ import type { StreamWriteMessage_FromClient, StreamWriteMessage_InitResponse, St
22
import type { CompressionCodec } from "../codec.js";
33
import type { PQueue } from "../queue.js";
44
import { _flush } from "./_flush.js";
5+
import type { TX } from "./tx.js";
56

67
export function _on_init_response(ctx: {
8+
readonly tx?: TX
79
readonly queue: PQueue<StreamWriteMessage_FromClient>,
810
readonly codec?: CompressionCodec, // Codec to use for compression
911
readonly buffer: Map<bigint, StreamWriteMessage_WriteRequest_MessageData>; // Map of sequence numbers to messages in the buffer

packages/topic/src/writer/_write.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import type { CompressionCodec } from "../codec.js";
55
import type { PQueue } from "../queue.js";
66
import { _flush } from "./_flush.js";
77
import { MAX_PAYLOAD_SIZE } from "./constants.js";
8+
import type { TX } from "./tx.js";
89

910
export function _write(ctx: {
11+
readonly tx?: TX
1012
readonly queue: PQueue<StreamWriteMessage_FromClient>,
1113

1214
readonly codec?: CompressionCodec, // Codec to use for compression

packages/topic/src/writer/_write_request.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ import { create } from "@bufbuild/protobuf";
22
import { Codec, type StreamWriteMessage_FromClient, StreamWriteMessage_FromClientSchema, type StreamWriteMessage_WriteRequest_MessageData } from "@ydbjs/api/topic";
33
import { type CompressionCodec } from "../codec.js";
44
import type { PQueue } from "../queue.js";
5+
import type { TX } from "./tx.js";
56

67
export function _emit_write_request(ctx: {
8+
readonly tx?: TX
79
readonly queue: PQueue<StreamWriteMessage_FromClient>,
810
readonly codec?: CompressionCodec, // Codec to use for compression
911
}, messages: StreamWriteMessage_WriteRequest_MessageData[]) {
@@ -13,6 +15,10 @@ export function _emit_write_request(ctx: {
1315
value: {
1416
messages,
1517
codec: ctx.codec?.codec || Codec.RAW,
18+
tx: ctx.tx ? {
19+
id: ctx.tx.transactionId,
20+
session: ctx.tx.sessionId
21+
} : undefined
1622
}
1723
}
1824
}));

packages/topic/src/writer/_write_response.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_Mes
22
import type { CompressionCodec } from "../codec.js";
33
import type { PQueue } from "../queue.js";
44
import { _flush } from "./_flush.js";
5+
import type { TX } from "./tx.js";
56

67
export function _on_write_response(ctx: {
8+
readonly tx?: TX
79
readonly queue: PQueue<StreamWriteMessage_FromClient>,
810
readonly codec?: CompressionCodec, // Codec to use for compression
911
readonly buffer: Map<bigint, StreamWriteMessage_WriteRequest_MessageData>; // Map of sequence numbers to messages in the buffer

packages/topic/src/writer/index.ts

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@ import { _send_update_token_request } from "./_update_token.js";
1818
import { _write } from "./_write.js";
1919
import { _on_write_response } from "./_write_response.js";
2020
import { MAX_BUFFER_SIZE } from "./constants.js";
21+
import type { TX } from "./tx.js";
2122

2223
export type TopicWriterOptions = {
24+
// Transaction identity.
25+
// If provided, the writer will use the transaction for writing messages.
26+
tx?: TX
2327
// Path to the topic to write to.
2428
// Example: "/Root/my-topic"
2529
topic: string
@@ -118,9 +122,13 @@ export function createTopicWriter(driver: Driver, options: TopicWriterOptions):
118122
// When the writer stream is not ready, it will queue messages to be sent later.
119123
let isReady = false;
120124
// Flag to indicate if the writer is closed.
121-
// When the writer is closed, it will not accept new messages and will reject all pending write requests.
122-
// This is useful to ensure that the writer does not leak resources and can be closed gracefully.
125+
// When the writer is closed, it will not accept new messages.
126+
// The writer will still process and acknowledge any messages that were already sent.
123127
let isClosed = false;
128+
// Flag to indicate if the writer is disposed.
129+
// When the writer is disposed, it will not accept new messages and will reject all pending write requests.
130+
// This is useful to ensure that the writer does not leak resources and can be closed gracefully.
131+
let isDisposed = false;
124132
// Queue for messages to be written.
125133
// This is used to store messages that are not yet sent to the topic service.
126134
let queue: {
@@ -153,6 +161,7 @@ export function createTopicWriter(driver: Driver, options: TopicWriterOptions):
153161

154162
for (const item of qq) {
155163
_write({
164+
tx: options.tx,
156165
queue: outgoing,
157166
codec: codec,
158167
lastSeqNo: (lastSeqNo || item.extra.seqNo)!,
@@ -184,6 +193,7 @@ export function createTopicWriter(driver: Driver, options: TopicWriterOptions):
184193
await driver.ready(signal);
185194

186195
_flush({
196+
tx: options.tx,
187197
queue: outgoing,
188198
codec: codec,
189199
buffer,
@@ -270,6 +280,7 @@ export function createTopicWriter(driver: Driver, options: TopicWriterOptions):
270280
break
271281
case 'writeResponse':
272282
_on_write_response({
283+
tx: options.tx,
273284
queue: outgoing,
274285
codec: codec,
275286
buffer,
@@ -322,6 +333,10 @@ export function createTopicWriter(driver: Driver, options: TopicWriterOptions):
322333
throw new Error('Writer is closed, cannot write messages');
323334
}
324335

336+
if (isDisposed) {
337+
throw new Error('Writer is disposed, cannot write messages');
338+
}
339+
325340
if (!extra.seqNo && isSeqNoProvided) {
326341
throw new Error('Missing sequence number for message. Sequence number is provided by the user previously, so after that all messages must have seqNo provided');
327342
}
@@ -344,6 +359,7 @@ export function createTopicWriter(driver: Driver, options: TopicWriterOptions):
344359
}
345360

346361
return _write({
362+
tx: options.tx,
347363
queue: outgoing,
348364
codec: codec,
349365
buffer,
@@ -363,7 +379,7 @@ export function createTopicWriter(driver: Driver, options: TopicWriterOptions):
363379
// If a reason is provided, it will be used to reject all pending acks and write requests.
364380
// If no reason is provided, it will use a default error message.
365381
function close(reason?: Error) {
366-
if (isClosed) {
382+
if (isDisposed) {
367383
return;
368384
}
369385

@@ -395,9 +411,18 @@ export function createTopicWriter(driver: Driver, options: TopicWriterOptions):
395411
}
396412

397413
isClosed = true;
414+
isDisposed = true;
398415
dbg('writer closed, reason: %O', reason);
399416
}
400417

418+
// Before committing the transaction, require all messages to be written and acknowledged.
419+
options.tx?.registerPrecommitHook(async () => {
420+
// Close the writer. Do not accept new messages.
421+
isClosed = true;
422+
// Wait for all messages to be flushed.
423+
await flush();
424+
})
425+
401426
return {
402427
flush,
403428
write,
@@ -413,3 +438,14 @@ export function createTopicWriter(driver: Driver, options: TopicWriterOptions):
413438
}
414439
}
415440
}
441+
442+
export function createTopicTxWriter(
443+
driver: Driver,
444+
tx: { registerPrecommitHook: (fn: () => Promise<void> | void) => void, sessionId: string, transactionId: string },
445+
options: TopicWriterOptions
446+
): TopicWriter {
447+
return createTopicWriter(driver, {
448+
...options,
449+
tx
450+
})
451+
}

packages/topic/src/writer/tx.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export type TX = {
2+
sessionId: string
3+
transactionId: string
4+
registerPrecommitHook: (fn: () => Promise<void> | void) => void
5+
}

0 commit comments

Comments
 (0)