This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push: new c6ffe50 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) c6ffe50 is described below commit c6ffe50c6b857c9685c39808421fe729bdde4d92 Author: Michał Borowiecki <mbo...@gmail.com> AuthorDate: Fri Jul 12 16:27:33 2019 +0100 KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705) Correct the Flatten SMT to properly handle null key or value `Struct` instances. Author: Michal Borowiecki <michal.borowie...@openbet.com> Reviewers: Arjun Satish <ar...@confluent.io>, Robert Yokota <rayok...@gmail.com>, Randall Hauch <rha...@gmail.com> --- .../apache/kafka/connect/transforms/Flatten.java | 29 ++++++++++------ .../kafka/connect/transforms/FlattenTest.java | 40 ++++++++++++++++++++++ 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java index c5e4000..d7d2144 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java @@ -35,7 +35,7 @@ import java.util.LinkedHashMap; import java.util.Map; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> { @@ -136,20 +136,24 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat } private R applyWithSchema(R record) { - final Struct value = requireStruct(operatingValue(record), PURPOSE); + final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); - Schema updatedSchema = schemaUpdateCache.get(value.schema()); + Schema 
schema = operatingSchema(record); + Schema updatedSchema = schemaUpdateCache.get(schema); if (updatedSchema == null) { - final SchemaBuilder builder = SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct()); - Struct defaultValue = (Struct) value.schema().defaultValue(); - buildUpdatedSchema(value.schema(), "", builder, value.schema().isOptional(), defaultValue); + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + Struct defaultValue = (Struct) schema.defaultValue(); + buildUpdatedSchema(schema, "", builder, schema.isOptional(), defaultValue); updatedSchema = builder.build(); - schemaUpdateCache.put(value.schema(), updatedSchema); + schemaUpdateCache.put(schema, updatedSchema); + } + if (value == null) { + return newRecord(record, updatedSchema, null); + } else { + final Struct updatedValue = new Struct(updatedSchema); + buildWithSchema(value, "", updatedValue); + return newRecord(record, updatedSchema, updatedValue); } - - final Struct updatedValue = new Struct(updatedSchema); - buildWithSchema(value, "", updatedValue); - return newRecord(record, updatedSchema, updatedValue); } /** @@ -216,6 +220,9 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat } private void buildWithSchema(Struct record, String fieldNamePrefix, Struct newRecord) { + if (record == null) { + return; + } for (Field field : record.schema().fields()) { final String fieldName = fieldName(fieldNamePrefix, field.name()); switch (field.schema().type()) { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java index d709054..430bba6 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java @@ -182,6 +182,46 @@ public class FlattenTest { } @Test + public 
void testOptionalStruct() { + xformValue.configure(Collections.<String, String>emptyMap()); + + SchemaBuilder builder = SchemaBuilder.struct().optional(); + builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA); + Schema schema = builder.build(); + + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, + "topic", 0, + schema, null)); + + assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type()); + assertNull(transformed.value()); + } + + @Test + public void testOptionalNestedStruct() { + xformValue.configure(Collections.<String, String>emptyMap()); + + SchemaBuilder builder = SchemaBuilder.struct().optional(); + builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA); + Schema supportedTypesSchema = builder.build(); + + builder = SchemaBuilder.struct(); + builder.field("B", supportedTypesSchema); + Schema oneLevelNestedSchema = builder.build(); + + Struct oneLevelNestedStruct = new Struct(oneLevelNestedSchema); + oneLevelNestedStruct.put("B", null); + + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, + "topic", 0, + oneLevelNestedSchema, oneLevelNestedStruct)); + + assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type()); + Struct transformedStruct = (Struct) transformed.value(); + assertNull(transformedStruct.get("B.opt_int32")); + } + + @Test public void testOptionalFieldMap() { xformValue.configure(Collections.<String, String>emptyMap());