Hello,

There appears to be a bug in the IF function in Flink 1.16. When I use UPPER or TRIM inside IF (other functions may be affected as well), I get the exception shown below. The same query worked fine with Flink 1.13.

    select if(address_id = 'a', 'default', upper(address_id)) as address_id
    from feature_test

Sample input sent to my Kafka source:

    {"address_id":"mydata"}

It should be reproducible. Please try it.

    2023-05-05 23:30:24
    java.io.IOException: Failed to deserialize consumer record due to
        at StreamExecCalc$14237.processElement_split1961(Unknown Source)
        at StreamExecCalc$14237.processElement(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
        ... 28 more
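
To make the expected result explicit, here is a small Python sketch of what I believe the query should return for a given address_id. It is only a model of the intended semantics, not Flink code: the function name expected_address_id is hypothetical, and SQL NULL handling is deliberately left out.

```python
def expected_address_id(address_id: str) -> str:
    """Model of IF(address_id = 'a', 'default', UPPER(address_id)).

    Hypothetical helper for illustration only; it assumes a non-null
    string and does not model SQL NULL semantics.
    """
    if address_id == "a":
        # Condition is true: the query returns the literal 'default'.
        return "default"
    # Condition is false: the query returns the upper-cased input.
    return address_id.upper()


if __name__ == "__main__":
    # Sample record from the report, as sent to the Kafka source.
    record = {"address_id": "mydata"}
    print(expected_address_id(record["address_id"]))
```

For the sample record above this gives MYDATA, which matches the behavior I saw on Flink 1.13, whereas 1.16 fails with the exception instead.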