Hi, 小学生

我稍微修改了一下你的code(你的from_elements那样写按理说就没法运行)
code是能够正确运行的,你可以参考一下,你去掉的是不是有问题,或者你把你修改后的代码贴上来,再一起看看
from pyflink.table import StreamTableEnvironment, DataTypes,
EnvironmentSettings, TableConfig, \
    BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
from pyflink.datastream import StreamExecutionEnvironment


def test_test():
    elements = 'aaa|bbb'
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(env,

environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

    @udf(input_types=[DataTypes.STRING()],
         result_type=DataTypes.ARRAY(DataTypes.STRING()))
    def split(x):
        print(x.strip().split("|"))
        return x.strip().split("|")

    # t_env.register_function("split", udf(lambda i: i.strip().split("|"),
[DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING())))
    t_env.register_function("split", split)

    # split拆分后为一个2元数组
    def get(arr, index):
        return arr[index]

    @udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())],
result_type=DataTypes.STRING())
    def convert(arr):
        return get(arr, 0)


    t_env.register_function("convert", convert)

    t_env.connect(FileSystem().path('/tmp/output')) \
        .with_format(OldCsv()
                     .field('b', DataTypes.STRING())) \
        .with_schema(Schema()
                     .field('b', DataTypes.STRING())) \
        .create_temporary_table('mySink')

    t_env.from_elements([(elements,), ], ["a"]) \
        .alias('line') \
        .select('split(line) as arr') \
        .select('convert(arr) as b') \
        .insert_into('mySink')
    t_env.execute("convert_job")


if __name__ == '__main__':
    test_test()

Best,
Xingbo

小学生 <[email protected]> 于2020年6月4日周四 下午3:38写道:

> 大佬好,去掉了运行还是出错,一样的错误

回复