Skip to content

Commit b2ed45a

Browse files
authored
feat: add partition template variable for dynamic table naming (#32)
#24 introduced templating with ${topic} and ${key} variables. This change adds support for ${partition} variable. This enables partitioning strategies where records from different Kafka partitions can be routed to separate QuestDB tables.
1 parent fe1ea66 commit b2ed45a

File tree

3 files changed

+86
-1
lines changed

3 files changed

+86
-1
lines changed

connector/src/main/java/io/questdb/kafka/Templating.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ private Templating() {
5151
partials.add(record -> record.key() == null ? "null" : record.key().toString());
5252
break;
5353
}
54+
case "partition": {
55+
// assumption: sink records always have a non-null kafkaPartition()
56+
partials.add(record -> String.valueOf(record.kafkaPartition()));
57+
break;
58+
}
5459
default: {
5560
throw new ConnectException("Unknown template in table name, table template: '" + template + "'");
5661
}

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,44 @@ public void testTableTemplateWithKey_withSchema(boolean useHttp) {
198198
httpPort);
199199
}
200200

201+
@ParameterizedTest
202+
@ValueSource(booleans = {true, false})
203+
public void testTableTemplateWithKeyAndPartition_withSchema(boolean useHttp) {
204+
connect.kafka().createTopic(topicName, 3);
205+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
206+
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${topic}.${key}_${partition}");
207+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
208+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
209+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
210+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
211+
.field("firstname", Schema.STRING_SCHEMA)
212+
.field("lastname", Schema.STRING_SCHEMA)
213+
.field("age", Schema.INT8_SCHEMA)
214+
.build();
215+
216+
Struct john = new Struct(schema)
217+
.put("firstname", "John")
218+
.put("lastname", "Doe")
219+
.put("age", (byte) 42);
220+
221+
Struct jane = new Struct(schema)
222+
.put("firstname", "Jane")
223+
.put("lastname", "Doe")
224+
.put("age", (byte) 41);
225+
226+
connect.kafka().produce(topicName, 1, "john", new String(converter.fromConnectData(topicName, schema, john)));
227+
connect.kafka().produce(topicName, 2, "jane", new String(converter.fromConnectData(topicName, schema, jane)));
228+
229+
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
230+
+ "\"John\",\"Doe\",42\r\n",
231+
"select firstname,lastname,age from " + topicName + "." + "john_1",
232+
httpPort);
233+
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
234+
+ "\"Jane\",\"Doe\",41\r\n",
235+
"select firstname,lastname,age from " + topicName + "." + "jane_2",
236+
httpPort);
237+
}
238+
201239
@Test
202240
public void testTombstoneRecordFilter() {
203241
connect.kafka().createTopic(topicName, 1);
@@ -331,6 +369,30 @@ public void testTableTemplateWithKey_schemaless(boolean useHttp) {
331369
httpPort);
332370
}
333371

372+
@ParameterizedTest
373+
@ValueSource(booleans = {true, false})
374+
public void testTableTemplateWithKeyAndPartition_schemaless(boolean useHttp) {
375+
connect.kafka().createTopic(topicName, 3);
376+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
377+
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "literal_${topic}_literal_${key}_literal_${partition}");
378+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
379+
props.put("value.converter.schemas.enable", "false");
380+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
381+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
382+
383+
connect.kafka().produce(topicName, 1, "john", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
384+
connect.kafka().produce(topicName, 2, "jane", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");
385+
386+
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
387+
+ "\"John\",\"Doe\",42\r\n",
388+
"select firstname,lastname,age from literal_" + topicName + "_literal_" + "john_literal_1",
389+
httpPort);
390+
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
391+
+ "\"Jane\",\"Doe\",41\r\n",
392+
"select firstname,lastname,age from literal_" + topicName + "_literal_" + "jane_literal_2",
393+
httpPort);
394+
}
395+
334396
@ParameterizedTest
335397
@ValueSource(booleans = {true, false})
336398
public void testDeadLetterQueue_wrongJson(boolean useHttp) {

connector/src/test/java/io/questdb/kafka/TemplatingTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ public void testPlainTableName() {
1616
assertTableName(fn, record, "table");
1717
}
1818

19+
@Test
20+
public void testPartition() {
21+
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("table_${partition}");
22+
SinkRecord record = newSinkRecord("mytopic", "key", 42);
23+
assertTableName(fn, record, "table_42");
24+
}
25+
1926
@Test
2027
public void testEmptyTableName() {
2128
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("");
@@ -51,6 +58,13 @@ public void testTopicWithNullKey() {
5158
assertTableName(fn, record, "mytopic_null");
5259
}
5360

61+
@Test
62+
public void testTopicWithNullKeyAndPartition() {
63+
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("${topic}_${key}_${partition}");
64+
SinkRecord record = newSinkRecord("mytopic", null, 3);
65+
assertTableName(fn, record, "mytopic_null_3");
66+
}
67+
5468
@Test
5569
public void testMissingClosingBrackets() {
5670
assertIllegalTemplate("${topic", "Unbalanced brackets in a table template, missing closing '}', table template: '${topic'");
@@ -99,7 +113,11 @@ private static void assertTableName(Function<SinkRecord, ? extends CharSequence>
99113
}
100114

101115
private static SinkRecord newSinkRecord(String topic, String key) {
102-
return new SinkRecord(topic, 0, null, key, null, null, 0);
116+
return newSinkRecord(topic, key, 0);
117+
}
118+
119+
private static SinkRecord newSinkRecord(String topic, String key, int partition) {
120+
return new SinkRecord(topic, partition, null, key, null, null, 0);
103121
}
104122

105123
}

0 commit comments

Comments
 (0)