[ https://issues.apache.org/jira/browse/KAFKA-10627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388779#comment-17388779 ]

Randall Hauch commented on KAFKA-10627:
---------------------------------------

Thanks, [~joshuagrisham]! I've added you as a contributor to this Jira project, 
and assigned this issue to you since you've created the KIP and a PR.

I'll take a look at the proposed KIP and respond on the KIP discussion thread.

> Connect TimestampConverter transform does not support multiple formats for 
> the same field and only allows one field to be transformed at a time
> -----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10627
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10627
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Joshua Grisham
>            Assignee: Joshua Grisham
>            Priority: Minor
>              Labels: connect-transformation, need-kip
>
> Some of the limitations of the *TimestampConverter* transform are causing 
> issues for us, since we have a lot of different producers from different 
> systems producing events to some of our topics.  We try our best to enforce 
> governance on the data formats, including strict usage of Avro schemas, but 
> there are still variations in timestamp data types that the schema allows.
> In the end, multiple formats arrive in the same timestamp fields (for 
> example, with and without milliseconds, with and without a timezone 
> specifier, etc.).
> You then get failed events in Connect with messages like this:
> {noformat}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
>       at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>       at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>       at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
>       at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
>       at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
>       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
>       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
>       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
>       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.DataException: Could not parse timestamp: value (2020-10-06T12:12:27Z) does not match pattern (yyyy-MM-dd'T'HH:mm:ss.SSSX)
>       at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
>       at org.apache.kafka.connect.transforms.TimestampConverter.convertTimestamp(TimestampConverter.java:450)
>       at org.apache.kafka.connect.transforms.TimestampConverter.applyValueWithSchema(TimestampConverter.java:375)
>       at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:362)
>       at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:279)
>       at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
>       at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>       at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>       ... 14 more
> Caused by: java.text.ParseException: Unparseable date: "2020-10-06T12:12:27Z"
>       at java.text.DateFormat.parse(DateFormat.java:366)
>       at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:118)
>       ... 21 more
> {noformat}
>  
> My thinking is that maybe a good solution is to switch from *java.util.Date* 
> to the *java.time* API, and from *SimpleDateFormat* to *DateTimeFormatter*, 
> which allows more sophisticated patterns in the config that can match 
> multiple different allowable formats.
> For example, instead of effectively doing this:
> {code:java}
> SimpleDateFormat format = new 
> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");{code}
> It can be something like this:
> {code:java}
> DateTimeFormatter format = DateTimeFormatter.ofPattern("[yyyy-MM-dd[['T'][ 
> ]HH:mm:ss[.SSSSSSSz][.SSS[XXX][X]]]]");{code}
> Also, if there are multiple timestamp fields in the schema/events, then 
> today you have to chain multiple *TimestampConverter* transforms together, 
> and I can see a bit of a performance impact if there are many timestamps on 
> large events and the topic has a lot of events coming through.
> So it would be great if the field name could instead be a comma-separated 
> list of field names (much like you can use with the *Cast*, *ReplaceField*, 
> etc. transforms), and then the transform would just loop through each field 
> in the list and apply the same logic (parse the field based on the string 
> and produce the requested output type).
>  
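To illustrate the idea in the quoted description: a single *DateTimeFormatter* with optional sections can accept several incoming timestamp formats that a single *SimpleDateFormat* pattern cannot. This is a minimal sketch only; the pattern, the class name, the field names, and the sample values below are illustrative assumptions, not the exact pattern or config proposed in the KIP.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.Arrays;
import java.util.List;

public class TimestampParseDemo {
    public static void main(String[] args) {
        // One pattern whose optional sections ([...]) cover several incoming
        // formats. Illustrative only, not the exact pattern from the KIP.
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(
                "yyyy-MM-dd['T'][ ]HH:mm:ss[.SSS][XXX][X]");

        // Hypothetical field names, standing in for the proposed
        // comma-separated field-list config.
        List<String> fields = Arrays.asList("created_at", "updated_at");

        List<String> samples = Arrays.asList(
                "2020-10-06T12:12:27Z",           // no millis, UTC
                "2020-10-06T12:12:27.123Z",       // millis, UTC
                "2020-10-06T12:12:27.123+02:00",  // millis, numeric offset
                "2020-10-06 12:12:27");           // space separator, no zone

        for (String field : fields) {
            for (String sample : samples) {
                // parseBest tries the queries in order: OffsetDateTime when
                // the text carries an offset, otherwise LocalDateTime.
                TemporalAccessor parsed = formatter.parseBest(
                        sample, OffsetDateTime::from, LocalDateTime::from);
                System.out.println(field + ": " + sample + " -> " + parsed);
            }
        }
    }
}
```

Note that the first sample, `2020-10-06T12:12:27Z`, is precisely the value that `SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")` rejects with the `Unparseable date` exception shown in the stack trace above, because the milliseconds are mandatory in that pattern.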



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
