Hi Tom,

Sorry for the late reply; I missed this. The upcoming Flink 1.15 includes a
number of improvements to CAST [1]. Would you be able to test this with the
current RC0 of Flink 1.15 [2]?

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1] https://issues.apache.org/jira/browse/FLINK-24403
[2] https://lists.apache.org/thread/qpzz298lh5zq5osxmoo0ky6kg0b0r5zg


On Tue, 22 Mar 2022 at 18:06, Tom Thornton <thom...@yelp.com> wrote:

> Hi Martijn,
>
> Do you know what could be causing this issue given our Flink version? Is
> this possibly a bug with that version?
>
> Thanks,
> Tom
>
> On Thu, Mar 17, 2022 at 9:59 AM Tom Thornton <thom...@yelp.com> wrote:
>
>> Hi Martijn,
>>
>> We are using 1.11.6.
>>
>> Thank you for the help.
>>
>> On Thu, Mar 17, 2022 at 1:37 AM Martijn Visser <martijnvis...@apache.org>
>> wrote:
>>
>>> Hi Tom,
>>>
>>> Which version of Flink are you using?
>>>
>>> Best regards,
>>>
>>> Martijn Visser
>>> https://twitter.com/MartijnVisser82
>>>
>>>
>>> On Wed, 16 Mar 2022 at 23:59, Tom Thornton <thom...@yelp.com> wrote:
>>>
>>>> Per the docs <https://flink.apache.org/gettinghelp.html#found-a-bug>,
>>>> I'm hoping to confirm whether an error we are seeing is a bug in
>>>> Flink. We have a job that uses a Kafka source to read Avro records. The
>>>> Kafka source is converted into a StreamTableSource. We are using the
>>>> new Blink table planner to execute SQL on the table stream. The output is
>>>> then written to a sink back to Kafka as Avro records. Whenever a query
>>>> selects a column that has an Avro logicalType of date, we get this error
>>>> (link to full stack trace <https://pastebin.com/raw/duQaTAh6>):
>>>>
>>>> Caused by: java.lang.ClassCastException: class java.sql.Date cannot be cast to class java.time.LocalDate (java.sql.Date is in module java.sql of loader 'platform'; java.time.LocalDate is in module java.base of loader 'bootstrap')
>>>>         at org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
>>>>         at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:128)
>>>>         at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:61)
>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:755)
>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>>>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>>         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158)
>>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191)
>>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162)
>>>>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374)
>>>>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574)
>>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
>>>>         at java.base/java.lang.Thread.run(Thread.java:829)
>>>>
>>>>
>>>> The Avro schema definition for a date field is as follows:
>>>>
>>>>             {
>>>>                 "name": "date",
>>>>                 "type": {
>>>>                     "type": "int",
>>>>                     "logicalType": "date"
>>>>                 },
>>>>                 "doc": "date"
>>>>             },
>>>>
>>>> Any query that selects a date column produces the error (and any
>>>> query without a date-typed column works). Example of a query that
>>>> causes the error:
>>>>
>>>> select `date` from table1
>>>>
>>>> As suggested in the docs, I also tried this with parent-first loading and 
>>>> got the same error. When we run the same job without the Blink table 
>>>> planner, i.e., useOldPlanner(), we do not get this error. Is this a bug 
>>>> with Flink? Or is there something we can change in the application code to 
>>>> prevent this error? Any help/suggestions would be appreciated.
>>>>
>>>>
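For readers following the thread, the type mismatch in the stack trace can be
reproduced in plain Java without Flink. The sketch below is only an
illustration, not the fix that went into Flink: it shows that a java.sql.Date
cannot be cast to java.time.LocalDate, and how either representation (or the
raw Avro int, which counts days since 1970-01-01) can be normalized to a
LocalDate. The class name DateConversionSketch and its helper methods are
hypothetical; where such a conversion would sit in the job (for example in a
map step before the stream is turned into a table) is an assumption about the
application, not something the thread confirms.

```java
import java.time.LocalDate;

// Hypothetical illustration class; none of these names come from Flink or Avro.
class DateConversionSketch {

    // Avro's "date" logical type annotates an int holding days since 1970-01-01.
    static LocalDate fromAvroDays(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    // Normalize either runtime class to LocalDate instead of casting blindly.
    static LocalDate toLocalDate(Object value) {
        if (value instanceof LocalDate) {
            return (LocalDate) value;
        }
        if (value instanceof java.sql.Date) {
            return ((java.sql.Date) value).toLocalDate();
        }
        throw new IllegalArgumentException("Unsupported date class: " + value.getClass());
    }

    // Returns true when a direct cast to LocalDate throws ClassCastException,
    // which is the failure reported inside LocalDateSerializer.copy in the thread.
    static boolean directCastFails(Object value) {
        try {
            LocalDate ignored = (LocalDate) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object sqlDate = java.sql.Date.valueOf("2022-03-16");
        System.out.println("direct cast fails: " + directCastFails(sqlDate));
        System.out.println("converted:         " + toLocalDate(sqlDate));
        System.out.println("from Avro int:     " + fromAvroDays(19067));
    }
}
```

If the records really do carry java.sql.Date values while the planner expects
java.time.LocalDate, converting them explicitly along these lines before they
reach the row serializer might avoid the cast, but that would need to be
verified against the actual job.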
