Skip to content

Commit 994d4f3

Browse files
committed
Add a test with an Avro object and an array
1 parent 225f7a2 commit 994d4f3

File tree

4 files changed

+76
-7
lines changed

4 files changed

+76
-7
lines changed

integration-tests/avro-schema-registry/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
<outputDirectory>${project.build.directory}/generated/avro</outputDirectory>
100100
<imports>
101101
<import>${project.basedir}/src/main/resources/avro/Student.avsc</import>
102+
<import>${project.basedir}/src/main/resources/avro/SensorReading.avsc</import>
102103
</imports>
103104
</configuration>
104105
</execution>
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"namespace": "io.questdb.kafka.domain",
3+
"type": "record",
4+
"name": "SensorReading",
5+
"fields": [
6+
{
7+
"name": "sensorId",
8+
"type": "string"
9+
},
10+
{
11+
"name": "timestamp",
12+
"type": {
13+
"type": "long",
14+
"logicalType": "timestamp-millis"
15+
}
16+
},
17+
{
18+
"name": "values",
19+
"type": {
20+
"type": "array",
21+
"items": "double"
22+
}
23+
},
24+
{
25+
"name": "location",
26+
"type": ["null", "string"],
27+
"default": null
28+
}
29+
]
30+
}

integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.debezium.testing.testcontainers.ConnectorConfiguration;
66
import io.debezium.testing.testcontainers.DebeziumContainer;
77
import io.questdb.client.Sender;
8+
import io.questdb.kafka.domain.SensorReading;
89
import io.questdb.kafka.domain.Student;
910
import org.apache.avro.Schema;
1011
import org.apache.avro.generic.GenericData;
@@ -17,9 +18,11 @@
1718
import org.jetbrains.annotations.NotNull;
1819
import org.junit.jupiter.api.Test;
1920
import org.junit.jupiter.api.extension.RegisterExtension;
21+
import org.slf4j.LoggerFactory;
2022
import org.testcontainers.containers.GenericContainer;
2123
import org.testcontainers.containers.KafkaContainer;
2224
import org.testcontainers.containers.Network;
25+
import org.testcontainers.containers.output.Slf4jLogConsumer;
2326
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
2427
import org.testcontainers.containers.wait.strategy.Wait;
2528
import org.testcontainers.junit.jupiter.Container;
@@ -28,6 +31,7 @@
2831
import org.testcontainers.utility.MountableFile;
2932

3033
import java.time.Instant;
34+
import java.util.Arrays;
3135
import java.util.Properties;
3236

3337
import static java.time.Duration.ofMinutes;
@@ -52,10 +56,10 @@ public class AvroSchemaRegistryIT {
5256
.withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "0@kafka:9094");
5357

5458
@Container
55-
private final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:7.4.0")
59+
private final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:9.0.1")
5660
.withNetwork(network)
5761
.withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)
58-
// .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
62+
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
5963
.withEnv("QDB_CAIRO_COMMIT_LAG", "100")
6064
.withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");
6165

@@ -108,7 +112,7 @@ public void testSmoke() throws Exception {
108112
producer.send(new ProducerRecord<>(topicName, "foo", student)).get();
109113
}
110114

111-
startConnector(topicName);
115+
startConnector(topicName, "birthday");
112116
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n"
113117
+ "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\"\r\n",
114118
"select * from " + topicName, questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT));
@@ -125,7 +129,7 @@ public void testSchemaEvolution() throws Exception {
125129
.build();
126130
producer.send(new ProducerRecord<>(topicName, "foo", student)).get();
127131
}
128-
startConnector(topicName);
132+
startConnector(topicName, "birthday");
129133

130134
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n"
131135
+ "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\"\r\n",
@@ -146,7 +150,41 @@ public void testSchemaEvolution() throws Exception {
146150
"select * from " + topicName, questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT));
147151
}
148152

149-
private void startConnector(String topicName) {
153+
@Test
154+
public void testAvroRecordsWithArrays() throws Exception {
155+
String topicName = "sensors";
156+
157+
// sensor reading with array of double values
158+
try (Producer<String, SensorReading> producer = new KafkaProducer<>(producerProps())) {
159+
SensorReading reading = SensorReading.newBuilder()
160+
.setSensorId("sensor-001")
161+
.setTimestamp(Instant.parse("2024-01-01T10:00:00Z"))
162+
.setValues(Arrays.asList(22.5, 23.1, 22.8, 23.3, 22.9))
163+
.setLocation("Building A")
164+
.build();
165+
producer.send(new ProducerRecord<>(topicName, "key1", reading)).get();
166+
167+
// Send another reading
168+
SensorReading reading2 = SensorReading.newBuilder()
169+
.setSensorId("sensor-002")
170+
.setTimestamp(Instant.parse("2024-01-01T10:05:00Z"))
171+
.setValues(Arrays.asList(18.2, 18.5, 18.3))
172+
.setLocation(null)
173+
.build();
174+
producer.send(new ProducerRecord<>(topicName, "key2", reading2)).get();
175+
}
176+
177+
startConnector(topicName, "timestamp");
178+
179+
QuestDBUtils.assertSqlEventually(
180+
"\"sensorId\",\"values\",\"location\",\"timestamp\"\r\n" +
181+
"\"sensor-001\",\"[22.5,23.1,22.8,23.3,22.9]\",\"Building A\",\"2024-01-01T10:00:00.000000Z\"\r\n" +
182+
"\"sensor-002\",\"[18.2,18.5,18.3]\",,\"2024-01-01T10:05:00.000000Z\"\r\n",
183+
"select sensorId, \"values\", location, timestamp from " + topicName + " order by timestamp",
184+
questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT));
185+
}
186+
187+
private void startConnector(String topicName, String timestampName) {
150188
String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":" + QuestDBUtils.QUESTDB_HTTP_PORT + ";auto_flush_rows=1;";
151189
ConnectorConfiguration connector = ConnectorConfiguration.create()
152190
.with("connector.class", QuestDBSinkConnector.class.getName())
@@ -155,7 +193,7 @@ private void startConnector(String topicName) {
155193
.with("value.converter", "io.confluent.connect.avro.AvroConverter")
156194
.with("value.converter.schema.registry.url", "http://" + schemaRegistry.getNetworkAliases().get(0) + ":8081")
157195
.with("topics", topicName)
158-
.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birthday")
196+
.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, timestampName)
159197
.with(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false")
160198
.with("client.conf.string", confString);
161199
connectContainer.registerConnector("my-connector", connector);

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<kafka.scala.version>2.13</kafka.scala.version>
2121
<kafka.version>3.3.1</kafka.version>
2222
<junit.version>5.9.0</junit.version>
23-
<testcontainers.version>1.19.7</testcontainers.version>
23+
<testcontainers.version>1.21.3</testcontainers.version>
2424
<slf4j.version>1.7.36</slf4j.version>
2525
<awaitability.version>4.1.0</awaitability.version>
2626
<okhttp.version>4.10.0</okhttp.version>

0 commit comments

Comments (0)