This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new fd44dc7 KAFKA-6684: Support casting Connect values with bytes schema to string fd44dc7 is described below commit fd44dc7fb210614349a873cdd82087ef5677f583 Author: Amit Sela <amitsel...@gmail.com> AuthorDate: Sun Sep 30 22:24:09 2018 -0700 KAFKA-6684: Support casting Connect values with bytes schema to string Allow casting a LogicalType value to string by calling the serialized (Java) object's toString() (java.util.Date values are formatted with Values.dateFormatFor instead). Added tests for `BigDecimal` and `Date`, both as whole records and as fields. Author: Amit Sela <amitsel...@gmail.com> Reviewers: Randall Hauch <rha...@gmail.com>, Robert Yokota <rayok...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io> Closes #4820 from amitsela/cast-transform-bytes --- .../java/org/apache/kafka/connect/data/Values.java | 2 +- .../org/apache/kafka/connect/transforms/Cast.java | 62 ++++++++++++++-------- .../apache/kafka/connect/transforms/CastTest.java | 42 ++++++++++++++- 3 files changed, 83 insertions(+), 23 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java index c944745..c2bd9f4 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java @@ -713,7 +713,7 @@ public class Values { return DOUBLEQOUTE.matcher(replace1).replaceAll("\\\\\""); } - protected static DateFormat dateFormatFor(java.util.Date value) { + public static DateFormat dateFormatFor(java.util.Date value) { if (value.getTime() < MILLIS_PER_DAY) { return new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN); } diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java index a593c7b..07ccd37 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java +++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java @@ -28,6 +28,7 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Values; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.transforms.util.SchemaUtil; import org.apache.kafka.connect.transforms.util.SimpleConfig; @@ -78,9 +79,16 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation private static final String PURPOSE = "cast types"; - private static final Set<Schema.Type> SUPPORTED_CAST_TYPES = EnumSet.of( + private static final Set<Schema.Type> SUPPORTED_CAST_INPUT_TYPES = EnumSet.of( Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, Schema.Type.INT64, - Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN, Schema.Type.STRING + Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN, + Schema.Type.STRING, Schema.Type.BYTES + ); + + private static final Set<Schema.Type> SUPPORTED_CAST_OUTPUT_TYPES = EnumSet.of( + Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, Schema.Type.INT64, + Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN, + Schema.Type.STRING ); // As a special case for casting the entire value (e.g. 
the incoming key is a int64 but you know it could be an @@ -120,14 +128,14 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation private R applySchemaless(R record) { if (wholeValueCastType != null) { - return newRecord(record, null, castValueToType(operatingValue(record), wholeValueCastType)); + return newRecord(record, null, castValueToType(null, operatingValue(record), wholeValueCastType)); } final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE); final HashMap<String, Object> updatedValue = new HashMap<>(value); for (Map.Entry<String, Schema.Type> fieldSpec : casts.entrySet()) { String field = fieldSpec.getKey(); - updatedValue.put(field, castValueToType(value.get(field), fieldSpec.getValue())); + updatedValue.put(field, castValueToType(null, value.get(field), fieldSpec.getValue())); } return newRecord(record, null, updatedValue); } @@ -138,7 +146,7 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation // Whole-record casting if (wholeValueCastType != null) - return newRecord(record, updatedSchema, castValueToType(operatingValue(record), wholeValueCastType)); + return newRecord(record, updatedSchema, castValueToType(valueSchema, operatingValue(record), wholeValueCastType)); // Casting within a struct final Struct value = requireStruct(operatingValue(record), PURPOSE); @@ -147,7 +155,7 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation for (Field field : value.schema().fields()) { final Object origFieldValue = value.get(field); final Schema.Type targetType = casts.get(field.name()); - final Object newFieldValue = targetType != null ? castValueToType(origFieldValue, targetType) : origFieldValue; + final Object newFieldValue = targetType != null ? 
castValueToType(field.schema(), origFieldValue, targetType) : origFieldValue; updatedValue.put(updatedSchema.field(field.name()), newFieldValue); } return newRecord(record, updatedSchema, updatedValue); @@ -168,8 +176,10 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation SchemaBuilder fieldBuilder = convertFieldType(casts.get(field.name())); if (field.schema().isOptional()) fieldBuilder.optional(); - if (field.schema().defaultValue() != null) - fieldBuilder.defaultValue(castValueToType(field.schema().defaultValue(), fieldBuilder.type())); + if (field.schema().defaultValue() != null) { + Schema fieldSchema = field.schema(); + fieldBuilder.defaultValue(castValueToType(fieldSchema, fieldSchema.defaultValue(), fieldBuilder.type())); + } builder.field(field.name(), fieldBuilder.build()); } else { builder.field(field.name(), field.schema()); @@ -180,7 +190,7 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation if (valueSchema.isOptional()) builder.optional(); if (valueSchema.defaultValue() != null) - builder.defaultValue(castValueToType(valueSchema.defaultValue(), builder.type())); + builder.defaultValue(castValueToType(valueSchema, valueSchema.defaultValue(), builder.type())); updatedSchema = builder.build(); schemaUpdateCache.put(valueSchema, updatedSchema); @@ -211,11 +221,12 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation } - private static Object castValueToType(Object value, Schema.Type targetType) { + private static Object castValueToType(Schema schema, Object value, Schema.Type targetType) { try { if (value == null) return null; - Schema.Type inferredType = ConnectSchema.schemaType(value.getClass()); + Schema.Type inferredType = schema == null ? 
ConnectSchema.schemaType(value.getClass()) : + schema.type(); if (inferredType == null) { throw new DataException("Cast transformation was passed a value of type " + value.getClass() + " which is not supported by Connect's data API"); @@ -326,7 +337,12 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation } private static String castToString(Object value) { - return value.toString(); + if (value instanceof java.util.Date) { + java.util.Date dateValue = (java.util.Date) value; + return Values.dateFormatFor(dateValue).format(dateValue); + } else { + return value.toString(); + } } protected abstract Schema operatingSchema(R record); @@ -369,15 +385,19 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation } private static Schema.Type validCastType(Schema.Type type, FieldType fieldType) { - if (!SUPPORTED_CAST_TYPES.contains(type)) { - String message = "Cast transformation does not support casting to/from " + type - + "; supported types are " + SUPPORTED_CAST_TYPES; - switch (fieldType) { - case INPUT: - throw new DataException(message); - case OUTPUT: - throw new ConfigException(message); - } + switch (fieldType) { + case INPUT: + if (!SUPPORTED_CAST_INPUT_TYPES.contains(type)) { + throw new DataException("Cast transformation does not support casting from " + + type + "; supported types are " + SUPPORTED_CAST_INPUT_TYPES); + } + break; + case OUTPUT: + if (!SUPPORTED_CAST_OUTPUT_TYPES.contains(type)) { + throw new ConfigException("Cast transformation does not support casting to " + + type + "; supported types are " + SUPPORTED_CAST_OUTPUT_TYPES); + } + break; } return type; } diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java index 06fbe31..c568afb 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java +++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java @@ -18,15 +18,18 @@ package org.apache.kafka.connect.transforms; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.data.Values; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Test; +import java.math.BigDecimal; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -39,6 +42,7 @@ import static org.junit.Assert.assertTrue; public class CastTest { private final Cast<SourceRecord> xformKey = new Cast.Key<>(); private final Cast<SourceRecord> xformValue = new Cast.Value<>(); + private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; @After public void teardown() { @@ -62,6 +66,11 @@ public class CastTest { } @Test(expected = ConfigException.class) + public void testUnsupportedTargetType() { + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:bytes")); + } + + @Test(expected = ConfigException.class) public void testConfigInvalidMap() { xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra")); } @@ -172,6 +181,28 @@ public class CastTest { } @Test + public void castWholeBigDecimalRecordValueWithSchemaString() { + BigDecimal bigDecimal = new BigDecimal(42); + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, + Decimal.schema(bigDecimal.scale()), bigDecimal)); + + assertEquals(Schema.Type.STRING, transformed.valueSchema().type()); + assertEquals("42", transformed.value()); + } + + @Test + public void 
castWholeDateRecordValueWithSchemaString() { + Date timestamp = new Date(MILLIS_PER_DAY + 1); // day + 1msec to get a timestamp formatting. + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, + Timestamp.SCHEMA, timestamp)); + + assertEquals(Schema.Type.STRING, transformed.valueSchema().type()); + assertEquals(Values.dateFormatFor(timestamp).format(timestamp), transformed.value()); + } + + @Test public void castWholeRecordDefaultValue() { // Validate default value in schema is correctly converted xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32")); @@ -292,7 +323,8 @@ public class CastTest { @Test public void castFieldsWithSchema() { - xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32")); + Date day = new Date(MILLIS_PER_DAY); + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,bigdecimal:string,date:string,optional:int32")); // Include an optional fields and fields with defaults to validate their values are passed through properly SchemaBuilder builder = SchemaBuilder.struct(); @@ -305,6 +337,8 @@ public class CastTest { builder.field("float64", SchemaBuilder.float64().defaultValue(-1.125).build()); builder.field("boolean", Schema.BOOLEAN_SCHEMA); builder.field("string", Schema.STRING_SCHEMA); + builder.field("bigdecimal", Decimal.schema(new BigDecimal(42).scale())); + builder.field("date", Timestamp.SCHEMA); builder.field("optional", Schema.OPTIONAL_FLOAT32_SCHEMA); builder.field("timestamp", Timestamp.SCHEMA); Schema supportedTypesSchema = builder.build(); @@ -317,6 +351,8 @@ public class CastTest { recordValue.put("float32", 32.f); recordValue.put("float64", -64.); 
recordValue.put("boolean", true); + recordValue.put("bigdecimal", new BigDecimal(42)); + recordValue.put("date", day); recordValue.put("string", "42"); recordValue.put("timestamp", new Date(0)); // optional field intentionally omitted @@ -335,6 +371,8 @@ public class CastTest { assertEquals(true, ((Struct) transformed.value()).schema().field("float64").schema().defaultValue()); assertEquals((byte) 1, ((Struct) transformed.value()).get("boolean")); assertEquals(42, ((Struct) transformed.value()).get("string")); + assertEquals("42", ((Struct) transformed.value()).get("bigdecimal")); + assertEquals(Values.dateFormatFor(day).format(day), ((Struct) transformed.value()).get("date")); assertEquals(new Date(0), ((Struct) transformed.value()).get("timestamp")); assertNull(((Struct) transformed.value()).get("optional")); @@ -347,6 +385,8 @@ public class CastTest { assertEquals(Schema.BOOLEAN_SCHEMA.type(), transformedSchema.field("float64").schema().type()); assertEquals(Schema.INT8_SCHEMA.type(), transformedSchema.field("boolean").schema().type()); assertEquals(Schema.INT32_SCHEMA.type(), transformedSchema.field("string").schema().type()); + assertEquals(Schema.STRING_SCHEMA.type(), transformedSchema.field("bigdecimal").schema().type()); + assertEquals(Schema.STRING_SCHEMA.type(), transformedSchema.field("date").schema().type()); assertEquals(Schema.OPTIONAL_INT32_SCHEMA.type(), transformedSchema.field("optional").schema().type()); // The following fields are not changed assertEquals(Timestamp.SCHEMA.type(), transformedSchema.field("timestamp").schema().type());