This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fd44dc7  KAFKA-6684: Support casting Connect values with bytes schema 
to string
fd44dc7 is described below

commit fd44dc7fb210614349a873cdd82087ef5677f583
Author: Amit Sela <amitsel...@gmail.com>
AuthorDate: Sun Sep 30 22:24:09 2018 -0700

    KAFKA-6684: Support casting Connect values with bytes schema to string
    
    Allow casting a LogicalType to string by calling the serialized (Java) 
object's toString().
    
    Added tests for `BigDecimal` and `Date` as a whole record and as fields.
    
    Author: Amit Sela <amitsel...@gmail.com>
    
    Reviewers: Randall Hauch <rha...@gmail.com>, Robert Yokota 
<rayok...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io>
    
    Closes #4820 from amitsela/cast-transform-bytes
---
 .../java/org/apache/kafka/connect/data/Values.java |  2 +-
 .../org/apache/kafka/connect/transforms/Cast.java  | 62 ++++++++++++++--------
 .../apache/kafka/connect/transforms/CastTest.java  | 42 ++++++++++++++-
 3 files changed, 83 insertions(+), 23 deletions(-)

diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java 
b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index c944745..c2bd9f4 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -713,7 +713,7 @@ public class Values {
         return DOUBLEQOUTE.matcher(replace1).replaceAll("\\\\\"");
     }
 
