Hi, 小学生。 把函数get的标签udf给去掉,它只是普通的Python函数,不要加上@udf,加上之后就不是python的函数了。只有Python的UDF你才要加上@udf
Best, Xingbo 小学生 <[email protected]> 于2020年6月4日周四 下午2:46写道: > 各位大佬好,初学pyflink,有一个问题需要帮忙解决下。 > > > 代码为: > from pyflink.table import StreamTableEnvironment, DataTypes, > EnvironmentSettings,TableConfig,BatchTableEnvironment > from pyflink.table.descriptors import Schema, OldCsv, FileSystem > from pyflink.table.udf import udf > from pyflink.datastream import StreamExecutionEnvironment > elements = 'aaa|bbb' > env = StreamExecutionEnvironment.get_execution_environment() > env.set_parallelism(1) > t_env = StreamTableEnvironment.create(env, > environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) > > > @udf(input_types=[DataTypes.STRING()], > result_type=DataTypes.ARRAY(DataTypes.STRING())) > def split(x): > return x.strip().split("|") > # t_env.register_function("split", udf(lambda i: i.strip().split("|"), > [DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING()))) > t_env.register_function("split", split) > #split拆分后为一个2元数组 > @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()], > result_type=DataTypes.STRING()) > def get(array, index): > return array[index] > > > @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())], > result_type=DataTypes.STRING()) > def convert(array): > return get(array, 0) > > > t_env.register_function("convert", convert) > > > t_env.connect(FileSystem().path('/tmp/output')) \ > .with_format(OldCsv() > .field('b', > DataTypes.STRING())) \ > .with_schema(Schema() > .field('b', > DataTypes.STRING())) \ > .create_temporary_table('mySink') > > > t_env.from_elements(elements)\ > .alias('line')\ > .select('split(line)')\ > .alias('array')\ > .select('convert(array) as b')\ > .insert_into('mySink')\ > .t_env.execute("convert_job") > > 报错为: > TypeError: 'UserDefinedFunctionWrapper' object is not callable
