Skip to content

Commit 92b0513

Browse files
authored
Support for QuestDB arrays (#33)
What This Enables This PR allows you to stream multi-dimensional array data from Kafka to QuestDB. Previously, any message containing arrays would cause the connector to crash with Unsupported type errors. Now you can store sensor readings, machine learning features, financial time series, and other array-based data. Real-World Use Cases Now Supported IoT & Sensor Data { "device_id": "sensor_001", "temperature_readings": [20.1, 20.3, 20.5, 20.7], "location": "warehouse_a" } Machine Learning Features { "model_id": "classifier_v2", "feature_matrix": [[0.1, 0.8], [0.3, 0.6], [0.9, 0.2]], "prediction_confidence": 0.94 } Financial Market Data { "symbol": "AAPL", "ohlc_5min": [[150.1, 151.2, 149.8, 150.9], [150.9, 152.1, 150.3, 151.5]], "volume": [12500, 13200] } Your array data flows through to QuestDB and gets stored as native arrays, queryable with QuestDB's array functions. Data Format Support With Kafka Connect Schema // Your existing schema definitions work Schema arraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(); Schema-Free JSON (Most Common) // Just send your JSON - arrays are auto-detected {"sensor_data": [23.1, 24.5, 22.8]}` Current Limitations Supported 1D, 2D, 3D arrays (current QuestDB version) Numeric data (integers and floats) Both schema and schema-free Kafka messages Nested arrays in complex JSON structures Not Yet Supported String arrays - ["apple", "banana"] Mixed-type arrays - [1, "text", 3.14] Null elements - [1.0, null, 3.0]] Empty Arrays - this is a temporary limitation, empty arrays will be supported in future versions Future Support Up to 32 dimensions coming in next QuestDB version Performance optimizations
1 parent d616611 commit 92b0513

File tree

6 files changed

+859
-22
lines changed

6 files changed

+859
-22
lines changed

connector/src/main/java/io/questdb/kafka/BufferingSender.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package io.questdb.kafka;
22

33
import io.questdb.client.Sender;
4+
import io.questdb.cutlass.line.array.DoubleArray;
5+
import io.questdb.cutlass.line.array.LongArray;
46
import io.questdb.std.BoolList;
57
import io.questdb.std.LongList;
8+
import io.questdb.std.bytes.DirectByteSlice;
69

710
import java.time.Instant;
811
import java.time.temporal.ChronoUnit;
@@ -34,6 +37,12 @@ final class BufferingSender implements Sender {
3437
private final List<CharSequence> symbolColumnNames = new ArrayList<>(DEFAULT_CAPACITY);
3538
private final List<CharSequence> symbolColumnValues = new ArrayList<>(DEFAULT_CAPACITY);
3639
private final Set<CharSequence> symbolColumns = new HashSet<>();
40+
private final List<CharSequence> doubleArrayNames = new ArrayList<>(DEFAULT_CAPACITY);
41+
private final List<double[]> doubleArrayValues = new ArrayList<>(DEFAULT_CAPACITY);
42+
private final List<CharSequence> doubleArray2DNames = new ArrayList<>(DEFAULT_CAPACITY);
43+
private final List<double[][]> doubleArray2DValues = new ArrayList<>(DEFAULT_CAPACITY);
44+
private final List<CharSequence> doubleArray3DNames = new ArrayList<>(DEFAULT_CAPACITY);
45+
private final List<double[][][]> doubleArray3DValues = new ArrayList<>(DEFAULT_CAPACITY);
3746

3847
BufferingSender(Sender sender, String symbolColumns) {
3948
this.sender = sender;
@@ -97,6 +106,11 @@ public Sender boolColumn(CharSequence name, boolean value) {
97106
return this;
98107
}
99108

109+
@Override
110+
public DirectByteSlice bufferView() {
111+
throw new UnsupportedOperationException("not implemented");
112+
}
113+
100114
@Override
101115
public void cancelRow() {
102116
symbolColumnNames.clear();
@@ -111,6 +125,12 @@ public void cancelRow() {
111125
boolValues.clear();
112126
timestampNames.clear();
113127
timestampValues.clear();
128+
doubleArrayNames.clear();
129+
doubleArrayValues.clear();
130+
doubleArray2DNames.clear();
131+
doubleArray2DValues.clear();
132+
doubleArray3DNames.clear();
133+
doubleArray3DValues.clear();
114134

115135
sender.cancelRow();
116136
}
@@ -193,6 +213,30 @@ private void transferFields() {
193213
}
194214
timestampNames.clear();
195215
timestampValues.clear();
216+
217+
for (int i = 0, n = doubleArrayNames.size(); i < n; i++) {
218+
CharSequence fieldName = doubleArrayNames.get(i);
219+
double[] fieldValue = doubleArrayValues.get(i);
220+
sender.doubleArray(fieldName, fieldValue);
221+
}
222+
doubleArrayNames.clear();
223+
doubleArrayValues.clear();
224+
225+
for (int i = 0, n = doubleArray2DNames.size(); i < n; i++) {
226+
CharSequence fieldName = doubleArray2DNames.get(i);
227+
double[][] fieldValue = doubleArray2DValues.get(i);
228+
sender.doubleArray(fieldName, fieldValue);
229+
}
230+
doubleArray2DNames.clear();
231+
doubleArray2DValues.clear();
232+
233+
for (int i = 0, n = doubleArray3DNames.size(); i < n; i++) {
234+
CharSequence fieldName = doubleArray3DNames.get(i);
235+
double[][][] fieldValue = doubleArray3DValues.get(i);
236+
sender.doubleArray(fieldName, fieldValue);
237+
}
238+
doubleArray3DNames.clear();
239+
doubleArray3DValues.clear();
196240
}
197241

198242
private static long unitToMicros(long value, ChronoUnit unit) {
@@ -230,4 +274,50 @@ public void flush() {
230274
public void close() {
231275
sender.close();
232276
}
277+
278+
@Override
279+
public Sender doubleArray(CharSequence charSequence, double[] doubles) {
280+
doubleArrayNames.add(charSequence);
281+
doubleArrayValues.add(doubles);
282+
return this;
283+
}
284+
285+
@Override
286+
public Sender doubleArray(CharSequence charSequence, double[][] doubles) {
287+
doubleArray2DNames.add(charSequence);
288+
doubleArray2DValues.add(doubles);
289+
return this;
290+
}
291+
292+
@Override
293+
public Sender doubleArray(CharSequence charSequence, double[][][] doubles) {
294+
doubleArray3DNames.add(charSequence);
295+
doubleArray3DValues.add(doubles);
296+
return this;
297+
}
298+
299+
@Override
300+
public Sender doubleArray(CharSequence charSequence, DoubleArray doubleArray) {
301+
throw new UnsupportedOperationException("not implemented");
302+
}
303+
304+
@Override
305+
public Sender longArray(CharSequence charSequence, long[] longs) {
306+
throw new UnsupportedOperationException("not implemented");
307+
}
308+
309+
@Override
310+
public Sender longArray(CharSequence charSequence, long[][] longs) {
311+
throw new UnsupportedOperationException("not implemented");
312+
}
313+
314+
@Override
315+
public Sender longArray(CharSequence charSequence, long[][][] longs) {
316+
throw new UnsupportedOperationException("not implemented");
317+
}
318+
319+
@Override
320+
public Sender longArray(CharSequence charSequence, LongArray longArray) {
321+
throw new UnsupportedOperationException("not implemented");
322+
}
233323
}

0 commit comments

Comments
 (0)