Hi Yik San,

The expected input types of the `add` UDF are declared as DataTypes.INT(), while the schema of 
aiinfra.mysource is a BIGINT and b BIGINT. Since the BIGINT columns don't match the declared INT 
parameters, the planner cannot find a matching signature for ADD. Declaring the input types as 
DataTypes.BIGINT() (or casting the columns in the query) should fix it.

Regards,
Dian

> On May 18, 2021, at 17:38, Yik San Chan <evan.chanyik...@gmail.com> wrote:
> 
> Hi,
> 
> I have a PyFlink script that fails to use a simple UDF. The full script can 
> be found below:
> 
> ```python
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import (
>     DataTypes,
>     EnvironmentSettings,
>     SqlDialect,
>     StreamTableEnvironment,
> )
> from pyflink.table.udf import udf
> 
> 
> @udf(
>     input_types=[DataTypes.INT(), DataTypes.INT()],
>     result_type=DataTypes.BIGINT(),
> )
> def add(i, j):
>     return i + j
> 
> 
> TRANSFORM = """
> INSERT INTO aiinfra.mysink
> SELECT ADD(a, b)
> FROM aiinfra.mysource
> """
> 
> CREATE_CATALOG = """
> CREATE CATALOG hive WITH (
>     'type' = 'hive',
>     'hive-conf-dir' = '/data/software/hive-2.1.0/conf'
> )"""
> 
> USE_CATALOG = "USE CATALOG hive"
> 
> 
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(
>     stream_execution_environment=exec_env, environment_settings=env_settings
> )
> 
> t_env.create_temporary_function("add", add)
> 
> t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
> t_env.execute_sql(CREATE_CATALOG)
> t_env.execute_sql(USE_CATALOG)
> 
> t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
> t_result = t_env.execute_sql(TRANSFORM)
> ```
> 
> However, when I submit the Python file to my Flink cluster, it throws an 
> exception:
> 
> ```
> [INFO] 2021-05-18 17:27:47.758  - [taskAppId=TASK-90019-86729-380519]:[152] - 
>  -> Traceback (most recent call last):
>  File "aiinfra/batch_example.py", line 50, in <module>
>    t_result = t_env.execute_sql(TRANSFORM)
>  File 
> "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py",
>  line 766, in execute_sql
>  File 
> "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
>  File 
> "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>  line 147, in deco
>  File 
> "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
> : org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 3, column 8 to line 3, column 16: No match found for function signature 
> ADD(<NUMERIC>, <NUMERIC>)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:193)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:536)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
> at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> ```
> 
> It seems the planner has difficulty recognizing that the "add" function has 
> already been registered. Changing "ADD(a, b)" to "add(a, b)" doesn't help, so I 
> don't think it is an upper-or-lower-case issue.
> 
> Also, if I replace "ADD(a, b)" with the simple "a + b", the script produces 
> exactly what I need.
> 
> Regarding aiinfra.mysource and aiinfra.mysink: aiinfra.mysource has 2 
> columns, a BIGINT and b BIGINT. aiinfra.mysink has 1 column, c BIGINT.
> 
> Any help? Thanks!
> 
> Best,
> Yik San
