Flink Sql erroring at runtime Flink 1.16
Hello, Looks like there is a bug with Flink 1.16's IF operator. If I use UPPER or TRIM functions(there might be more such functions), I am getting the exception. These functions used to work fine with Flink 1.13. select if( address_id = 'a', 'default', upper(address_id) ) as address_id from feature_test sample Input sent to my Kafka Source: {"address_id":"mydata"} It should be reproducible. Please try it. 2023-05-05 23:30:24 java.io.IOException: Failed to deserialize consumer record due to at StreamExecCalc$14237.processElement_split1961(Unknown Source) at StreamExecCalc$14237.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ... 28 more
Re: Flink Sql erroring at runtime
Hi Hang and community, There is a correction in my earlier email. The issue comes when I use the UPPER or TRIM function with IF. Looks like there is a bug with Flink 1.16's IF operator. If I use UPPER or TRIM functions(there might be more such functions), I am getting the exception. These functions used to work fine with Flink 1.13. select if( address_id = 'a', 'default', upper(address_id) ) as address_id from feature_test sample Input sent to my Kafka Source: {"address_id":"mydata"} It should be reproducible. Please try it. 2023-05-05 23:30:24 java.io.IOException: Failed to deserialize consumer record due to at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) at org.apache.flink.streaming.runtime.io .StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ... 14 more Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ... 22 more Caused by: java.lang.NullPointerException at StreamExecCalc$14237.processElement_split1961(Unknown Source) at StreamExecCalc$14237.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ... 28 more On Mon, May 8, 2023 at 11:48 AM Hang Ruan wrote: > Hi, neha, > > I think the error occurred because of the deserialization. Is there some > example data and runnable SQLs to reproduce the problem? > > Best, > Hang > > neha goyal 于2023年5月2日周二 16:33写道: > >> Hello, >> >> I am using Flink 1.16.1 and observing a different behavior from Flink >> 1.13.6. >> >> SELECT if(some_string_field is null, 'default', 'some_string_field') from >> my_stream >> >> This SQL
Re: Flink Sql erroring at runtime
Hi, neha, I think the error occurred because of the deserialization. Is there some example data and runnable SQLs to reproduce the problem? Best, Hang neha goyal 于2023年5月2日周二 16:33写道: > Hello, > > I am using Flink 1.16.1 and observing a different behavior from Flink > 1.13.6. > > SELECT if(some_string_field is null, 'default', 'some_string_field') from > my_stream > > This SQL flink job in the streaming environment is erroring out during > runtime with the exception mentioned below. There are no null values sent > and it is failing for the nonnull values as well. > > It is running fine in Flink 1.13.6. Also, if I use the Integer field, it > runs fine. > Was there any change around this in Flink 14/15/16? > > io.IOException: Failed to deserialize consumer record due to > at > org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) > at > org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) > at org.apache.flink.streaming.runtime.io > .StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at org.apache.flink.streaming.runtime.io > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > at java.lang.Thread.run(Thread.java:750) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > at > org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) > at > org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) > at > org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) > at > org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) > at > org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) > ... 14 more > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) > at > org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) > ... 22 more > Caused by: java.lang.NullPointerException > at > StreamExecCalc$53548.processElement_trueFilter10044_split10048(Unknown > Source) > at StreamExecCalc$53548.processElement_trueFilter10044(Unknown Source) > at StreamExecCalc$53548.processElement_split10047(Unknown Source) > at StreamExecCalc$53548.processElement(Unknown Source) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) > ... 28 more >
Flink Sql erroring at runtime
Hello, I am using Flink 1.16.1 and observing a different behavior from Flink 1.13.6. SELECT if(some_string_field is null, 'default', 'some_string_field') from my_stream This SQL flink job in the streaming environment is erroring out during runtime with the exception mentioned below. There are no null values sent and it is failing for the nonnull values as well. It is running fine in Flink 1.13.6. Also, if I use the Integer field, it runs fine. Was there any change around this in Flink 14/15/16? io.IOException: Failed to deserialize consumer record due to at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) at org.apache.flink.streaming.runtime.io .StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ... 14 more Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ... 22 more Caused by: java.lang.NullPointerException at StreamExecCalc$53548.processElement_trueFilter10044_split10048(Unknown Source) at StreamExecCalc$53548.processElement_trueFilter10044(Unknown Source) at StreamExecCalc$53548.processElement_split10047(Unknown Source) at StreamExecCalc$53548.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ... 28 more