Hi, neha,

I think the error occurred because of the deserialization. Is there some
example data and runnable SQLs to reproduce the problem?

Best,
Hang

neha goyal <nehagoy...@gmail.com> 于2023年5月2日周二 16:33写道:

> Hello,
>
> I am using Flink 1.16.1 and observing a different behavior from Flink
> 1.13.6.
>
> SELECT if(some_string_field is null, 'default', 'some_string_field') from
> my_stream
>
> This SQL flink job in the streaming environment is erroring out during
> runtime with the exception mentioned below. There are no null values sent
> and it is failing for the nonnull values as well.
>
> It is running fine in Flink 1.13.6. Also, if I use the Integer field, it
> runs fine.
> Was there any change around this in Flink 14/15/16?
>
> io.IOException: Failed to deserialize consumer record due to
>     at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
>     at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
>     at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
>     at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>     at org.apache.flink.streaming.runtime.io
> .StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>     at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>     at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>     at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
>     at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
>     at
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
>     at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
>     at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
>     at
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
>     at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
>     ... 14 more
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>     at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
>     at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at
> org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
>     at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     ... 22 more
> Caused by: java.lang.NullPointerException
>     at
> StreamExecCalc$53548.processElement_trueFilter10044_split10048(Unknown
> Source)
>     at StreamExecCalc$53548.processElement_trueFilter10044(Unknown Source)
>     at StreamExecCalc$53548.processElement_split10047(Unknown Source)
>     at StreamExecCalc$53548.processElement(Unknown Source)
>     at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     ... 28 more
>
>

Reply via email to