-    protected static DateFormat dateFormatFor(java.util.Date value) {
+    public static DateFormat dateFormatFor(java.util.Date value) {
         if (value.getTime() < MILLIS_PER_DAY) {
             return new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN);
         }
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index a593c7b..07ccd37 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -28,6 +28,7 @@ import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Values;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.transforms.util.SchemaUtil;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -78,9 +79,16 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
 
     private static final String PURPOSE = "cast types";
 
-    private static final Set<Schema.Type> SUPPORTED_CAST_TYPES = EnumSet.of(
+    private static final Set<Schema.Type> SUPPORTED_CAST_INPUT_TYPES = 
EnumSet.of(
             Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, 
Schema.Type.INT64,
-                    Schema.Type.FLOAT32, Schema.Type.FLOAT64, 
Schema.Type.BOOLEAN, Schema.Type.STRING
+                    Schema.Type.FLOAT32, Schema.Type.FLOAT64, 
Schema.Type.BOOLEAN,
+                            Schema.Type.STRING, Schema.Type.BYTES
+    );
+
+    private static final Set<Schema.Type> SUPPORTED_CAST_OUTPUT_TYPES = 
EnumSet.of(
+            Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, 
Schema.Type.INT64,
+                    Schema.Type.FLOAT32, Schema.Type.FLOAT64, 
Schema.Type.BOOLEAN,
+                            Schema.Type.STRING
     );
 
     // As a special case for casting the entire value (e.g. the incoming key 
is a int64 but you know it could be an
@@ -120,14 +128,14 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
 
     private R applySchemaless(R record) {
         if (wholeValueCastType != null) {
-            return newRecord(record, null, 
castValueToType(operatingValue(record), wholeValueCastType));
+            return newRecord(record, null, castValueToType(null, 
operatingValue(record), wholeValueCastType));
         }
 
         final Map<String, Object> value = requireMap(operatingValue(record), 
PURPOSE);
         final HashMap<String, Object> updatedValue = new HashMap<>(value);
         for (Map.Entry<String, Schema.Type> fieldSpec : casts.entrySet()) {
             String field = fieldSpec.getKey();
-            updatedValue.put(field, castValueToType(value.get(field), 
fieldSpec.getValue()));
+            updatedValue.put(field, castValueToType(null, value.get(field), 
fieldSpec.getValue()));
         }
         return newRecord(record, null, updatedValue);
     }
@@ -138,7 +146,7 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
 
         // Whole-record casting
         if (wholeValueCastType != null)
-            return newRecord(record, updatedSchema, 
castValueToType(operatingValue(record), wholeValueCastType));
+            return newRecord(record, updatedSchema, 
castValueToType(valueSchema, operatingValue(record), wholeValueCastType));
 
         // Casting within a struct
         final Struct value = requireStruct(operatingValue(record), PURPOSE);
@@ -147,7 +155,7 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
         for (Field field : value.schema().fields()) {
             final Object origFieldValue = value.get(field);
             final Schema.Type targetType = casts.get(field.name());
-            final Object newFieldValue = targetType != null ? 
castValueToType(origFieldValue, targetType) : origFieldValue;
+            final Object newFieldValue = targetType != null ? 
castValueToType(field.schema(), origFieldValue, targetType) : origFieldValue;
             updatedValue.put(updatedSchema.field(field.name()), newFieldValue);
         }
         return newRecord(record, updatedSchema, updatedValue);
@@ -168,8 +176,10 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
                     SchemaBuilder fieldBuilder = 
convertFieldType(casts.get(field.name()));
                     if (field.schema().isOptional())
                         fieldBuilder.optional();
-                    if (field.schema().defaultValue() != null)
-                        
fieldBuilder.defaultValue(castValueToType(field.schema().defaultValue(), 
fieldBuilder.type()));
+                    if (field.schema().defaultValue() != null) {
+                        Schema fieldSchema = field.schema();
+                        fieldBuilder.defaultValue(castValueToType(fieldSchema, 
fieldSchema.defaultValue(), fieldBuilder.type()));
+                    }
                     builder.field(field.name(), fieldBuilder.build());
                 } else {
                     builder.field(field.name(), field.schema());
@@ -180,7 +190,7 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
         if (valueSchema.isOptional())
             builder.optional();
         if (valueSchema.defaultValue() != null)
-            builder.defaultValue(castValueToType(valueSchema.defaultValue(), 
builder.type()));
+            builder.defaultValue(castValueToType(valueSchema, 
valueSchema.defaultValue(), builder.type()));
 
         updatedSchema = builder.build();
         schemaUpdateCache.put(valueSchema, updatedSchema);
@@ -211,11 +221,12 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
 
     }
 
-    private static Object castValueToType(Object value, Schema.Type 
targetType) {
+    private static Object castValueToType(Schema schema, Object value, 
Schema.Type targetType) {
         try {
             if (value == null) return null;
 
-            Schema.Type inferredType = 
ConnectSchema.schemaType(value.getClass());
+            Schema.Type inferredType = schema == null ? 
ConnectSchema.schemaType(value.getClass()) :
+                    schema.type();
             if (inferredType == null) {
                 throw new DataException("Cast transformation was passed a 
value of type " + value.getClass()
                         + " which is not supported by Connect's data API");
@@ -326,7 +337,12 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
     }
 
     private static String castToString(Object value) {
-        return value.toString();
+        if (value instanceof java.util.Date) {
+            java.util.Date dateValue = (java.util.Date) value;
+            return Values.dateFormatFor(dateValue).format(dateValue);
+        } else {
+            return value.toString();
+        }
     }
 
     protected abstract Schema operatingSchema(R record);
@@ -369,15 +385,19 @@ public abstract class Cast<R extends ConnectRecord<R>> 
implements Transformation
     }
 
     private static Schema.Type validCastType(Schema.Type type, FieldType 
fieldType) {
-        if (!SUPPORTED_CAST_TYPES.contains(type)) {
-            String message = "Cast transformation does not support casting 
to/from " + type
-                    + "; supported types are " + SUPPORTED_CAST_TYPES;
-            switch (fieldType) {
-                case INPUT:
-                    throw new DataException(message);
-                case OUTPUT:
-                    throw new ConfigException(message);
-            }
+        switch (fieldType) {
+            case INPUT:
+                if (!SUPPORTED_CAST_INPUT_TYPES.contains(type)) {
+                    throw new DataException("Cast transformation does not 
support casting from " +
+                        type + "; supported types are " + 
SUPPORTED_CAST_INPUT_TYPES);
+                }
+                break;
+            case OUTPUT:
+                if (!SUPPORTED_CAST_OUTPUT_TYPES.contains(type)) {
+                    throw new ConfigException("Cast transformation does not 
support casting to " +
+                        type + "; supported types are " + 
SUPPORTED_CAST_OUTPUT_TYPES);
+                }
+                break;
         }
         return type;
     }
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index 06fbe31..c568afb 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -18,15 +18,18 @@
 package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.data.Values;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.After;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -39,6 +42,7 @@ import static org.junit.Assert.assertTrue;
 public class CastTest {
     private final Cast<SourceRecord> xformKey = new Cast.Key<>();
     private final Cast<SourceRecord> xformValue = new Cast.Value<>();
+    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
 
     @After
     public void teardown() {
@@ -62,6 +66,11 @@ public class CastTest {
     }
 
     @Test(expected = ConfigException.class)
+    public void testUnsupportedTargetType() {
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, 
"foo:bytes"));
+    }
+
+    @Test(expected = ConfigException.class)
     public void testConfigInvalidMap() {
         xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, 
"foo:int8:extra"));
     }
@@ -172,6 +181,28 @@ public class CastTest {
     }
 
     @Test
+    public void castWholeBigDecimalRecordValueWithSchemaString() {
+        BigDecimal bigDecimal = new BigDecimal(42);
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, 
"string"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0,
+                Decimal.schema(bigDecimal.scale()), bigDecimal));
+
+        assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
+        assertEquals("42", transformed.value());
+    }
+
+    @Test
+    public void castWholeDateRecordValueWithSchemaString() {
+        Date timestamp = new Date(MILLIS_PER_DAY + 1); // day + 1msec to get a 
timestamp formatting.
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, 
"string"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0,
+                Timestamp.SCHEMA, timestamp));
+
+        assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
+        assertEquals(Values.dateFormatFor(timestamp).format(timestamp), 
transformed.value());
+    }
+
+    @Test
     public void castWholeRecordDefaultValue() {
         // Validate default value in schema is correctly converted
         xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, 
"int32"));
@@ -292,7 +323,8 @@ public class CastTest {
 
     @Test
     public void castFieldsWithSchema() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, 
"int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32"));
+        Date day = new Date(MILLIS_PER_DAY);
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, 
"int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,bigdecimal:string,date:string,optional:int32"));
 
         // Include an optional fields and fields with defaults to validate 
