Skip to content

Commit 17baf2a

Browse files
committed
NIFI-14331 Allow for removing all undefined elements (not only top level keys) from the JSON.
1 parent 783e3e6 commit 17baf2a

File tree

6 files changed

+95
-22
lines changed

6 files changed

+95
-22
lines changed

nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,10 @@ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSche
136136
final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
137137

138138
final Map<String, Object> values = new LinkedHashMap<>(schema.getFieldCount() * 2);
139-
final JsonNode jsonNodeForSerialization;
140139

141140
if (dropUnknown) {
142-
jsonNodeForSerialization = jsonNode.deepCopy();
143-
144141
// Delete unknown fields for updated serialized representation
145-
final Iterator<Map.Entry<String, JsonNode>> fields = jsonNodeForSerialization.fields();
142+
final Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();
146143
while (fields.hasNext()) {
147144
final Map.Entry<String, JsonNode> field = fields.next();
148145
final String fieldName = field.getKey();
@@ -172,8 +169,6 @@ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSche
172169
values.put(fieldName, value);
173170
}
174171
} else {
175-
jsonNodeForSerialization = jsonNode;
176-
177172
final Iterator<String> fieldNames = jsonNode.fieldNames();
178173
while (fieldNames.hasNext()) {
179174
final String fieldName = fieldNames.next();
@@ -194,7 +189,7 @@ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSche
194189
}
195190
}
196191

197-
final Supplier<String> supplier = jsonNodeForSerialization::toString;
192+
final Supplier<String> supplier = jsonNode::toString;
198193
return new MapRecord(schema, values, SerializedForm.of(supplier, "application/json"), false, dropUnknown);
199194
}
200195

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,7 @@
632632
<exclude>src/test/resources/TestConvertRecord/input/person_dropfield.json</exclude>
633633
<exclude>src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc</exclude>
634634
<exclude>src/test/resources/TestConvertRecord/schema/person.avsc</exclude>
635+
<exclude>src/test/resources/TestConvertRecord/schema/personJob.avsc</exclude>
635636
<exclude>src/test/resources/TestCountText/jabberwocky.txt</exclude>
636637
<exclude>src/test/resources/TestExtractGrok/apache.log</exclude>
637638
<exclude>src/test/resources/TestExtractGrok/patterns</exclude>

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.nio.file.Path;
5151
import java.nio.file.Paths;
5252
import java.util.HashMap;
53+
import java.util.List;
5354
import java.util.Map;
5455

5556
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -411,21 +412,21 @@ public void testDateConversionWithUTCMinusTimezone() throws Exception {
411412

412413
@Test
413414
public void testJSONDroppingUnknownFields() throws InitializationException, IOException {
415+
final String personJobSchema =
416+
Files.readString(Paths.get("src/test/resources/TestConvertRecord/schema/personJob.avsc"));
414417
final JsonTreeReader jsonReader = new JsonTreeReader();
415418
runner.addControllerService(READER_ID, jsonReader);
416-
417419
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
418-
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA);
420+
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, personJobSchema);
419421
runner.enableControllerService(jsonReader);
420-
421422
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
422423
runner.addControllerService(WRITER_ID, jsonWriter);
423424
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
424-
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA);
425+
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, personJobSchema);
425426
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
426427
runner.enableControllerService(jsonWriter);
427428

428-
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_dropfield.json"));
429+
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/personJob_dropfield.json"));
429430

430431
runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
431432
runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
@@ -434,7 +435,10 @@ public void testJSONDroppingUnknownFields() throws InitializationException, IOEx
434435
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
435436

436437
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
437-
assertFalse(flowFile.getContent().contains("fieldThatShouldBeRemoved"));
438+
final List<String> undefinedTopLevelKeys = List.of("undefinedKey", "undefinedObjectArray", "undefinedObject", "undefinedScalarArray");
439+
440+
undefinedTopLevelKeys.forEach(undefinedTopLevelKey -> assertFalse(flowFile.getContent().contains(undefinedTopLevelKey)));
441+
assertFalse(flowFile.getContent().contains("undefinedKeyInObject"));
438442
}
439443

440444
@Test
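
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/personJob_dropfield.json

+30
Original file line numberDiff line numberDiff line change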
@@ -0,0 +1,30 @@
1+
[
2+
{
3+
"id": 485,
4+
"name": {
5+
"last": "Doe",
6+
"first": "John",
7+
"undefinedKeyInObject": "whatever"
8+
},
9+
"status": "ACTIVE",
10+
"undefinedKey" : "whatever",
11+
"jobs": [
12+
{
13+
"jobId": "1",
14+
"jobName": "someJob",
15+
"undefinedKeyInObject": "whatever"
16+
}
17+
],
18+
"undefinedObjectArray" : [
19+
{
20+
"elementId" : "1",
21+
"elementName" : "someName"
22+
}
23+
],
24+
"undefinedObject" : {
25+
"elementId" : "2",
26+
"elementName" : "someName"
27+
},
28+
"undefinedScalarArray" : [1, 2, 3]
29+
}
30+
]

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json

-9
This file was deleted.
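
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/personJob.avsc

+52
Original file line numberDiff line numberDiff line change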
@@ -0,0 +1,52 @@
1+
{
2+
"name": "personWithJobRecord",
3+
"namespace": "nifi",
4+
"type": "record",
5+
"fields": [
6+
{ "name": "id", "type": "int" },
7+
{ "name": "name", "type": {
8+
"type": "record",
9+
"name": "nameRecord",
10+
"fields": [
11+
{ "name": "last", "type": "string" },
12+
{ "name": "first", "type": "string" }
13+
]
14+
}
15+
},
16+
{
17+
"name": "status",
18+
"type": ["null", {
19+
"type": "enum",
20+
"name": "statusEnum",
21+
"symbols": [
22+
"ACTIVE",
23+
"INACTIVE"
24+
]
25+
}],
26+
"default": null
27+
},
28+
{
29+
"name" : "jobs",
30+
"type" : [
31+
"null",
32+
{
33+
"type" : "array",
34+
"items": {
35+
"type": "record",
36+
"name": "jobType",
37+
"fields": [
38+
{
39+
"name": "jobId",
40+
"type": ["null", "string"]
41+
},
42+
{
43+
"name": "jobName",
44+
"type": ["null", "string"]
45+
}
46+
]
47+
}
48+
}
49+
]
50+
}
51+
]
52+
}

0 commit comments

Comments
 (0)