Hi, Tom. I can't reproduce the exception on master. I am not sure whether the problem has been fixed or I am missing something.
The only difference is that my test UDF extends ScalarFunction rather than DPScalarFunction, and I use String[] as the input type.

```
public static class ListToString extends ScalarFunction {
    public String eval(String[] arr) {
        return "foo";
    }
}
```

I think you can also debug it this way:

1. Open the Flink repo and check out the release-1.11 branch.
2. Create the UDF in JavaUserDefinedScalarFunctions.
3. Find a test in a table ITCase, e.g. TableSinkITCase.scala.
4. Add a new test to verify the results. I just added the following code:

```
@Test
def test(): Unit = {
  val dataId = TestValuesTableFactory.registerRowData(
    Seq(GenericRowData.of(new GenericArrayData(Array(StringData.fromString("3")).toArray[Object]))))
  tEnv.executeSql(
    s"""
       |CREATE TABLE test2 (person ARRAY<STRING>) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$dataId',
       |  'register-internal-data' = 'true'
       |)
       |""".stripMargin
  )
  tEnv.createFunction("ListToString", classOf[ListToString])
  tEnv.executeSql("SELECT ListToString(`person`) as col1_string FROM `test2`").print()
}
```

5. Then you can debug the case in your IDEA.

Considering that Flink 1.11 is no longer maintained by the community, would you mind upgrading to a recent version (1.13/1.14/1.15)?

Best,
Shengkai

Tom Thornton <thom...@yelp.com> wrote on Wed, Jun 1, 2022 at 02:06:

> Hi all,
>
> Thank you for the help.
>
>> It seems an exception is thrown when Flink tries to deserialize the object
>> output by your UDF. So is the object produced by your UDF serializable?
>> Does it contain any lambda function in the object/class?
>
> The output object of the UDF is the string "foo", which should be
> serializable. This exception only occurs when the input to the UDF is not
> null. However, when the input is null, the output object (which is
> still the string "foo") does not cause any error or exception (i.e. it is
> able to be serialized). There are no lambda functions in the output object
> (it is just a string object).
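[Editor's note: Tom's claim above can be checked in isolation. A plain String survives a Java serialization round trip, which supports the reading that the failure comes from how Flink deserializes the UDF's input array (the Kryo frames in the stack trace below), not from the returned value. A minimal sketch; the class and method names are mine, not from the thread.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializeCheck {
    // Round-trip an object through plain Java serialization as a sanity
    // check that the value itself is serializable.
    static Object roundTrip(Object in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream oin =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return oin.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // The UDF's output value, exactly as in the thread.
        Object back = roundTrip("foo");
        System.out.println(back); // prints foo
    }
}
```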
> Thanks,
> Tom
>
> On Thu, May 26, 2022 at 9:36 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>
>> It seems an exception is thrown when Flink tries to deserialize the object
>> output by your UDF. So is the object produced by your UDF serializable?
>> Does it contain any lambda function in the object/class?
>>
>> Best regards,
>> Yuxia
>>
>> ------------------------------
>> *From:* "Tom Thornton" <thom...@yelp.com>
>> *To:* "User" <user@flink.apache.org>
>> *Sent:* Friday, May 27, 2022, 6:47:04 AM
>> *Subject:* Exception when running Java UDF with Blink table planner
>>
>> We are migrating from the legacy table planner to the Blink table
>> planner. Previously we had a UDF defined like this that worked without
>> issue:
>>
>> public class ListToString extends DPScalarFunction {
>>     public String eval(List list) {
>>         return "foo";
>>     }
>> }
>>
>> Since moving to the Blink table planner, we receive this error:
>>
>> Caused by: org.apache.flink.table.api.ValidationException: Given parameters
>> of function 'ListToString' do not match any signature.
>> Actual: (java.lang.String[])
>> Expected: (java.util.List)
>>
>> We refactored the UDF to take an Object[] as input, to match what is
>> received from Blink:
>>
>> public class ListToString extends DPScalarFunction {
>>     public String eval(Object[] arr) {
>>         return "foo";
>>     }
>> }
>>
>> Now the UDF always fails (including for the simplified example above,
>> where we return a constant string regardless of input).
>> For example, when we run a query like this one:
>>
>> SELECT ListToString(`col1`) as col1_string FROM `table`
>>
>> it produces an IndexOutOfBoundsException:
>>
>> Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for length 0
>>     at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>     at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>>     at java.base/java.util.Objects.checkIndex(Objects.java:372)
>>     at java.base/java.util.ArrayList.get(ArrayList.java:459)
>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
>>     at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
>>     at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>>     at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
>>     at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
>>     at org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
>>     at org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
>>     at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
>>     at org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
>>     at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
>>     at StreamExecCalc$337.processElement(Unknown Source)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>     at SourceConversion$328.processElement(Unknown Source)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:107)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:114)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:187)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:833)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:825)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:266)
>>
>> Any ideas what may be causing this?
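[Editor's note: the first error in the thread, the ValidationException with "Actual: (java.lang.String[]) Expected: (java.util.List)", reduces to a plain Java fact: arrays do not implement java.util.List, so an eval(List) signature can never accept the String[] that the Blink planner passes for an ARRAY&lt;STRING&gt; column. A tiny sketch illustrating this; the class name ArrayVsList is hypothetical.]

```java
import java.util.List;

public class ArrayVsList {
    public static void main(String[] args) {
        // A String[] is not a java.util.List, so a UDF signature taking
        // a List cannot match an argument handed over as String[].
        boolean arrayIsList = List.class.isAssignableFrom(String[].class);
        System.out.println(arrayIsList); // prints false
    }
}
```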