Skip to content

Commit df731e4

Browse files
champ-isaacfhussonnois
authored andcommitted
process the same headers but different order of columns for CSV files
1 parent 669a804 commit df731e4

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java

+16
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public abstract class AbstractDelimitedRowFilter<T extends AbstractRecordFilter<
4141
private DelimitedRowFilterConfig configs;
4242

4343
private StructSchema schema;
44+
private String cachedHeaders;
4445

4546
private final Map<Integer, TypedField> columnsTypesByIndex = new HashMap<>();
4647

@@ -103,6 +104,10 @@ public RecordsIterable<TypedStruct> apply(final FilterContext context,
103104
if (schema == null || isSchemaDynamic()) {
104105
inferSchemaFromRecord(record, columnValues.length);
105106
}
107+
if (schema != null && configs.extractColumnName() != null && shouldInferSchema(record)) {
108+
inferSchemaFromRecord(record, columnValues.length);
109+
}
110+
106111
final TypedStruct struct = buildStructForFields(columnValues);
107112
return RecordsIterable.of(struct);
108113
}
@@ -115,12 +120,23 @@ public boolean isSchemaDynamic() {
115120
configs.isAutoGenerateColumnNames();
116121
}
117122

123+
private boolean shouldInferSchema(TypedStruct record) {
124+
if (cachedHeaders == null) {
125+
return false;
126+
}
127+
final String fieldName = configs.extractColumnName();
128+
String field = record.first(fieldName).getString();
129+
return cachedHeaders.length() == field.length() && !cachedHeaders.equals(field);
130+
}
131+
118132
private void inferSchemaFromRecord(final TypedStruct record, int numColumns) {
119133
schema = Schema.struct();
120134

121135
if (configs.extractColumnName() != null) {
122136
final String fieldName = configs.extractColumnName();
123137
String field = record.first(fieldName).getString();
138+
cachedHeaders = field;
139+
124140
if (field == null) {
125141
throw new FilterException(
126142
"Cannot find field for name '" + fieldName + "' to determine columns names"

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java

+94
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,100 @@ public void setUp() {
3939
configs.put(CSVFilter.PARSER_SEPARATOR_CONFIG, ";");
4040
}
4141

42+
@Test
43+
public void should_extract_column_names_from_diff_order_headers() {
44+
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
45+
filter.configure(configs, alias -> null);
46+
47+
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
48+
Assert.assertNotNull(output);
49+
Assert.assertEquals(1, output.size());
50+
51+
final TypedStruct record = output.iterator().next();
52+
Assert.assertEquals("value1", record.getString("col1"));
53+
Assert.assertEquals("2", record.getString("col2"));
54+
Assert.assertEquals("true", record.getString("col3"));
55+
56+
final TypedStruct input1 = TypedStruct.create()
57+
.put("message", "false;3;value2")
58+
.put("headers", Arrays.asList("col3;col2;col1"));
59+
RecordsIterable<TypedStruct> output1 = filter.apply(null, input1, false);
60+
Assert.assertNotNull(output1);
61+
Assert.assertEquals(1, output1.size());
62+
63+
final TypedStruct record1 = output1.iterator().next();
64+
Assert.assertEquals("value2", record1.getString("col1"));
65+
Assert.assertEquals("3", record1.getString("col2"));
66+
Assert.assertEquals("false", record1.getString("col3"));
67+
68+
final TypedStruct input2 = TypedStruct.create()
69+
.put("message", "4;false;value3")
70+
.put("headers", Arrays.asList("col2;col3;col1"));
71+
72+
RecordsIterable<TypedStruct> output2 = filter.apply(null, input2, false);
73+
Assert.assertNotNull(output2);
74+
Assert.assertEquals(1, output2.size());
75+
76+
final TypedStruct record2 = output2.iterator().next();
77+
Assert.assertEquals("value3", record2.getString("col1"));
78+
Assert.assertEquals("4", record2.getString("col2"));
79+
Assert.assertEquals("false", record2.getString("col3"));
80+
}
81+
82+
@Test
83+
public void should_extract_column_names_from_diff_order_headers_and_null_value() {
84+
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
85+
filter.configure(configs, alias -> null);
86+
87+
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
88+
Assert.assertNotNull(output);
89+
Assert.assertEquals(1, output.size());
90+
91+
final TypedStruct record = output.iterator().next();
92+
Assert.assertEquals("value1", record.getString("col1"));
93+
Assert.assertEquals("2", record.getString("col2"));
94+
Assert.assertEquals("true", record.getString("col3"));
95+
96+
final TypedStruct input1 = TypedStruct.create()
97+
.put("message", "false;;")
98+
.put("headers", Arrays.asList("col3;col2;col1"));
99+
RecordsIterable<TypedStruct> output1 = filter.apply(null, input1, false);
100+
Assert.assertNotNull(output1);
101+
Assert.assertEquals(1, output1.size());
102+
103+
final TypedStruct record1 = output1.iterator().next();
104+
Assert.assertNull(record1.getString("col1"));
105+
Assert.assertNull(record1.getString("col2"));
106+
Assert.assertEquals("false", record1.getString("col3"));
107+
}
108+
109+
@Test
110+
public void should_extract_column_names_from_diff_order_headers_and_diff_size() {
111+
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
112+
filter.configure(configs, alias -> null);
113+
114+
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
115+
Assert.assertNotNull(output);
116+
Assert.assertEquals(1, output.size());
117+
118+
final TypedStruct record = output.iterator().next();
119+
Assert.assertEquals("value1", record.getString("col1"));
120+
Assert.assertEquals("2", record.getString("col2"));
121+
Assert.assertEquals("true", record.getString("col3"));
122+
123+
final TypedStruct input1 = TypedStruct.create()
124+
.put("message", "false;4;")
125+
.put("headers", Arrays.asList("col3;col2"));
126+
RecordsIterable<TypedStruct> output1 = filter.apply(null, input1, false);
127+
Assert.assertNotNull(output1);
128+
Assert.assertEquals(1, output1.size());
129+
130+
final TypedStruct record1 = output1.iterator().next();
131+
Assert.assertEquals("false", record1.getString("col1"));
132+
Assert.assertEquals("4", record1.getString("col2"));
133+
Assert.assertNull(record1.getString("col3"));
134+
}
135+
42136
@Test
43137
public void should_auto_generate_schema_given_no_schema_field() {
44138
filter.configure(configs, alias -> null);

0 commit comments

Comments
 (0)