Re: pyFlink UDTF function registration

2020-07-15 Thread Xingbo Huang
Hi Manas,
You need to join with the Python UDTF in your SQL. You can try the following
query:

ddl_populate_temporary_table = f"""
INSERT INTO {TEMPORARY_TABLE}
SELECT * FROM (
    SELECT monitorId, featureName, featureData, time_st
    FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) AS T(featureName, featureData)
) t
"""
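To illustrate what the lateral join does (a plain-Python sketch, not the pyFlink API; the sample row and feature names are made up for illustration): each input row is paired with every (featureName, featureData) row the UDTF emits for that row's `data` column.

```python
from json import loads

def split(data_string):
    # Mirrors the UDTF: yield one (featureName, featureData) row per JSON key.
    for f_name, f_value in loads(data_string).items():
        yield (f_name, float(f_value))

def lateral_join(rows):
    # LATERAL TABLE semantics: join each input row with all rows that
    # split() produces from that row's data column.
    out = []
    for monitor_id, data, time_st in rows:
        for feature_name, feature_data in split(data):
            out.append((monitor_id, feature_name, feature_data, time_st))
    return out

# One input row with two features expands into two output rows.
rows = [("m1", '{"temp": 21.5, "rpm": 900.0}', "2020-07-15 19:31:00")]
result = lateral_join(rows)
```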

Best,
Xingbo

Manas Kale wrote on Wed, Jul 15, 2020 at 7:31 PM:

> Hi,
> I am trying to use a user-defined table function (UDTF) to split up some
> data as follows:
>
> from json import loads
>
> from pyflink.table import DataTypes
> from pyflink.table.udf import udtf
>
> @udtf(input_types=DataTypes.STRING(),
>       result_types=[DataTypes.STRING(), DataTypes.DOUBLE()])
> def split_feature_values(data_string):
>     json_data = loads(data_string)
>     for f_name, f_value in json_data.items():
>         yield (f_name, f_value)
>
> # Configure the off-heap memory of the current taskmanager so that the
> # Python worker can use off-heap memory.
> t_env.get_config().get_configuration().set_string(
>     "taskmanager.memory.task.off-heap.size", '80m')
>
> # Register UDTF
> t_env.register_function("split", split_feature_values)
> ddl_source = f"""
> CREATE TABLE {INPUT_TABLE} (
>     `monitorId` STRING,
>     `deviceId` STRING,
>     `state` INT,
>     `data` STRING,
>     `time_st` TIMESTAMP(3),
>     WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = '{INPUT_TOPIC}',
>     'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>     'format' = 'json'
> )
> """
>
> ddl_temporary_table = f"""
> CREATE TABLE {TEMPORARY_TABLE} (
>     `monitorId` STRING,
>     `featureName` STRING,
>     `featureData` DOUBLE,
>     `time_st` TIMESTAMP(3),
>     WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
> )
> """
>
> ddl_populate_temporary_table = f"""
> INSERT INTO {TEMPORARY_TABLE}
> SELECT monitorId, split(data), time_st
> FROM {INPUT_TABLE}
> """
>
> t_env.execute_sql(ddl_source)
> t_env.execute_sql(ddl_temporary_table)
> t_env.execute_sql(ddl_populate_temporary_table)
>
>
> However, I get the following error:
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
> : org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 3, column 23 to line 3, column 33: No match found for function
> signature split()
>
> I believe I am using the correct call to register the UDTF as per [1]. Am
> I missing something?
>
> Thanks,
> Manas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#table-functions
>
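For context on why `SELECT monitorId, split(data), time_st` fails: a table function emits zero or more multi-column rows per input value, so it has no single scalar result that a SELECT list could use; it must appear in the FROM clause via LATERAL TABLE. A plain-Python sketch of that one-to-many shape (the sample JSON below is made up):

```python
from json import loads

def split_feature_values(data_string):
    # A table function: yields multiple (name, value) rows per input string,
    # so there is no single scalar value a SELECT list could consume.
    for f_name, f_value in loads(data_string).items():
        yield (f_name, float(f_value))

# One input string produces two rows, not one value.
rows = list(split_feature_values('{"pressure": 1.2, "voltage": 4.8}'))
```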

