Skip to content

Commit ac7c47c

Browse files
committed
feat(e2e): add end-to-end tests and configuration files for topic operations
Signed-off-by: Vladislav Polyakov <[email protected]>
1 parent b184d00 commit ac7c47c

File tree

5 files changed

+421
-0
lines changed

5 files changed

+421
-0
lines changed

e2e/package.json

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"name": "@ydbjs/e2e",
3+
"version": "6.0.0",
4+
"private": true,
5+
"type": "module",
6+
"engines": {
7+
"node": ">=20.19.0",
8+
"npm": ">=10"
9+
},
10+
"engineStrict": true,
11+
"scripts": {
12+
"test": "vitest --run --project e2e",
13+
"test:debug": "FORCE_COLOR=3 DEBUG_COLORS=1 DEBUG=ydbjs:* vitest --project e2e --inspect --no-file-parallelism"
14+
},
15+
"dependencies": {
16+
"@bufbuild/protobuf": "^2.6.0",
17+
"@ydbjs/api": "*",
18+
"@ydbjs/auth": "*",
19+
"@ydbjs/core": "*",
20+
"@ydbjs/debug": "*",
21+
"@ydbjs/query": "*",
22+
"@ydbjs/topic": "*"
23+
},
24+
"publishConfig": {
25+
"access": "restricted"
26+
}
27+
}

