[ https://issues.apache.org/jira/browse/KAFKA-10627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch updated KAFKA-10627:
----------------------------------
    Issue Type: New Feature  (was: Improvement)

> Connect TimestampConverter transform does not support multiple formats for
> the same field and only allows one field to be transformed at a time
> -----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10627
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10627
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Joshua Grisham
>            Priority: Minor
>              Labels: connect-transformation, need-kip
>
> Some of the limitations of the *TimestampConverter* transform are causing
> issues for us, since we have a lot of different producers from different
> systems producing events to some of our topics. We try our best to have
> governance on the data formats, including strict usage of Avro schemas, but
> there are still variations in timestamp data types that are allowed by the
> schema.
> In the end there will be multiple formats coming into the same timestamp
> fields (for example, with and without milliseconds, with and without a
> timezone specifier, etc.).
> And then you get failed events in Connect with messages like this:
> {noformat}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
> 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
> 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
> 	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
> 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
> 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
> 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.DataException: Could not parse timestamp: value (2020-10-06T12:12:27Z) does not match pattern (yyyy-MM-dd'T'HH:mm:ss.SSSX)
> 	at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
> 	at org.apache.kafka.connect.transforms.TimestampConverter.convertTimestamp(TimestampConverter.java:450)
> 	at org.apache.kafka.connect.transforms.TimestampConverter.applyValueWithSchema(TimestampConverter.java:375)
> 	at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:362)
> 	at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:279)
> 	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
> 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
> 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
> 	... 14 more
> Caused by: java.text.ParseException: Unparseable date: "2020-10-06T12:12:27Z"
> 	at java.text.DateFormat.parse(DateFormat.java:366)
> 	at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
> 	... 21 more
> {noformat}
>
> My thinking is that maybe a good solution is to switch from *java.util.Date*
> to the *java.time* classes, and then from *SimpleDateFormat* to
> *DateTimeFormatter*, which allows usage of more sophisticated patterns in
> the config to match multiple different allowable formats.
> For example, instead of effectively doing this:
> {code:java}
> SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");{code}
> It could be something like this:
> {code:java}
> DateTimeFormatter format = DateTimeFormatter.ofPattern("[yyyy-MM-dd[['T'][ ]HH:mm:ss[.SSSSSSSz][.SSS[XXX][X]]]]");{code}
> Also, if there are multiple timestamp fields in the schema/events, today you
> have to chain multiple *TimestampConverter* transforms together, and I can
> see a bit of a performance impact if there are many timestamps on large
> events and the topic has a lot of events coming through.
> So it would be great if the field name could instead be a comma-separated
> list of field names (much like you can use with the *Cast*, *ReplaceField*,
> etc. transforms), and then it would just loop through each field in the list
> and apply the same logic (parse the field based on the string and give the
> requested output type).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
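To illustrate the proposal quoted above: the key property of *DateTimeFormatter* is its `[...]` optional sections, which let one pattern accept several input variants that *SimpleDateFormat* would need separate patterns for. The following is a minimal standalone sketch (hypothetical class name and a simplified pattern, not the actual Kafka Connect transform code) showing a single formatter parsing timestamps both with and without milliseconds:

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class FlexibleTimestampParse {

    // Simplified illustrative pattern: fractional seconds are optional
    // thanks to the [.SSS] optional section; X parses the zone offset
    // (including the literal "Z" for UTC).
    private static final DateTimeFormatter FLEXIBLE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS]X");

    public static Instant parse(String value) {
        // OffsetDateTime carries the parsed offset; convert to an Instant.
        return OffsetDateTime.parse(value, FLEXIBLE).toInstant();
    }

    public static void main(String[] args) {
        // Both variants parse with the same formatter; a single
        // SimpleDateFormat pattern would reject one of them.
        System.out.println(parse("2020-10-06T12:12:27Z"));     // no millis
        System.out.println(parse("2020-10-06T12:12:27.123Z")); // with millis
    }
}
```

The same idea scales to the more elaborate pattern quoted in the issue, where nested optional sections also make the date part, the `T` separator, and the offset style flexible.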