their values are passed through properly
         SchemaBuilder builder = SchemaBuilder.struct();
@@ -305,6 +337,8 @@ public class CastTest {
         builder.field("float64", 
SchemaBuilder.float64().defaultValue(-1.125).build());
         builder.field("boolean", Schema.BOOLEAN_SCHEMA);
         builder.field("string", Schema.STRING_SCHEMA);
+        builder.field("bigdecimal", Decimal.schema(new 
BigDecimal(42).scale()));
+        builder.field("date", Timestamp.SCHEMA);
         builder.field("optional", Schema.OPTIONAL_FLOAT32_SCHEMA);
         builder.field("timestamp", Timestamp.SCHEMA);
         Schema supportedTypesSchema = builder.build();
@@ -317,6 +351,8 @@ public class CastTest {
         recordValue.put("float32", 32.f);
         recordValue.put("float64", -64.);
         recordValue.put("boolean", true);
+        recordValue.put("bigdecimal", new BigDecimal(42));
+        recordValue.put("date", day);
         recordValue.put("string", "42");
         recordValue.put("timestamp", new Date(0));
         // optional field intentionally omitted
@@ -335,6 +371,8 @@ public class CastTest {
         assertEquals(true, ((Struct) 
transformed.value()).schema().field("float64").schema().defaultValue());
         assertEquals((byte) 1, ((Struct) transformed.value()).get("boolean"));
         assertEquals(42, ((Struct) transformed.value()).get("string"));
+        assertEquals("42", ((Struct) transformed.value()).get("bigdecimal"));
+        assertEquals(Values.dateFormatFor(day).format(day), ((Struct) 
transformed.value()).get("date"));
         assertEquals(new Date(0), ((Struct) 
transformed.value()).get("timestamp"));
         assertNull(((Struct) transformed.value()).get("optional"));
 
@@ -347,6 +385,8 @@ public class CastTest {
         assertEquals(Schema.BOOLEAN_SCHEMA.type(), 
transformedSchema.field("float64").schema().type());
         assertEquals(Schema.INT8_SCHEMA.type(), 
transformedSchema.field("boolean").schema().type());
         assertEquals(Schema.INT32_SCHEMA.type(), 
transformedSchema.field("string").schema().type());
+        assertEquals(Schema.STRING_SCHEMA.type(), 
transformedSchema.field("bigdecimal").schema().type());
+        assertEquals(Schema.STRING_SCHEMA.type(), 
transformedSchema.field("date").schema().type());
         assertEquals(Schema.OPTIONAL_INT32_SCHEMA.type(), 
transformedSchema.field("optional").schema().type());
         // The following fields are not changed
         assertEquals(Timestamp.SCHEMA.type(), 
transformedSchema.field("timestamp").schema().type());

Reply via email to