e2e/topic/read-write-tx.test.ts

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
import { afterEach, beforeEach, expect, inject, test } from 'vitest'
2+
import { Driver } from '@ydbjs/core'
3+
import { createTopicReader, createTopicTxReader } from '@ydbjs/topic/reader'
4+
import { createTopicTxWriter, createTopicWriter } from '@ydbjs/topic/writer'
5+
import { CreateTopicRequestSchema, DropTopicRequestSchema, TopicServiceDefinition } from '@ydbjs/api/topic'
6+
import { create } from '@bufbuild/protobuf'
7+
import { query } from '@ydbjs/query'
8+
9+
// #region setup
10+
declare module 'vitest' {
11+
export interface ProvidedContext {
12+
connectionString: string
13+
}
14+
}
15+
16+
let driver = new Driver(inject('connectionString'), {
17+
'ydb.sdk.enable_discovery': false,
18+
})
19+
20+
await driver.ready()
21+
22+
let topicService = driver.createClient(TopicServiceDefinition)
23+
24+
let testTopicName: string
25+
let testProducerName: string
26+
let testConsumerName: string
27+
28+
beforeEach(async () => {
29+
testTopicName = `test-topic-integration-${Date.now()}`
30+
testProducerName = `test-producer-${Date.now()}`
31+
testConsumerName = `test-consumer-${Date.now()}`
32+
33+
await topicService.createTopic(
34+
create(CreateTopicRequestSchema, {
35+
path: testTopicName,
36+
partitioningSettings: {
37+
minActivePartitions: 1n,
38+
maxActivePartitions: 100n,
39+
},
40+
consumers: [
41+
{
42+
name: testConsumerName,
43+
},
44+
],
45+
})
46+
)
47+
})
48+
49+
afterEach(async () => {
50+
await topicService.dropTopic(
51+
create(DropTopicRequestSchema, {
52+
path: testTopicName,
53+
})
54+
)
55+
})
56+
// #endregion
57+
58+
test('writes and reads in tx', async () => {
59+
await using yql = query(driver)
60+
61+
await using writer = createTopicWriter(driver, {
62+
topic: testTopicName,
63+
producer: testProducerName,
64+
})
65+
66+
// Write a message outside of a transaction (1)
67+
writer.write(Buffer.from('written', 'utf-8'), { seqNo: 1n })
68+
await writer.close()
69+
70+
// Begin a transaction
71+
await yql.begin({ idempotent: true }, async (tx) => {
72+
await using readerTx = createTopicTxReader(driver, {
73+
tx,
74+
topic: testTopicName,
75+
consumer: testConsumerName,
76+
})
77+
78+
await using writerTx = createTopicTxWriter(driver, tx, {
79+
topic: testTopicName,
80+
producer: testProducerName,
81+
})
82+
83+
// Write a message inside the transaction (2)
84+
writerTx.write(Buffer.from('written in tx', 'utf-8'), { seqNo: 2n })
85+
await writerTx.flush()
86+
87+
// Read messages inside the transaction.
88+
// Expect to see the message written outside the transaction (1).
89+
// Expect NOT to see the message written in the transaction (2).
90+
for await (let batch of readerTx.read()) {
91+
expect(batch).toHaveLength(1)
92+
93+
let message = batch[0]!
94+
expect(message.seqNo).toBe(1n)
95+
expect(message.offset).toBe(0n)
96+
expect(message.payload).toStrictEqual(Buffer.from('written', 'utf-8'))
97+
break
98+
}
99+
})
100+
101+
await using reader = createTopicReader(driver, {
102+
topic: testTopicName,
103+
consumer: testConsumerName,
104+
})
105+
106+
// Read messages outside of the transaction.
107+
// Expect to see the message written in the transaction (2).
108+
// Expect NOT to see the message written outside the transaction (1).
109+
for await (let batch of reader.read()) {
110+
expect(batch).toHaveLength(1)
111+
112+
let message = batch[0]!
113+
expect(message.seqNo).toBe(2n)
114+
expect(message.offset).toBe(1n)
115+
expect(message.payload).toStrictEqual(Buffer.from('written in tx', 'utf-8'))
116+
await reader.commit(batch)
117+
break
118+
}
119+
})
120+
121+
test('rollbacks reads', async () => {
122+
await using yql = query(driver)
123+
124+
await using writer = createTopicWriter(driver, {
125+
topic: testTopicName,
126+
producer: testProducerName,
127+
})
128+
129+
// Write a message outside of a transaction (1)
130+
writer.write(Buffer.from('written', 'utf-8'), { seqNo: 1n })
131+
await writer.close()
132+
133+
await yql
134+
.begin({ idempotent: true }, async (tx) => {
135+
await using readerTx = createTopicTxReader(driver, {
136+
tx,
137+
topic: testTopicName,
138+
consumer: testConsumerName,
139+
})
140+
141+
142+
// Read messages inside the transaction.
143+
// Expect to see the message written outside the transaction (1).
144+
for await (let batch of readerTx.read()) {
145+
expect(batch).toHaveLength(1)
146+
expect(batch[0]?.payload).toStrictEqual(Buffer.from('written', 'utf-8'))
147+
break
148+
}
149+
150+
// Simulate a transaction failure. User error is always non-retriable.
151+
throw new Error('User error')
152+
})
153+
.catch((error) => {
154+
expect(error).toBeInstanceOf(Error)
155+
expect(error.message).toBe('Transaction failed.')
156+
expect((error as Error).cause).toBeInstanceOf(Error)
157+
expect(((error as Error).cause as Error).message).toBe('User error')
158+
})
159+
160+
await using reader = createTopicReader(driver, {
161+
topic: testTopicName,
162+
consumer: testConsumerName,
163+
})
164+
165+
// Read messages outside of the transaction.
166+
// Expect to see the message written outside the transaction (1).
167+
for await (let batch of reader.read()) {
168+
expect(batch).toHaveLength(1)
169+
170+
let message = batch[0]!
171+
expect(message.seqNo).toBe(1n)
172+
expect(message.offset).toBe(0n)
173+
expect(message.payload).toStrictEqual(Buffer.from('written', 'utf-8'))
174+
await reader.commit(batch)
175+
break
176+
}
177+
})
178+
179+
test('rollbacks writes', async () => {
180+
await using yql = query(driver)
181+
182+
await using reader = createTopicReader(driver, {
183+
topic: testTopicName,
184+
consumer: testConsumerName,
185+
})
186+
187+
await yql
188+
.begin({ idempotent: true }, async (tx) => {
189+
await using writerTx = createTopicTxWriter(driver, tx, {
190+
topic: testTopicName,
191+
producer: testProducerName,
192+
})
193+
194+
// Write a message inside the transaction (2)
195+
writerTx.write(Buffer.from('written in tx', 'utf-8'), { seqNo: 2n })
196+
await writerTx.flush()
197+
198+
// Simulate a transaction failure. User error is always non-retriable.
199+
throw new Error('User error')
200+
})
201+
.catch((error) => {
202+
expect(error).toBeInstanceOf(Error)
203+
expect(error.message).toBe('Transaction failed.')
204+
expect((error as Error).cause).toBeInstanceOf(Error)
205+
expect(((error as Error).cause as Error).message).toBe('User error')
206+
})
207+
208+
// Read messages outside of the transaction.
209+
// Expect NOT to see the message written inside the transaction (2).
210+
for await (let batch of reader.read({ waitMs: 1000 })) {
211+
expect(batch).toHaveLength(0)
212+
break
213+
}
214+
})

