This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new de8663a  KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)
de8663a is described below

commit de8663a0835d6d60a5d601626a9c301530acfed3
Author: Robert Yokota <rayok...@gmail.com>
AuthorDate: Fri Jul 12 10:12:20 2019 -0700

    KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)
    
    Fix handling of nulls in TimestampConverter.
    
    Authors: Valeria Vasylieva <valeria.vasyli...@gmail.com>, Robert Yokota <rayok...@gmail.com>
    Reviewers: Arjun Satish <ar...@confluent.io>, Randall Hauch <rha...@gmail.com>
---
 .../connect/transforms/TimestampConverter.java     |  52 +++--
 .../connect/transforms/TimestampConverterTest.java | 233 +++++++++++++++++++--
 2 files changed, 243 insertions(+), 42 deletions(-)

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 8557441..f32253e 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -47,7 +47,7 @@ import java.util.Set;
 import java.util.TimeZone;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
-import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
 
 public abstract class TimestampConverter<R extends ConnectRecord<R>> 
implements Transformation<R> {
 
@@ -85,6 +85,10 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
 
     private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
+    public static final Schema OPTIONAL_DATE_SCHEMA = 
org.apache.kafka.connect.data.Date.builder().optional().schema();
+    public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = 
Timestamp.builder().optional().schema();
+    public static final Schema OPTIONAL_TIME_SCHEMA = 
Time.builder().optional().schema();
+
     private interface TimestampTranslator {
         /**
          * Convert from the type-specific format to the universal 
java.util.Date format
@@ -94,7 +98,7 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
         /**
          * Get the schema for this format.
          */
-        Schema typeSchema();
+        Schema typeSchema(boolean isOptional);
 
         /**
          * Convert from the universal java.util.Date format to the 
type-specific format
@@ -118,8 +122,8 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
             }
 
             @Override
-            public Schema typeSchema() {
-                return Schema.STRING_SCHEMA;
+            public Schema typeSchema(boolean isOptional) {
+                return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : 
Schema.STRING_SCHEMA;
             }
 
             @Override
@@ -139,8 +143,8 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
             }
 
             @Override
-            public Schema typeSchema() {
-                return Schema.INT64_SCHEMA;
+            public Schema typeSchema(boolean isOptional) {
+                return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : 
Schema.INT64_SCHEMA;
             }
 
             @Override
@@ -159,8 +163,8 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
             }
 
             @Override
-            public Schema typeSchema() {
-                return org.apache.kafka.connect.data.Date.SCHEMA;
+            public Schema typeSchema(boolean isOptional) {
+                return isOptional ? OPTIONAL_DATE_SCHEMA : 
org.apache.kafka.connect.data.Date.SCHEMA;
             }
 
             @Override
@@ -185,8 +189,8 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
             }
 
             @Override
-            public Schema typeSchema() {
-                return Time.SCHEMA;
+            public Schema typeSchema(boolean isOptional) {
+                return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
             }
 
             @Override
@@ -212,8 +216,8 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
             }
 
             @Override
-            public Schema typeSchema() {
-                return Timestamp.SCHEMA;
+            public Schema typeSchema(boolean isOptional) {
+                return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : 
Timestamp.SCHEMA;
             }
 
             @Override
@@ -330,16 +334,16 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
         if (config.field.isEmpty()) {
             Object value = operatingValue(record);
             // New schema is determined by the requested target timestamp type
-            Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema();
+            Schema updatedSchema = 
TRANSLATORS.get(config.type).typeSchema(schema.isOptional());
             return newRecord(record, updatedSchema, convertTimestamp(value, 
timestampTypeFromSchema(schema)));
         } else {
-            final Struct value = requireStruct(operatingValue(record), 
PURPOSE);
-            Schema updatedSchema = schemaUpdateCache.get(value.schema());
+            final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
+            Schema updatedSchema = schemaUpdateCache.get(schema);
             if (updatedSchema == null) {
                 SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
                 for (Field field : schema.fields()) {
                     if (field.name().equals(config.field)) {
-                        builder.field(field.name(), 
TRANSLATORS.get(config.type).typeSchema());
+                        builder.field(field.name(), 
TRANSLATORS.get(config.type).typeSchema(field.schema().isOptional()));
                     } else {
                         builder.field(field.name(), field.schema());
                     }
@@ -361,6 +365,9 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
     }
 
     private Struct applyValueWithSchema(Struct value, Schema updatedSchema) {
+        if (value == null) {
+            return null;
+        }
         Struct updatedValue = new Struct(updatedSchema);
         for (Field field : value.schema().fields()) {
             final Object updatedFieldValue;
@@ -375,11 +382,11 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
     }
 
     private R applySchemaless(R record) {
-        if (config.field.isEmpty()) {
-            Object value = operatingValue(record);
-            return newRecord(record, null, convertTimestamp(value));
+        Object rawValue = operatingValue(record);
+        if (rawValue == null || config.field.isEmpty()) {
+            return newRecord(record, null, convertTimestamp(rawValue));
         } else {
-            final Map<String, Object> value = 
requireMap(operatingValue(record), PURPOSE);
+            final Map<String, Object> value = requireMap(rawValue, PURPOSE);
             final HashMap<String, Object> updatedValue = new HashMap<>(value);
             updatedValue.put(config.field, 
convertTimestamp(value.get(config.field)));
             return newRecord(record, null, updatedValue);
@@ -424,11 +431,14 @@ public abstract class TimestampConverter<R extends 
ConnectRecord<R>> implements
 
     /**
      * Convert the given timestamp to the target timestamp format.
-     * @param timestamp the input timestamp
+     * @param timestamp the input timestamp, may be null
      * @param timestampFormat the format of the timestamp, or null if the 
format should be inferred
      * @return the converted timestamp
      */
     private Object convertTimestamp(Object timestamp, String timestampFormat) {
+        if (timestamp == null) {
+            return null;
+        }
         if (timestampFormat == null) {
             timestampFormat = inferTimestampType(timestamp);
         }
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
index 475066f..3a1920e 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
 
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -105,13 +106,12 @@ public class TimestampConverterTest {
         xformValue.configure(config);
     }
 
-
     // Conversions without schemas (most flexible Timestamp -> other types)
 
     @Test
     public void testSchemalessIdentity() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -120,7 +120,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessTimestampToDate() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Date"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE.getTime(), transformed.value());
@@ -129,7 +129,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessTimestampToTime() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Time"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(TIME.getTime(), transformed.value());
@@ -138,7 +138,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessTimestampToUnix() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "unix"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
@@ -150,7 +150,7 @@ public class TimestampConverterTest {
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
         xformValue.configure(config);
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
@@ -162,7 +162,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessDateToTimestamp() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE.getTime()));
 
         assertNull(transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -172,7 +172,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessTimeToTimestamp() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -182,7 +182,7 @@ public class TimestampConverterTest {
     @Test
     public void testSchemalessUnixToTimestamp() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME_UNIX));
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_UNIX));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -194,7 +194,7 @@ public class TimestampConverterTest {
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
         xformValue.configure(config);
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME_STRING));
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_STRING));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -206,7 +206,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaIdentity() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, 
DATE_PLUS_TIME.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -215,7 +215,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaTimestampToDate() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Date"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, 
DATE_PLUS_TIME.getTime()));
 
         assertEquals(Date.SCHEMA, transformed.valueSchema());
         assertEquals(DATE.getTime(), transformed.value());
