Hi 小学生,
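Remove the `@udf` decorator from the function `get`. `get` is just an ordinary Python helper, so it should not be decorated: once `@udf` is applied, `get` is no longer a plain Python function and can no longer be called directly from other Python code such as `convert`. Only the functions that you register and call as Python UDFs in Table API expressions (here `split` and `convert`) need `@udf`.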
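A minimal sketch of the corrected job, assuming the PyFlink 1.10-era API used in the question (`register_function`, `udf(..., input_types=..., result_type=...)`, `insert_into`). It leaves `get` as a plain function and wraps only `split` and `convert` as UDFs. It also fixes two other problems in the quoted code: `from_elements` is given a list of rows instead of a bare string, and `t_env.execute(...)` is called on its own line because `insert_into()` returns `None`. The `run_job` helper, its parameters and the intermediate column name `arr` are hypothetical choices for this sketch, not names from the original code.

```python
# Plain Python helper: no @udf, so it stays an ordinary callable.
def get(array, index):
    return array[index]


# Body of the "split" UDF: 'aaa|bbb' -> ['aaa', 'bbb'].
def split(x):
    return x.strip().split("|")


# Body of the "convert" UDF; it can freely call plain helpers such as get().
def convert(array):
    return get(array, 0)


def run_job(elements='aaa|bbb', output_path='/tmp/output'):
    # Hypothetical entry point. The Flink imports live here so the helpers
    # above can be imported and tested without a PyFlink installation.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
    from pyflink.table.descriptors import FileSystem, OldCsv, Schema
    from pyflink.table.udf import udf

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

    # Only functions called from Table API expressions are wrapped;
    # udf(f, ...) is the plain-call form of the @udf decorator.
    t_env.register_function("split", udf(
        split,
        input_types=[DataTypes.STRING()],
        result_type=DataTypes.ARRAY(DataTypes.STRING())))
    t_env.register_function("convert", udf(
        convert,
        input_types=[DataTypes.ARRAY(DataTypes.STRING())],
        result_type=DataTypes.STRING()))

    t_env.connect(FileSystem().path(output_path)) \
        .with_format(OldCsv().field('b', DataTypes.STRING())) \
        .with_schema(Schema().field('b', DataTypes.STRING())) \
        .create_temporary_table('mySink')

    # from_elements expects a list of rows, so the string is wrapped in a 1-tuple;
    # the second argument names the single column.
    t_env.from_elements([(elements,)], ['line']) \
        .select('split(line) as arr') \
        .select('convert(arr) as b') \
        .insert_into('mySink')
    # insert_into() returns None, so execute() is called on t_env separately.
    t_env.execute("convert_job")
```

Using the call form `udf(convert, ...)` instead of the decorator is equivalent for registration. It keeps `split`, `get` and `convert` as plain functions that can be unit-tested directly. With a working PyFlink 1.10 setup, `run_job()` is expected to write the single value `aaa` to `/tmp/output`.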

Best,
Xingbo

On Thu, Jun 4, 2020 at 2:46 PM, 小学生 <[email protected]> wrote:

> Hi everyone, I'm new to PyFlink and could use some help with a problem.
>
>
> My code:
> from pyflink.table import (StreamTableEnvironment, DataTypes,
>                            EnvironmentSettings, TableConfig, BatchTableEnvironment)
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
> from pyflink.table.udf import udf
> from pyflink.datastream import StreamExecutionEnvironment
> elements = 'aaa|bbb'
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(
>     env,
>     environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
>
>
> @udf(input_types=[DataTypes.STRING()],
>      result_type=DataTypes.ARRAY(DataTypes.STRING()))
> def split(x):
>     return x.strip().split("|")
> # t_env.register_function("split", udf(lambda i: i.strip().split("|"),
> #     [DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING())))
> t_env.register_function("split", split)
> # split returns a two-element array
> @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()],
>      result_type=DataTypes.STRING())
> def get(array, index):
>     return array[index]
>
>
> @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())],
>      result_type=DataTypes.STRING())
> def convert(array):
>     return get(array, 0)
>
>
> t_env.register_function("convert", convert)
>
>
> t_env.connect(FileSystem().path('/tmp/output')) \
>     .with_format(OldCsv()
>                  .field('b', DataTypes.STRING())) \
>     .with_schema(Schema()
>                  .field('b', DataTypes.STRING())) \
>     .create_temporary_table('mySink')
>
>
> t_env.from_elements(elements)\
>      .alias('line')\
>      .select('split(line)')\
>      .alias('array')\
>      .select('convert(array) as b')\
>      .insert_into('mySink')\
>      .t_env.execute("convert_job")
>
> The error is:
> TypeError: 'UserDefinedFunctionWrapper' object is not callable
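The same error can be reproduced in plain Python without Flink. As the message shows, the decorated `get` is no longer a function but a `UserDefinedFunctionWrapper` object, and calling an object that has no `__call__` method raises `TypeError`. The sketch below is only an analogy: `FunctionWrapper` and `wrap` are hypothetical names, not PyFlink API.

```python
class FunctionWrapper:
    """Hypothetical stand-in for a UDF wrapper: it stores the function but defines no __call__."""

    def __init__(self, func):
        self.func = func


def wrap(func):
    # Decorator that replaces the function with a non-callable wrapper object.
    return FunctionWrapper(func)


@wrap
def get(array, index):
    return array[index]


def convert(array):
    # get is now a FunctionWrapper instance, not a function.
    return get(array, 0)


message = None
try:
    convert(["aaa", "bbb"])
except TypeError as exc:
    message = str(exc)

print(message)  # 'FunctionWrapper' object is not callable
```

Leaving `get` undecorated, as suggested above, keeps it an ordinary callable, so `convert` can use it inside the UDF body.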
