Hi Hang and community,
I have a correction to my earlier email. The issue occurs when I use the
UPPER or TRIM function inside IF.
It looks like there is a bug in Flink 1.16's IF function. If I use the UPPER or
TRIM function (there may be other such functions), I get the exception
below. These functions worked fine with Flink 1.13.
select
  if(
    address_id = 'a',
    'default',
    upper(address_id)
  ) as address_id
from
  feature_test

Sample input sent to my Kafka source: {"address_id":"mydata"}
It should be reproducible. Please try it.
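
For anyone who wants to try the query above, here is a minimal sketch of a source table that would accept the sample input. It assumes a JSON-format Kafka topic; the topic name, broker address, and group id are hypothetical placeholders. The stack trace below goes through InputConversionOperator, which suggests the original job builds the table from a DataStream rather than from a DDL like this, so the sketch may not hit the identical code path.

```sql
-- Hypothetical source table for the repro; only the column name and the
-- table name come from the query above. Everything in WITH is an assumption.
CREATE TABLE feature_test (
  address_id STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'feature_test_topic',                      -- placeholder topic
  'properties.bootstrap.servers' = 'localhost:9092',   -- placeholder broker
  'properties.group.id' = 'feature-test-repro',        -- placeholder group id
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```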

2023-05-05 23:30:24
java.io.IOException: Failed to deserialize consumer record due to
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    ... 22 more
Caused by: java.lang.NullPointerException
    at StreamExecCalc$14237.processElement_split1961(Unknown Source)
    at StreamExecCalc$14237.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    ... 28 more


On Mon, May 8, 2023 at 11:48 AM Hang Ruan <ruanhang1...@gmail.com> wrote:

> Hi, neha,
>
> I think the error occurred because of the deserialization. Do you have some
> example data and runnable SQL to reproduce the problem?
>
> Best,
> Hang
>
> neha goyal <nehagoy...@gmail.com> wrote on Tue, May 2, 2023 at 16:33:
>
>> Hello,
>>
>> I am using Flink 1.16.1 and observing a different behavior from Flink
>> 1.13.6.
>>
>> SELECT if(some_string_field is null, 'default', 'some_string_field') from
>> my_stream
>>
>> This Flink SQL job fails at runtime in the streaming environment with the
>> exception below. No null values are sent, and it fails for non-null
>> values as well.
>>
>> It runs fine in Flink 1.13.6. Also, if I use an integer field, it
>> runs fine.
>> Was there any change around this in Flink 1.14/1.15/1.16?
>>
>> java.io.IOException: Failed to deserialize consumer record due to
>>     at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
>>     at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
>>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
>>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>>     at java.lang.Thread.run(Thread.java:750)
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
>>     at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
>>     at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
>>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
>>     at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
>>     at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
>>     ... 14 more
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>>     at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>>     ... 22 more
>> Caused by: java.lang.NullPointerException
>>     at StreamExecCalc$53548.processElement_trueFilter10044_split10048(Unknown Source)
>>     at StreamExecCalc$53548.processElement_trueFilter10044(Unknown Source)
>>     at StreamExecCalc$53548.processElement_split10047(Unknown Source)
>>     at StreamExecCalc$53548.processElement(Unknown Source)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>>     ... 28 more
>>
>>