@@ -224,7 +224,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaTimestampToTime() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Time"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, 
DATE_PLUS_TIME.getTime()));
 
         assertEquals(Time.SCHEMA, transformed.valueSchema());
         assertEquals(TIME.getTime(), transformed.value());
@@ -233,7 +233,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaTimestampToUnix() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "unix"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, 
DATE_PLUS_TIME.getTime()));
 
         assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
@@ -245,19 +245,70 @@ public class TimestampConverterTest {
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
         xformValue.configure(config);
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, 
DATE_PLUS_TIME.getTime()));
 
         assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
     }
 
+    // Null-value conversions schemaless
+
+    @Test
+    public void testSchemalessNullValueToString() {
+        testSchemalessNullValueConversion("string");
+        testSchemalessNullFieldConversion("string");
+    }
+    @Test
+    public void testSchemalessNullValueToDate() {
+        testSchemalessNullValueConversion("Date");
+        testSchemalessNullFieldConversion("Date");
+    }
+    @Test
+    public void testSchemalessNullValueToTimestamp() {
+        testSchemalessNullValueConversion("Timestamp");
+        testSchemalessNullFieldConversion("Timestamp");
+    }
+    @Test
+    public void testSchemalessNullValueToUnix() {
+        testSchemalessNullValueConversion("unix");
+        testSchemalessNullFieldConversion("unix");
+    }
+
+    @Test
+    public void testSchemalessNullValueToTime() {
+        testSchemalessNullValueConversion("Time");
+        testSchemalessNullFieldConversion("Time");
+    }
+
+    private void testSchemalessNullValueConversion(String targetType) {
+        Map<String, String> config = new HashMap<>();
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
+        config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+        xformValue.configure(config);
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(null));
+
+        assertNull(transformed.valueSchema());
+        assertNull(transformed.value());
+    }
+
+    private void testSchemalessNullFieldConversion(String targetType) {
+        Map<String, String> config = new HashMap<>();
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
+        config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+        config.put(TimestampConverter.FIELD_CONFIG, "ts");
+        xformValue.configure(config);
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(null));
+
+        assertNull(transformed.valueSchema());
+        assertNull(transformed.value());
+    }
 
     // Conversions with schemas (core types -> most flexible Timestamp format)
 
     @Test
     public void testWithSchemaDateToTimestamp() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Date.SCHEMA, DATE.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Date.SCHEMA, DATE.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -267,7 +318,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaTimeToTimestamp() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Time.SCHEMA, TIME.getTime()));
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Time.SCHEMA, TIME.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -277,7 +328,7 @@ public class TimestampConverterTest {
     @Test
     public void testWithSchemaUnixToTimestamp() {
         
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Schema.INT64_SCHEMA, 
DATE_PLUS_TIME_UNIX));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -289,12 +340,145 @@ public class TimestampConverterTest {
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
         xformValue.configure(config);
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(Schema.STRING_SCHEMA, 
DATE_PLUS_TIME_STRING));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
     }
 
