[jira] [Commented] (KAFKA-5891) Cast transformation fails if record schema contains timestamp field
[ https://issues.apache.org/jira/browse/KAFKA-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167493#comment-16167493 ]

Artem Plotnikov commented on KAFKA-5891:

It seems that Kafka Connect's Cast transformation loses schema information (specifically, the schema name) when casting types. I was able to reproduce the problem on the current trunk branch with the following test in org.apache.kafka.connect.transforms.CastTest:

{code}
@Test
public void castWholeRecordValueWithSchemaBooleanAndTimestampField() {
    // Cast only the "int64" field; "timestamp" should pass through unchanged.
    final Cast<SourceRecord> xform = new Cast.Value<>();
    xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64:boolean"));

    SchemaBuilder builder = SchemaBuilder.struct();
    builder.field("int64", Schema.INT64_SCHEMA);
    builder.field("timestamp", Timestamp.SCHEMA);
    Schema supportedTypesSchema = builder.build();

    Struct recordValue = new Struct(supportedTypesSchema);
    recordValue.put("int64", (long) 64);
    recordValue.put("timestamp", new java.sql.Timestamp(0L));

    SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
            supportedTypesSchema, recordValue));

    assertEquals(true, ((Struct) transformed.value()).get("int64"));
    assertEquals(new java.sql.Timestamp(0L), ((Struct) transformed.value()).get("timestamp"));
}
{code}

The problem is that Timestamp.SCHEMA has type INT64 and name "org.apache.kafka.connect.data.Timestamp", but the org.apache.kafka.connect.transforms.Cast#getOrBuildSchema method copies only the schema type:

{code}
SchemaBuilder fieldBuilder = convertFieldType(casts.containsKey(field.name()) ? casts.get(field.name()) : field.schema().type());
{code}

As a result, the "timestamp" field in the output schema becomes a plain INT64 schema without the logical type name, so Struct.put rejects the java.sql.Timestamp value with "Invalid Java object for schema type INT64".

> Cast transformation fails if record schema contains timestamp field
> ---
>
> Key: KAFKA-5891
> URL: https://issues.apache.org/jira/browse/KAFKA-5891
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.11.0.0
> Reporter: Artem Plotnikov
>
> I have the following simple type cast transformation:
> {code}
> name=postgresql-source-simple
> connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
> tasks.max=1
> connection.url=jdbc:postgresql://localhost:5432/testdb?user=postgres=mysecretpassword
> query=SELECT 1::INT as a, '2017-09-14 10:23:54'::TIMESTAMP as b
> transforms=Cast
> transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
> transforms.Cast.spec=a:boolean
> mode=bulk
> topic.prefix=clients
> {code}
> It fails at runtime with the following exception:
> {code}
> [2017-09-14 16:51:01,885] ERROR Task postgresql-source-simple-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
> org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.sql.Timestamp for field: "null"
> 	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
> 	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
> 	at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
> 	at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
> 	at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
> 	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
> 	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:190)
> 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
> 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
> 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> If I remove the transforms.* part of the connector configuration, it works correctly.
> In fact, it doesn't matter which type I cast field 'a' to; the mere presence of a timestamp field triggers the exception.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
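The root cause described in the comment can be illustrated with a small, self-contained Java model. This is a hypothetical simplification, not Kafka Connect code: `CastSchemaModel`, `FieldSchema`, `buildTypeOnly` and `buildPreservingUncast` are invented names, and `accepts` is only assumed to approximate the value check behind the reported DataException (a schema carrying the Timestamp logical name accepts a `java.util.Date`, while a plain INT64 schema accepts only a `Long`). The sketch shows why rebuilding each field schema from its type alone drops the logical name, and outlines one possible fix: reuse the original field schema for every field the spec does not cast.

```java
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the KAFKA-5891 schema-copy problem.
// None of these types are Kafka Connect classes; they only mimic the idea that a
// field schema has a primitive type plus an optional logical-type name.
class CastSchemaModel {

    enum Type { INT64, BOOLEAN }

    // Logical name reported for Timestamp.SCHEMA in the comment.
    static final String TIMESTAMP_NAME = "org.apache.kafka.connect.data.Timestamp";

    static final class FieldSchema {
        final Type type;
        final String name; // null when the field has no logical type

        FieldSchema(Type type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    // Approximation of the value check: a schema carrying the Timestamp logical name
    // accepts a java.util.Date (java.sql.Timestamp is a subclass); otherwise the Java
    // class must match the primitive type.
    static boolean accepts(FieldSchema schema, Object value) {
        if (TIMESTAMP_NAME.equals(schema.name)) {
            return value instanceof Date;
        }
        switch (schema.type) {
            case INT64:
                return value instanceof Long;
            case BOOLEAN:
                return value instanceof Boolean;
            default:
                return false;
        }
    }

    // Behaviour described in the comment: every output field is rebuilt from a type
    // alone (cast target or original type), so the logical name is dropped.
    static Map<String, FieldSchema> buildTypeOnly(Map<String, FieldSchema> input,
                                                  Map<String, Type> casts) {
        Map<String, FieldSchema> out = new LinkedHashMap<>();
        for (Map.Entry<String, FieldSchema> e : input.entrySet()) {
            Type target = casts.containsKey(e.getKey()) ? casts.get(e.getKey()) : e.getValue().type;
            out.put(e.getKey(), new FieldSchema(target, null));
        }
        return out;
    }

    // One possible fix (an assumption, not necessarily the committed patch): rebuild
    // only the fields named in the cast spec and reuse the original schema, including
    // its logical name, for every other field.
    static Map<String, FieldSchema> buildPreservingUncast(Map<String, FieldSchema> input,
                                                          Map<String, Type> casts) {
        Map<String, FieldSchema> out = new LinkedHashMap<>();
        for (Map.Entry<String, FieldSchema> e : input.entrySet()) {
            if (casts.containsKey(e.getKey())) {
                out.put(e.getKey(), new FieldSchema(casts.get(e.getKey()), null));
            } else {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same shape as the test in the comment: an INT64 field cast to BOOLEAN
        // plus an untouched Timestamp field.
        Map<String, FieldSchema> input = new LinkedHashMap<>();
        input.put("int64", new FieldSchema(Type.INT64, null));
        input.put("timestamp", new FieldSchema(Type.INT64, TIMESTAMP_NAME));
        Map<String, Type> casts = Collections.singletonMap("int64", Type.BOOLEAN);
        Object ts = new java.sql.Timestamp(0L);

        System.out.println(accepts(buildTypeOnly(input, casts).get("timestamp"), ts));         // prints false
        System.out.println(accepts(buildPreservingUncast(input, casts).get("timestamp"), ts)); // prints true
    }
}
```

In this model, reusing the original schema for un-cast fields, rather than re-deriving it from the primitive type, is the smallest change that keeps a logical type such as Timestamp intact; fields that are actually cast still get a fresh schema for their target type.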
[jira] [Updated] (KAFKA-5891) Cast transformation fails if record schema contains timestamp field
[ https://issues.apache.org/jira/browse/KAFKA-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Artem Plotnikov updated KAFKA-5891:
---
Description:
I have the following simple type cast transformation:
{code}
name=postgresql-source-simple
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:postgresql://localhost:5432/testdb?user=postgres=mysecretpassword
query=SELECT 1::INT as a, '2017-09-14 10:23:54'::TIMESTAMP as b
transforms=Cast
transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.Cast.spec=a:boolean
mode=bulk
topic.prefix=clients
{code}
It fails at runtime with the following exception:
{code}
[2017-09-14 16:51:01,885] ERROR Task postgresql-source-simple-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.sql.Timestamp for field: "null"
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
	at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
	at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:190)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}
If I remove the transforms.* part of the connector configuration, it works correctly.
In fact, it doesn't matter which type I cast field 'a' to; the mere presence of a timestamp field triggers the exception.

was:
I have the following simple type cast transformation:
```
name=postgresql-source-simple
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:postgresql://localhost:5432/testdb?user=postgres=mysecretpassword
query=SELECT 1::INT as a, '2017-09-14 10:23:54'::TIMESTAMP as b
transforms=Cast
transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.Cast.spec=a:boolean
mode=bulk
topic.prefix=clients
```
It fails at runtime with the following exception:
```
[2017-09-14 16:51:01,885] ERROR Task postgresql-source-simple-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.sql.Timestamp for field: "null"
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
	at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
	at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:190)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```
If I remove the transforms.* part of the connector configuration, it works correctly.
In fact, it doesn't matter which type I cast field 'a' to; the mere presence of a timestamp field triggers the exception.
[jira] [Created] (KAFKA-5891) Cast transformation fails if record schema contains timestamp field
Artem Plotnikov created KAFKA-5891:
--

Summary: Cast transformation fails if record schema contains timestamp field
Key: KAFKA-5891
URL: https://issues.apache.org/jira/browse/KAFKA-5891
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 0.11.0.0
Reporter: Artem Plotnikov

I have the following simple type cast transformation:
```
name=postgresql-source-simple
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:postgresql://localhost:5432/testdb?user=postgres=mysecretpassword
query=SELECT 1::INT as a, '2017-09-14 10:23:54'::TIMESTAMP as b
transforms=Cast
transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.Cast.spec=a:boolean
mode=bulk
topic.prefix=clients
```
It fails at runtime with the following exception:
```
[2017-09-14 16:51:01,885] ERROR Task postgresql-source-simple-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.sql.Timestamp for field: "null"
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
	at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
	at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:190)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```
If I remove the transforms.* part of the connector configuration, it works correctly.
In fact, it doesn't matter which type I cast field 'a' to; the mere presence of a timestamp field triggers the exception.