The splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()]) mentioned at the end needs to be changed to splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()]).

As for udtf's third argument (result_types): the job seems to run as long as it matches the sink's field types. But it feels odd that the second argument (input_types) does not have to match the source fields for the job to run.
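For what it's worth, the type check that rejects the ROW can be reproduced outside of pyflink. This is a minimal, self-contained sketch (DataType, RowField and RowType below are hypothetical stand-ins for the pyflink classes, not the real API) of why a ROW result type trips the wrapper's validation while a flat list of field types passes:

```python
# Toy reproduction of the result_types check in pyflink/table/udf.py.
# The classes here are mocks, not the real pyflink API.
import collections.abc


class DataType:
    """Stand-in for pyflink.table.types.DataType."""


class RowField:
    """Stand-in for a ROW field; note it is NOT a DataType subclass."""
    def __init__(self, name, data_type):
        self.name = name
        self.data_type = data_type


class RowType(DataType):
    """Stand-in for DataTypes.ROW(...); iterating it yields RowFields."""
    def __init__(self, fields):
        self.fields = fields

    def __iter__(self):
        return iter(self.fields)


def validate_result_types(result_types):
    # Mirrors the logic quoted from UserDefinedTableFunctionWrapper.__init__
    # (the real code uses the deprecated collections.Iterable alias).
    if not isinstance(result_types, collections.abc.Iterable):
        result_types = [result_types]
    for result_type in result_types:
        if not isinstance(result_type, DataType):
            raise TypeError(
                "Invalid result_type: result_type should be DataType "
                "but contains {}".format(result_type))
    return result_types


string_t = DataType()
row = RowType([RowField("name", string_t), RowField("id", string_t)])

# A flat list of field types passes the check:
validate_result_types([string_t, string_t])

# A ROW type is itself iterable, so it skips the [result_types] wrapping;
# the loop then sees RowField entries instead of DataTypes and raises:
try:
    validate_result_types(row)
except TypeError as e:
    print("raised:", e)
```

So the wrapper never treats the ROW as a single composite result type; it iterates straight into its fields, which is exactly the RowField(name, VARCHAR) seen in the traceback below.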
On 2020-09-30 19:07:06, "chenxuying" <cxydeve...@163.com> wrote:
>Versions:
>pyflink==1.0
>apache-flink==1.11.2
>The code is as follows:
>env = StreamExecutionEnvironment.get_execution_environment()
>env.set_parallelism(1)
>t_env = StreamTableEnvironment.create(env)
>t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
>
>
>class SplitStr(TableFunction):
>    def eval(self, data):
>        for row in data:
>            yield row[0], row[1]
>splitStr = udtf(
>    SplitStr(),
>    DataTypes.ARRAY(
>        DataTypes.ROW([
>            DataTypes.FIELD("name", DataTypes.STRING()),
>            DataTypes.FIELD("id", DataTypes.STRING())
>        ])
>    ),
>    DataTypes.ROW([
>        DataTypes.FIELD("name", DataTypes.STRING()),
>        DataTypes.FIELD("id", DataTypes.STRING())
>    ])
>)
>t_env.register_function("splitStr", splitStr)
>
>
>t_env.sql_update("""
>CREATE TABLE mySource (
>    id varchar,
>    data array<ROW<name STRING, age STRING>>
>) WITH (
>    'connector' = 'kafka',
>    'topic' = 'mytesttopic',
>    'properties.bootstrap.servers' = '172.17.0.2:9092',
>    'properties.group.id' = 'flink-test-cxy',
>    'scan.startup.mode' = 'latest-offset',
>    'format' = 'json'
>)
>""")
>t_env.sql_update("""
>CREATE TABLE mysqlsink (
>    id varchar,
>    name varchar,
>    age varchar
>) WITH (
>    'connector' = 'print'
>)
>""")
>t_env.sql_update("insert into mysqlsink select id, name, age from mySource, LATERAL TABLE(splitStr(data)) as T(name, age)")
>t_env.execute("test")
>
>
>It finally fails with:
>TypeError: Invalid result_type: result_type should be DataType but contains
>RowField(name, VARCHAR)
>The error is raised at:
>File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py", line 264, in __init__
>
>
>def __init__(self, func, input_types, result_types, deterministic=None,
>             name=None):
>    super(UserDefinedTableFunctionWrapper, self).__init__(
>        func, input_types, deterministic, name)
>
>    if not isinstance(result_types, collections.Iterable):
>        result_types = [result_types]
>
>    for result_type in result_types:
>        if not isinstance(result_type, DataType):
>            raise TypeError(
>                "Invalid result_type: result_type should be DataType but contains {}".format(
>                    result_type))
>
>    self._result_types = result_types
>    self._judtf_placeholder = None
>
>At a breakpoint I can see that result_types ends up being the FIELD array inside the ROW, which is why it raises. Is this a bug?
>
>
>Also, when creating the udtf above, if I instead write
>splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(),
>DataTypes.BIGINT()])
>it runs fine, even though these types obviously do not match what I actually pass at runtime.
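Setting the typing question aside, the eval logic of the quoted UDTF can be sanity-checked in plain Python. SplitStr below is the quoted class minus the TableFunction base, and the sample rows are made up for illustration:

```python
# Plain-Python check of the UDTF body: given an array of (name, age)
# rows, eval yields one (name, age) tuple per element.
class SplitStr:  # TableFunction base omitted; logic as in the quoted code
    def eval(self, data):
        for row in data:
            yield row[0], row[1]

rows = [("alice", "20"), ("bob", "30")]
out = list(SplitStr().eval(rows))
print(out)  # [('alice', '20'), ('bob', '30')]
```

This confirms the function body itself is fine; the failure is purely in how the declared result type is validated by the wrapper.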