+    // Null-value conversions with schema
+
+    @Test
+    public void testWithSchemaNullValueToTimestamp() {
+        testWithSchemaNullValueConversion("Timestamp", 
Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
+        testWithSchemaNullValueConversion("Timestamp", 
TimestampConverter.OPTIONAL_TIME_SCHEMA, 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
+        testWithSchemaNullValueConversion("Timestamp", 
TimestampConverter.OPTIONAL_DATE_SCHEMA, 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
+        testWithSchemaNullValueConversion("Timestamp", 
Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
+        testWithSchemaNullValueConversion("Timestamp", 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
+    }
+
+    @Test
+    public void testWithSchemaNullFieldToTimestamp() {
+        testWithSchemaNullFieldConversion("Timestamp", 
Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
+        testWithSchemaNullFieldConversion("Timestamp", 
TimestampConverter.OPTIONAL_TIME_SCHEMA, 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
+        testWithSchemaNullFieldConversion("Timestamp", 
TimestampConverter.OPTIONAL_DATE_SCHEMA, 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
+        testWithSchemaNullFieldConversion("Timestamp", 
Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
+        testWithSchemaNullFieldConversion("Timestamp", 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
+    }
+
+    @Test
+    public void testWithSchemaNullValueToUnix() {
+        testWithSchemaNullValueConversion("unix", 
Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
+        testWithSchemaNullValueConversion("unix", 
TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
+        testWithSchemaNullValueConversion("unix", 
TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
+        testWithSchemaNullValueConversion("unix", 
Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
+        testWithSchemaNullValueConversion("unix", 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
+    }
+
+    @Test
+    public void testWithSchemaNullFieldToUnix() {
+        testWithSchemaNullFieldConversion("unix", 
Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
+        testWithSchemaNullFieldConversion("unix", 
TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
+        testWithSchemaNullFieldConversion("unix", 
TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
+        testWithSchemaNullFieldConversion("unix", 
Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
+        testWithSchemaNullFieldConversion("unix", 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
+    }
+
+    @Test
+    public void testWithSchemaNullValueToTime() {
+        testWithSchemaNullValueConversion("Time", 
Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
+        testWithSchemaNullValueConversion("Time", 
TimestampConverter.OPTIONAL_TIME_SCHEMA, 
TimestampConverter.OPTIONAL_TIME_SCHEMA);
+        testWithSchemaNullValueConversion("Time", 
TimestampConverter.OPTIONAL_DATE_SCHEMA, 
TimestampConverter.OPTIONAL_TIME_SCHEMA);
+        testWithSchemaNullValueConversion("Time", 
Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
+        testWithSchemaNullValueConversion("Time", 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, 
TimestampConverter.OPTIONAL_TIME_SCHEMA);
+    }
+
+    @Test
+    public void testWithSchemaNullFieldToTime() {
+        testWithSchemaNullFieldConversion("Time", 
Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
+        testWithSchemaNullFieldConversion("Time", 
TimestampConverter.OPTIONAL_TIME_SCHEMA, 
TimestampConverter.OPTIONAL_TIME_SCHEMA);
+        testWithSchemaNullFieldConversion("Time", 
TimestampConverter.OPTIONAL_DATE_SCHEMA, 
TimestampConverter.OPTIONAL_TIME_SCHEMA);
+        testWithSchemaNullFieldConversion("Time", 
Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
+        testWithSchemaNullFieldConversion("Time", 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, 
TimestampConverter.OPTIONAL_TIME_SCHEMA);
+    }
+
+    @Test
+    public void testWithSchemaNullValueToDate() {
+        testWithSchemaNullValueConversion("Date", 
Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
+        testWithSchemaNullValueConversion("Date", 
TimestampConverter.OPTIONAL_TIME_SCHEMA, 
TimestampConverter.OPTIONAL_DATE_SCHEMA);
+        testWithSchemaNullValueConversion("Date", 
TimestampConverter.OPTIONAL_DATE_SCHEMA, 
TimestampConverter.OPTIONAL_DATE_SCHEMA);
+        testWithSchemaNullValueConversion("Date", 
Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
+        testWithSchemaNullValueConversion("Date", 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, 
TimestampConverter.OPTIONAL_DATE_SCHEMA);
+    }
+
+    @Test
+    public void testWithSchemaNullFieldToDate() {
+        testWithSchemaNullFieldConversion("Date", 
Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
+        testWithSchemaNullFieldConversion("Date", 
TimestampConverter.OPTIONAL_TIME_SCHEMA, 
TimestampConverter.OPTIONAL_DATE_SCHEMA);
+        testWithSchemaNullFieldConversion("Date", 
TimestampConverter.OPTIONAL_DATE_SCHEMA, 
TimestampConverter.OPTIONAL_DATE_SCHEMA);
+        testWithSchemaNullFieldConversion("Date", 
Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
+        testWithSchemaNullFieldConversion("Date", 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, 
TimestampConverter.OPTIONAL_DATE_SCHEMA);
+    }
+
+    @Test
+    public void testWithSchemaNullValueToString() {
+        testWithSchemaNullValueConversion("string", 
Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+        testWithSchemaNullValueConversion("string", 
TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+        testWithSchemaNullValueConversion("string", 
TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+        testWithSchemaNullValueConversion("string", 
Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+        testWithSchemaNullValueConversion("string", 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+    }
+
+    @Test
+    public void testWithSchemaNullFieldToString() {
+        testWithSchemaNullFieldConversion("string", 
Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+        testWithSchemaNullFieldConversion("string", 
TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+        testWithSchemaNullFieldConversion("string", 
TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+        testWithSchemaNullFieldConversion("string", 
Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+        testWithSchemaNullFieldConversion("string", 
TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
+    }
+
+    private void testWithSchemaNullValueConversion(String targetType, Schema 
originalSchema, Schema expectedSchema) {
+        Map<String, String> config = new HashMap<>();
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
+        config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+        xformValue.configure(config);
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(originalSchema, null));
+
+        assertEquals(expectedSchema, transformed.valueSchema());
+        assertNull(transformed.value());
+    }
+
+    private void testWithSchemaNullFieldConversion(String targetType, Schema 
originalSchema, Schema expectedSchema) {
+        Map<String, String> config = new HashMap<>();
+        config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
+        config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
+        config.put(TimestampConverter.FIELD_CONFIG, "ts");
+        xformValue.configure(config);
+        SchemaBuilder structSchema = SchemaBuilder.struct()
+                .field("ts", originalSchema)
+                .field("other", Schema.STRING_SCHEMA);
+
+        SchemaBuilder expectedStructSchema = SchemaBuilder.struct()
+                .field("ts", expectedSchema)
+                .field("other", Schema.STRING_SCHEMA);
+
+        Struct original = new Struct(structSchema);
+        original.put("ts", null);
+        original.put("other", "test");
+
+        // Struct field is null
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(structSchema.build(), original));
+
+        assertEquals(expectedStructSchema.build(), transformed.valueSchema());
+        assertNull(requireStruct(transformed.value(), "").get("ts"));
+
+        // entire Struct is null
+        transformed = 
xformValue.apply(createRecordWithSchema(structSchema.optional().build(), null));
+
+        assertEquals(expectedStructSchema.optional().build(), 
transformed.valueSchema());
+        assertNull(transformed.value());
+    }
 
     // Convert field instead of entire key/value
 
@@ -306,7 +490,7 @@ public class TimestampConverterTest {
         xformValue.configure(config);
 
         Object value = Collections.singletonMap("ts", 
DATE_PLUS_TIME.getTime());
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, value));
+        SourceRecord transformed = 
xformValue.apply(createRecordSchemaless(value));
 
         assertNull(transformed.valueSchema());
         assertEquals(Collections.singletonMap("ts", DATE.getTime()), 
transformed.value());
@@ -328,7 +512,7 @@ public class TimestampConverterTest {
         original.put("ts", DATE_PLUS_TIME_UNIX);
         original.put("other", "test");
 
-        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, structWithTimestampFieldSchema, original));
+        SourceRecord transformed = 
xformValue.apply(createRecordWithSchema(structWithTimestampFieldSchema, 
original));
 
         Schema expectedSchema = SchemaBuilder.struct()
                 .field("ts", Timestamp.SCHEMA)
@@ -351,4 +535,11 @@ public class TimestampConverterTest {
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.key());
     }
 
+    private SourceRecord createRecordWithSchema(Schema schema, Object value) {
+        return new SourceRecord(null, null, "topic", 0, schema, value);
+    }
+
+    private SourceRecord createRecordSchemaless(Object value) {
+        return createRecordWithSchema(null, value);
+    }
 }

Reply via email to