e2e/topic/read-write.test.ts

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import { afterEach, assert, beforeEach, inject, test } from "vitest";
2+
import { Driver } from '@ydbjs/core'
3+
import { createTopicReader } from '@ydbjs/topic/reader'
4+
import { createTopicWriter } from '@ydbjs/topic/writer'
5+
import { CreateTopicRequestSchema, DropTopicRequestSchema, TopicServiceDefinition } from "@ydbjs/api/topic";
6+
import { create } from "@bufbuild/protobuf";
7+
import { once } from "node:events";
8+
9+
// #region setup
10+
declare module 'vitest' {
11+
export interface ProvidedContext {
12+
connectionString: string
13+
}
14+
}
15+
16+
let driver = new Driver(inject('connectionString'), {
17+
'ydb.sdk.enable_discovery': false
18+
})
19+
20+
await driver.ready()
21+
22+
let topicService = driver.createClient(TopicServiceDefinition)
23+
24+
let testTopicName: string
25+
let testProducerName: string
26+
let testConsumerName: string
27+
28+
beforeEach(async () => {
29+
testTopicName = `test-topic-integration-${Date.now()}`
30+
testProducerName = `test-producer-${Date.now()}`
31+
testConsumerName = `test-consumer-${Date.now()}`
32+
33+
await topicService.createTopic(
34+
create(CreateTopicRequestSchema, {
35+
path: testTopicName,
36+
partitioningSettings: {
37+
minActivePartitions: 1n,
38+
maxActivePartitions: 100n,
39+
},
40+
consumers: [
41+
{
42+
name: testConsumerName,
43+
},
44+
],
45+
})
46+
)
47+
})
48+
49+
afterEach(async () => {
50+
await topicService.dropTopic(create(DropTopicRequestSchema, {
51+
path: testTopicName,
52+
}))
53+
})
54+
// #endregion
55+
56+
test('writes and reads messages from a topic', async () => {
57+
await using writer = createTopicWriter(driver, {
58+
topic: testTopicName,
59+
producer: testProducerName,
60+
})
61+
62+
await using reader = createTopicReader(driver, {
63+
topic: testTopicName,
64+
consumer: testConsumerName,
65+
})
66+
67+
// Write a message to the topic
68+
writer.write(Buffer.from('Hello, world!', "utf-8"))
69+
70+
await writer.flush()
71+
72+
// Read the message from the topic
73+
for await (let batch of reader.read()) {
74+
assert.equal(batch.length, 1, 'Expected one message in batch');
75+
let message = batch[0]!;
76+
assert.equal(Buffer.from(message.payload).toString('utf-8'), 'Hello, world!')
77+
78+
await reader.commit(batch)
79+
80+
break
81+
}
82+
});
83+
84+
test('writes and reads concurrently', { timeout: 60_000 }, async (tc) => {
85+
const BATCH_SIZE = 1024;
86+
const MESSAGE_SIZE = 16 * 1024;
87+
const TOTAL_BATCHES = 16;
88+
const TOTAL_TRAFFIC = TOTAL_BATCHES * BATCH_SIZE * MESSAGE_SIZE;
89+
90+
await using writer = createTopicWriter(driver, {
91+
topic: testTopicName,
92+
producer: testProducerName,
93+
maxInflightCount: TOTAL_BATCHES * BATCH_SIZE
94+
})
95+
96+
await using reader = createTopicReader(driver, {
97+
topic: testTopicName,
98+
consumer: testConsumerName,
99+
maxBufferBytes: BigInt(TOTAL_TRAFFIC),
100+
})
101+
102+
let wb = 0
103+
let rb = 0
104+
let ctrl = new AbortController()
105+
let signal = AbortSignal.any([tc.signal, ctrl.signal, AbortSignal.timeout(25_000)])
106+
107+
// Write messages to the topic
108+
void (async () => {
109+
while (wb < TOTAL_TRAFFIC) {
110+
if (signal.aborted) break
111+
112+
for (let i = 0; i < BATCH_SIZE; i++) {
113+
writer.write(Buffer.alloc(MESSAGE_SIZE))
114+
}
115+
116+
// oxlint-disable-next-line no-await-in-loop
117+
118+
wb += MESSAGE_SIZE * BATCH_SIZE
119+
}
120+
121+
let start = performance.now()
122+
await writer.flush()
123+
console.log(`Write took ${performance.now() - start} ms`)
124+
console.log(`Throughput: ${(wb / (performance.now() - start)) * 1000 / 1024 / 1024} MiB/s`)
125+
})()
126+
127+
// Read messages from the topic
128+
void (async () => {
129+
let start = performance.now()
130+
131+
for await (let batch of reader.read({ signal })) {
132+
let promise = reader.commit(batch)
133+
134+
rb += MESSAGE_SIZE * batch.length
135+
136+
if (rb === TOTAL_TRAFFIC) {
137+
await promise
138+
ctrl.abort()
139+
}
140+
}
141+
142+
console.log(`Read took ${performance.now() - start} ms`)
143+
console.log(`Throughput: ${(rb / (performance.now() - start)) * 1000 / 1024 / 1024} MiB/s`)
144+
})()
145+
146+
let start = Date.now()
147+
await once(ctrl.signal, 'abort')
148+
await writer.close()
149+
await reader.close()
150+
151+
console.log(`Wrote ${wb} bytes and read ${rb} bytes in ${Date.now() - start} ms.`)
152+
console.log(`Throughput: ${(rb / (Date.now() - start)) * 1000 / 1024 / 1024} MiB/s`)
153+
})

0 commit comments

Comments
 (0)