This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new c6ffe50  KAFKA-6605: Fix NPE in Flatten when optional Struct is null 
(#5705)
c6ffe50 is described below

commit c6ffe50c6b857c9685c39808421fe729bdde4d92
Author: MichaƂ Borowiecki <mbo...@gmail.com>
AuthorDate: Fri Jul 12 16:27:33 2019 +0100

    KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)
    
    Correct the Flatten SMT to properly handle null key or value `Struct` 
instances.
    
    Author: Michal Borowiecki <michal.borowie...@openbet.com>
    Reviewers: Arjun Satish <ar...@confluent.io>, Robert Yokota 
<rayok...@gmail.com>, Randall Hauch <rha...@gmail.com>
---
 .../apache/kafka/connect/transforms/Flatten.java   | 29 ++++++++++------
 .../kafka/connect/transforms/FlattenTest.java      | 40 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 11 deletions(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index c5e4000..d7d2144 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class Flatten<R extends ConnectRecord<R>> implements 
Transformation<R> {
 
@@ -136,20 +136,24 @@ public abstract class Flatten<R extends ConnectRecord<R>> 
implements Transformat
     }
 
     private R applyWithSchema(R record) {
-        final Struct value = requireStruct(operatingValue(record), PURPOSE);
+        final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
 
-        Schema updatedSchema = schemaUpdateCache.get(value.schema());
+        Schema schema = operatingSchema(record);
+        Schema updatedSchema = schemaUpdateCache.get(schema);
         if (updatedSchema == null) {
-            final SchemaBuilder builder = 
SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct());
-            Struct defaultValue = (Struct) value.schema().defaultValue();
-            buildUpdatedSchema(value.schema(), "", builder, 
value.schema().isOptional(), defaultValue);
+            final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
+            Struct defaultValue = (Struct) schema.defaultValue();
+            buildUpdatedSchema(schema, "", builder, schema.isOptional(), 
defaultValue);
             updatedSchema = builder.build();
-            schemaUpdateCache.put(value.schema(), updatedSchema);
+            schemaUpdateCache.put(schema, updatedSchema);
+        }
+        if (value == null) {
+            return newRecord(record, updatedSchema, null);
+        } else {
+            final Struct updatedValue = new Struct(updatedSchema);
+            buildWithSchema(value, "", updatedValue);
+            return newRecord(record, updatedSchema, updatedValue);
         }
-
-        final Struct updatedValue = new Struct(updatedSchema);
-        buildWithSchema(value, "", updatedValue);
-        return newRecord(record, updatedSchema, updatedValue);
     }
 
     /**
@@ -216,6 +220,9 @@ public abstract class Flatten<R extends ConnectRecord<R>> 
implements Transformat
     }
 
     private void buildWithSchema(Struct record, String fieldNamePrefix, Struct 
newRecord) {
+        if (record == null) {
+            return;
+        }
         for (Field field : record.schema().fields()) {
             final String fieldName = fieldName(fieldNamePrefix, field.name());
             switch (field.schema().type()) {
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index d709054..430bba6 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -182,6 +182,46 @@ public class FlattenTest {
     }
 
     @Test
+    public void testOptionalStruct() {
+        xformValue.configure(Collections.<String, String>emptyMap());
+
+        SchemaBuilder builder = SchemaBuilder.struct().optional();
+        builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
+        Schema schema = builder.build();
+
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null,
+            "topic", 0,
+            schema, null));
+
+        assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
+        assertNull(transformed.value());
+    }
+
+    @Test
+    public void testOptionalNestedStruct() {
+        xformValue.configure(Collections.<String, String>emptyMap());
+
+        SchemaBuilder builder = SchemaBuilder.struct().optional();
+        builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
+        Schema supportedTypesSchema = builder.build();
+
+        builder = SchemaBuilder.struct();
+        builder.field("B", supportedTypesSchema);
+        Schema oneLevelNestedSchema = builder.build();
+
+        Struct oneLevelNestedStruct = new Struct(oneLevelNestedSchema);
+        oneLevelNestedStruct.put("B", null);
+
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null,
+            "topic", 0,
+            oneLevelNestedSchema, oneLevelNestedStruct));
+
+        assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
+        Struct transformedStruct = (Struct) transformed.value();
+        assertNull(transformedStruct.get("B.opt_int32"));
+    }
+
+    @Test
     public void testOptionalFieldMap() {
         xformValue.configure(Collections.<String, String>emptyMap());
 

Reply via email to