Hi, I am trying to use a User-Defined Table Function (UDTF) to split up some data, as follows:
from json import loads

from pyflink.table.udf import udtf

@udtf(input_types=DataTypes.STRING(), result_types=[DataTypes.STRING(), DataTypes.DOUBLE()])
def split_feature_values(data_string):
    json_data = loads(data_string)
    for f_name, f_value in json_data.items():
        yield (f_name, f_value)

# Configure the off-heap memory of the current taskmanager so that
# the Python worker can use off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

# Register UDTF
t_env.register_function("split", split_feature_values)

ddl_source = f"""
    CREATE TABLE {INPUT_TABLE} (
        `monitorId` STRING,
        `deviceId` STRING,
        `state` INT,
        `data` STRING,
        `time_st` TIMESTAMP(3),
        WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{INPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'format' = 'json'
    )
"""

ddl_temporary_table = f"""
    CREATE TABLE {TEMPORARY_TABLE} (
        `monitorId` STRING,
        `featureName` STRING,
        `featureData` DOUBLE,
        `time_st` TIMESTAMP(3),
        WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
    )
"""

ddl_populate_temporary_table = f"""
    INSERT INTO {TEMPORARY_TABLE}
    SELECT monitorId, split(data), time_st
    FROM {INPUT_TABLE}
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_temporary_table)
t_env.execute_sql(ddl_populate_temporary_table)

However, I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 3, column 23 to line 3, column 33: No match found for function signature split(<CHARACTER>)

I believe I am using the correct call to register the UDTF as per [1]. Am I missing something?

Thanks,
Manas

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#table-functions
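P.S. For what it's worth, the generator logic itself behaves as I expect when exercised as plain Python, outside of Flink. This is just a standalone sketch with the @udtf decorator omitted and a made-up sample payload:

```python
from json import loads

def split_feature_values(data_string):
    # Yield one (feature_name, feature_value) row per key in the JSON payload.
    json_data = loads(data_string)
    for f_name, f_value in json_data.items():
        yield (f_name, f_value)

# Made-up payload in the same shape as the `data` column.
rows = list(split_feature_values('{"temperature": 21.5, "pressure": 1.01}'))
# → [('temperature', 21.5), ('pressure', 1.01)]
```

So the failure seems to be on the SQL side rather than in the function body.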