With help from Dian and friends, it turns out the root cause is: when the script calls `create_temporary_function`, the current catalog is the default catalog. However, by the time it calls `execute_sql(TRANSFORM)`, the current catalog has been switched to "hive". A function registered as a temporary function in the "default" catalog is not accessible from the "hive" catalog.

To solve the problem, simply replace `create_temporary_function` with `create_temporary_system_function`, so that the function is accessible from other catalogs as well.
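For illustration, here is a minimal sketch of the change, mirroring the environment setup from the script quoted below (using the BIGINT signature from the follow-up message); only the registration call differs:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.udf import udf


@udf(
    input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
    result_type=DataTypes.BIGINT(),
)
def add(i, j):
    return i + j


exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(
    stream_execution_environment=exec_env, environment_settings=env_settings
)

# Catalog-scoped registration: "add" lives in the current (default) catalog
# and is no longer resolvable once the script runs `USE CATALOG hive`.
# t_env.create_temporary_function("add", add)

# System-function registration: "add" is resolved regardless of the current
# catalog/database, so it survives the switch to the hive catalog.
t_env.create_temporary_system_function("add", add)
```

After this change, the rest of the original script (creating the hive catalog, switching to it, and running TRANSFORM) can stay as it is.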
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/overview/

Best,
Yik San

On Tue, May 18, 2021 at 6:43 PM Yik San Chan <evan.chanyik...@gmail.com> wrote:

> Hi Dian,
>
> I changed the udf to:
>
> ```python
> @udf(
>     input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
>     result_type=DataTypes.BIGINT(),
> )
> def add(i, j):
>     return i + j
> ```
>
> But I still get the same error.
>
> On Tue, May 18, 2021 at 5:47 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Yik San,
>>
>> The expected input types for add are DataTypes.INT, however, the schema of aiinfra.mysource is: a bigint and b bigint.
>>
>> Regards,
>> Dian
>>
>> On May 18, 2021 at 5:38 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>
>> Hi,
>>
>> I have a PyFlink script that fails to use a simple UDF. The full script can be found below:
>>
>> ```python
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import (
>>     DataTypes,
>>     EnvironmentSettings,
>>     SqlDialect,
>>     StreamTableEnvironment,
>> )
>> from pyflink.table.udf import udf
>>
>>
>> @udf(
>>     input_types=[DataTypes.INT(), DataTypes.INT()],
>>     result_type=DataTypes.BIGINT(),
>> )
>> def add(i, j):
>>     return i + j
>>
>>
>> TRANSFORM = """
>> INSERT INTO aiinfra.mysink
>> SELECT ADD(a, b)
>> FROM aiinfra.mysource
>> """
>>
>> CREATE_CATALOG = """
>> CREATE CATALOG hive WITH (
>>     'type' = 'hive',
>>     'hive-conf-dir' = '/data/software/hive-2.1.0/conf'
>> )"""
>>
>> USE_CATALOG = "USE CATALOG hive"
>>
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>> t_env = StreamTableEnvironment.create(
>>     stream_execution_environment=exec_env, environment_settings=env_settings
>> )
>>
>> t_env.create_temporary_function("add", add)
>>
>> t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
>> t_env.execute_sql(CREATE_CATALOG)
>> t_env.execute_sql(USE_CATALOG)
>>
>> t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
>> t_result = t_env.execute_sql(TRANSFORM)
>> ```
>>
>> However, when I submit the python file to my flink cluster, it throws an exception:
>>
>> ```
>> [INFO] 2021-05-18 17:27:47.758 - [taskAppId=TASK-90019-86729-380519]:[152] - -> Traceback (most recent call last):
>>   File "aiinfra/batch_example.py", line 50, in <module>
>>     t_result = t_env.execute_sql(TRANSFORM)
>>   File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 766, in execute_sql
>>   File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>>   File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>>   File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
>> : org.apache.flink.table.api.ValidationException: SQL validation failed.
>> From line 3, column 8 to line 3, column 16: No match found for function signature ADD(<NUMERIC>, <NUMERIC>)
>> 	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>> 	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>> 	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:193)
>> 	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:536)
>> 	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
>> 	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>> 	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> 	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> 	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> 	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> 	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> 	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> 	at java.lang.Thread.run(Thread.java:748)
>> ```
>>
>> It seems it has difficulty knowing that the "add" function has already been registered. Changing "ADD(a, b)" to "add(a, b)" doesn't help, so I don't think it is an upper- or lower-case issue.
>>
>> Also, if I replace "ADD(a, b)" with the simple "a + b", the script produces exactly what I need.
>>
>> Regarding aiinfra.mysource and aiinfra.mysink: aiinfra.mysource has 2 columns, a bigint and b bigint. aiinfra.mysink has 1 column, c bigint.
>>
>> Any help? Thanks!
>>
>> Best,
>> Yik San