[jira] [Updated] (KAFKA-10627) Connect TimestampConverter transform does not support multiple formats for the same field and only allows one field to be transformed at a time

2021-07-28 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10627:
--
Issue Type: New Feature  (was: Improvement)

> Connect TimestampConverter transform does not support multiple formats for 
> the same field and only allows one field to be transformed at a time
> ---
>
> Key: KAFKA-10627
> URL: https://issues.apache.org/jira/browse/KAFKA-10627
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Joshua Grisham
>Priority: Minor
>  Labels: connect-transformation, need-kip
>
> Some of the limitations of the *TimestampConverter* transform are causing 
> issues for us since we have a lot of different producers from different 
> systems producing events to some of our topics.  We try our best to have 
> governance on the data formats including strict usage of Avro schemas but 
> there are still variations in timestamp data types that are allowed by the 
> schema.
> In the end there will be multiple formats coming into the same timestamp 
> fields (for example, with and without milliseconds, with and without a 
> timezone specifier, etc).
> And then you get failed events in Connect with messages like this:
> {noformat}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>   at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
>   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.DataException: Could not parse timestamp: value (2020-10-06T12:12:27Z) does not match pattern (yyyy-MM-dd'T'HH:mm:ss.SSSX)
>   at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
>   at org.apache.kafka.connect.transforms.TimestampConverter.convertTimestamp(TimestampConverter.java:450)
>   at org.apache.kafka.connect.transforms.TimestampConverter.applyValueWithSchema(TimestampConverter.java:375)
>   at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:362)
>   at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:279)
>   at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>   ... 14 more
> Caused by: java.text.ParseException: Unparseable date: "2020-10-06T12:12:27Z"
>   at java.text.DateFormat.parse(DateFormat.java:366)
>   at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java)
>   ... 21 more
> {noformat}
>  
> My thinking is that a good solution might be to switch from *java.util.Date* to the 
> *java.time* API: instead of *SimpleDateFormat*, use *DateTimeFormatter*, which allows 
> more sophisticated patterns in the config and can match multiple different 
> allowable formats with a single pattern.
> For example, instead of effectively doing this:
> {code:java}
> SimpleDateFormat format = new 
> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");{code}
> It could be something like this:
> {code:java}
> DateTimeFormatter format = 
> DateTimeFormatter.ofPattern("yyyy-MM-dd[['T'][ ]HH:mm:ss[.SSS][XXX][X]]");{code}
> Also, if there are multiple timestamp fields in the schema/events, then today 
> you have to chain multiple *TimestampConverter* transforms, one per field.
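For reference, that per-field chaining looks something like the following in a connector config today (the field names {{created_at}}/{{updated_at}} are made up for illustration; the config keys are the transform's existing ones):

```properties
transforms=tsCreated,tsUpdated
transforms.tsCreated.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.tsCreated.field=created_at
transforms.tsCreated.target.type=Timestamp
transforms.tsCreated.format=yyyy-MM-dd'T'HH:mm:ss.SSSX
transforms.tsUpdated.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.tsUpdated.field=updated_at
transforms.tsUpdated.target.type=Timestamp
transforms.tsUpdated.format=yyyy-MM-dd'T'HH:mm:ss.SSSX
```

Supporting a list of fields in a single transform instance would collapse this to one config block.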

[jira] [Updated] (KAFKA-10627) Connect TimestampConverter transform does not support multiple formats for the same field and only allows one field to be transformed at a time

2021-07-28 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10627:
--
Labels: connect-transformation need-kip  (was: )
