Skip to content

Commit 9c0a99a

Browse files
committed
support for 2D and 3D arrays
1 parent e719d7e commit 9c0a99a

File tree

2 files changed

+289
-4
lines changed

2 files changed

+289
-4
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 140 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ private void handleArray(String name, Object value, Schema schema) {
566566

567567
if (elementType == Schema.Type.FLOAT32 || elementType == Schema.Type.FLOAT64) {
568568
List<?> list = (List<?>) value;
569-
// todo: do not allocate new arrays
569+
// todo: do not allocate new arrays, depends on https://github.com/questdb/questdb/pull/5996
570570
double[] doubleArray = new double[list.size()];
571571
for (int i = 0; i < list.size(); i++) {
572572
Object element = list.get(i);
@@ -577,8 +577,61 @@ private void handleArray(String name, Object value, Schema schema) {
577577
}
578578
sender.doubleArray(name, doubleArray);
579579
} else if (elementType == Schema.Type.ARRAY) {
580-
// todo: handle multidimensional arrays
581-
onUnsupportedType(name, "Multidimensional ARRAY");
580+
Schema nestedValueSchema = valueSchema.valueSchema();
581+
if (nestedValueSchema != null && (nestedValueSchema.type() == Schema.Type.FLOAT32 || nestedValueSchema.type() == Schema.Type.FLOAT64)) {
582+
List<?> list = (List<?>) value;
583+
double[][] doubleArray2D = new double[list.size()][];
584+
for (int i = 0; i < list.size(); i++) {
585+
Object row = list.get(i);
586+
if (row == null) {
587+
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
588+
}
589+
List<?> rowList = (List<?>) row;
590+
doubleArray2D[i] = new double[rowList.size()];
591+
for (int j = 0; j < rowList.size(); j++) {
592+
Object element = rowList.get(j);
593+
if (element == null) {
594+
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
595+
}
596+
doubleArray2D[i][j] = ((Number) element).doubleValue();
597+
}
598+
}
599+
sender.doubleArray(name, doubleArray2D);
600+
} else if (nestedValueSchema != null && nestedValueSchema.type() == Schema.Type.ARRAY) {
601+
Schema nestedNestedValueSchema = nestedValueSchema.valueSchema();
602+
if (nestedNestedValueSchema != null && (nestedNestedValueSchema.type() == Schema.Type.FLOAT32 || nestedNestedValueSchema.type() == Schema.Type.FLOAT64)) {
603+
List<?> list = (List<?>) value;
604+
double[][][] doubleArray3D = new double[list.size()][][];
605+
for (int i = 0; i < list.size(); i++) {
606+
Object matrix = list.get(i);
607+
if (matrix == null) {
608+
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
609+
}
610+
List<?> matrixList = (List<?>) matrix;
611+
doubleArray3D[i] = new double[matrixList.size()][];
612+
for (int j = 0; j < matrixList.size(); j++) {
613+
Object row = matrixList.get(j);
614+
if (row == null) {
615+
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
616+
}
617+
List<?> rowList = (List<?>) row;
618+
doubleArray3D[i][j] = new double[rowList.size()];
619+
for (int k = 0; k < rowList.size(); k++) {
620+
Object element = rowList.get(k);
621+
if (element == null) {
622+
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
623+
}
624+
doubleArray3D[i][j][k] = ((Number) element).doubleValue();
625+
}
626+
}
627+
}
628+
sender.doubleArray(name, doubleArray3D);
629+
} else {
630+
onUnsupportedType(name, "Multidimensional ARRAY with unsupported element type");
631+
}
632+
} else {
633+
onUnsupportedType(name, "Multidimensional ARRAY with unsupported element type");
634+
}
582635
} else {
583636
onUnsupportedType(name, "ARRAY<" + elementType + ">");
584637
}
@@ -609,7 +662,90 @@ private void handleArrayWithoutSchema(String name, List<?> list) {
609662
}
610663
sender.doubleArray(name, doubleArray);
611664
} else if (firstElement instanceof List) {
612-
onUnsupportedType(name, "Multidimensional ARRAY");
665+
List<?> firstList = (List<?>) firstElement;
666+
if (firstList.isEmpty()) {
667+
throw new InvalidDataException("QuestDB 2D array cannot contain empty rows");
668+
}
669+
Object firstNestedElement = firstList.get(0);
670+
if (firstNestedElement == null) {
671+
throw new InvalidDataException("QuestDB 2D array elements cannot be null");
672+
}
673+
674+
if (firstNestedElement instanceof Number) {
675+
double[][] doubleArray2D = new double[list.size()][];
676+
for (int i = 0; i < list.size(); i++) {
677+
Object row = list.get(i);
678+
if (row == null) {
679+
throw new InvalidDataException("QuestDB 2D array rows cannot be null");
680+
}
681+
if (!(row instanceof List)) {
682+
throw new InvalidDataException("QuestDB 2D array rows must be Lists");
683+
}
684+
List<?> rowList = (List<?>) row;
685+
doubleArray2D[i] = new double[rowList.size()];
686+
for (int j = 0; j < rowList.size(); j++) {
687+
Object element = rowList.get(j);
688+
if (element == null) {
689+
throw new InvalidDataException("QuestDB 2D array elements cannot be null");
690+
}
691+
if (!(element instanceof Number)) {
692+
throw new InvalidDataException("QuestDB 2D array elements must be Numbers");
693+
}
694+
doubleArray2D[i][j] = ((Number) element).doubleValue();
695+
}
696+
}
697+
sender.doubleArray(name, doubleArray2D);
698+
} else if (firstNestedElement instanceof List) {
699+
List<?> firstNestedList = (List<?>) firstNestedElement;
700+
if (firstNestedList.isEmpty()) {
701+
throw new InvalidDataException("QuestDB 3D array cannot contain empty matrices");
702+
}
703+
Object firstNestedNestedElement = firstNestedList.get(0);
704+
if (firstNestedNestedElement == null) {
705+
throw new InvalidDataException("QuestDB 3D array elements cannot be null");
706+
}
707+
708+
if (firstNestedNestedElement instanceof Number) {
709+
double[][][] doubleArray3D = new double[list.size()][][];
710+
for (int i = 0; i < list.size(); i++) {
711+
Object matrix = list.get(i);
712+
if (matrix == null) {
713+
throw new InvalidDataException("QuestDB 3D array matrices cannot be null");
714+
}
715+
if (!(matrix instanceof List)) {
716+
throw new InvalidDataException("QuestDB 3D array matrices must be Lists");
717+
}
718+
List<?> matrixList = (List<?>) matrix;
719+
doubleArray3D[i] = new double[matrixList.size()][];
720+
for (int j = 0; j < matrixList.size(); j++) {
721+
Object row = matrixList.get(j);
722+
if (row == null) {
723+
throw new InvalidDataException("QuestDB 3D array rows cannot be null");
724+
}
725+
if (!(row instanceof List)) {
726+
throw new InvalidDataException("QuestDB 3D array rows must be Lists");
727+
}
728+
List<?> rowList = (List<?>) row;
729+
doubleArray3D[i][j] = new double[rowList.size()];
730+
for (int k = 0; k < rowList.size(); k++) {
731+
Object element = rowList.get(k);
732+
if (element == null) {
733+
throw new InvalidDataException("QuestDB 3D array elements cannot be null");
734+
}
735+
if (!(element instanceof Number)) {
736+
throw new InvalidDataException("QuestDB 3D array elements must be Numbers");
737+
}
738+
doubleArray3D[i][j][k] = ((Number) element).doubleValue();
739+
}
740+
}
741+
}
742+
sender.doubleArray(name, doubleArray3D);
743+
} else {
744+
onUnsupportedType(name, "3D ARRAY with unsupported element type: " + firstNestedNestedElement.getClass().getSimpleName());
745+
}
746+
} else {
747+
onUnsupportedType(name, "2D ARRAY with unsupported element type: " + firstNestedElement.getClass().getSimpleName());
748+
}
613749
} else {
614750
onUnsupportedType(name, "ARRAY<" + firstElement.getClass().getSimpleName() + ">");
615751
}

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2250,4 +2250,153 @@ public void testArrayWithSkipUnsupportedTypes() {
22502250
httpPort
22512251
);
22522252
}
2253+
2254+
@ParameterizedTest
2255+
@ValueSource(booleans = {true, false})
2256+
public void test2DDoubleArraySupport(boolean useHttp) {
2257+
connect.kafka().createTopic(topicName, 1);
2258+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
2259+
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
2260+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
2261+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
2262+
2263+
// Create schema with 2D double array
2264+
Schema innerArraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build();
2265+
Schema arraySchema = SchemaBuilder.array(innerArraySchema).build();
2266+
Schema schema = SchemaBuilder.struct()
2267+
.name("com.example.Matrix")
2268+
.field("matrix_id", Schema.STRING_SCHEMA)
2269+
.field("data", arraySchema)
2270+
.build();
2271+
2272+
// Create 2D array data: [[1.0, 2.0], [3.0, 4.0]]
2273+
Struct struct = new Struct(schema)
2274+
.put("matrix_id", "matrix1")
2275+
.put("data", Arrays.asList(
2276+
Arrays.asList(1.0, 2.0),
2277+
Arrays.asList(3.0, 4.0)
2278+
));
2279+
2280+
connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct)));
2281+
2282+
QuestDBUtils.assertSqlEventually(
2283+
"\"matrix_id\",\"data\"\r\n" +
2284+
"\"matrix1\",\"[[1.0,2.0],[3.0,4.0]]\"\r\n",
2285+
"select matrix_id, data from " + topicName,
2286+
httpPort
2287+
);
2288+
}
2289+
2290+
@ParameterizedTest
2291+
@ValueSource(booleans = {true, false})
2292+
public void test3DDoubleArraySupport(boolean useHttp) {
2293+
connect.kafka().createTopic(topicName, 1);
2294+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
2295+
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
2296+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
2297+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
2298+
2299+
// Create schema with 3D double array
2300+
Schema innerArraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build();
2301+
Schema middleArraySchema = SchemaBuilder.array(innerArraySchema).build();
2302+
Schema arraySchema = SchemaBuilder.array(middleArraySchema).build();
2303+
Schema schema = SchemaBuilder.struct()
2304+
.name("com.example.Tensor")
2305+
.field("tensor_id", Schema.STRING_SCHEMA)
2306+
.field("data", arraySchema)
2307+
.build();
2308+
2309+
// Create 3D array data: [[[1.0, 2.0]], [[3.0, 4.0]]]
2310+
Struct struct = new Struct(schema)
2311+
.put("tensor_id", "tensor1")
2312+
.put("data", Arrays.asList(
2313+
Arrays.asList(Arrays.asList(1.0, 2.0)),
2314+
Arrays.asList(Arrays.asList(3.0, 4.0))
2315+
));
2316+
2317+
connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct)));
2318+
2319+
QuestDBUtils.assertSqlEventually(
2320+
"\"tensor_id\",\"data\"\r\n" +
2321+
"\"tensor1\",\"[[[1.0,2.0]],[[3.0,4.0]]]\"\r\n",
2322+
"select tensor_id, data from " + topicName,
2323+
httpPort
2324+
);
2325+
}
2326+
2327+
@Test
2328+
public void testSchemaless2DArraySupport() {
2329+
connect.kafka().createTopic(topicName, 1);
2330+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
2331+
props.put("value.converter.schemas.enable", "false");
2332+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
2333+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
2334+
2335+
// Send JSON with 2D array
2336+
String json = "{\"experiment\":\"test1\",\"results\":[[1.5,2.5],[3.5,4.5]]}";
2337+
connect.kafka().produce(topicName, json);
2338+
2339+
QuestDBUtils.assertSqlEventually(
2340+
"\"experiment\",\"results\"\r\n" +
2341+
"\"test1\",\"[[1.5,2.5],[3.5,4.5]]\"\r\n",
2342+
"select experiment, results from " + topicName,
2343+
httpPort
2344+
);
2345+
}
2346+
2347+
@Test
2348+
public void testSchemaless3DArraySupport() {
2349+
connect.kafka().createTopic(topicName, 1);
2350+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
2351+
props.put("value.converter.schemas.enable", "false");
2352+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
2353+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
2354+
2355+
// Send JSON with 3D array
2356+
String json = "{\"model\":\"cnn1\",\"weights\":[[[0.1,0.2]],[[0.3,0.4]]]}";
2357+
connect.kafka().produce(topicName, json);
2358+
2359+
QuestDBUtils.assertSqlEventually(
2360+
"\"model\",\"weights\"\r\n" +
2361+
"\"cnn1\",\"[[[0.1,0.2]],[[0.3,0.4]]]\"\r\n",
2362+
"select model, weights from " + topicName,
2363+
httpPort
2364+
);
2365+
}
2366+
2367+
@ParameterizedTest
2368+
@ValueSource(booleans = {true, false})
2369+
public void test2DFloatArraySupport(boolean useHttp) {
2370+
connect.kafka().createTopic(topicName, 1);
2371+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
2372+
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
2373+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
2374+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
2375+
2376+
// Create schema with 2D float array
2377+
Schema innerArraySchema = SchemaBuilder.array(Schema.FLOAT32_SCHEMA).build();
2378+
Schema arraySchema = SchemaBuilder.array(innerArraySchema).build();
2379+
Schema schema = SchemaBuilder.struct()
2380+
.name("com.example.FloatMatrix")
2381+
.field("id", Schema.STRING_SCHEMA)
2382+
.field("values", arraySchema)
2383+
.build();
2384+
2385+
// Create 2D array data with float values
2386+
Struct struct = new Struct(schema)
2387+
.put("id", "float_matrix1")
2388+
.put("values", Arrays.asList(
2389+
Arrays.asList(1.1f, 2.2f),
2390+
Arrays.asList(3.3f, 4.4f)
2391+
));
2392+
2393+
connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct)));
2394+
2395+
QuestDBUtils.assertSqlEventually(
2396+
"\"id\",\"values\"\r\n" +
2397+
"\"float_matrix1\",\"[[1.100000023841858,2.200000047683716],[3.299999952316284,4.400000095367432]]\"\r\n",
2398+
"select id, \"values\" from " + topicName,
2399+
httpPort
2400+
);
2401+
}
22532402
}

0 commit comments

Comments
 (0)