????????????????pyflink,??????????????????????????
????????
from pyflink.table import StreamTableEnvironment, DataTypes,
EnvironmentSettings,TableConfig,BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
from pyflink.datastream import StreamExecutionEnvironment
elements = 'aaa|bbb'
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env,
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
@udf(input_types=[DataTypes.STRING()],
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def split(x):
return x.strip().split("|")
# t_env.register_function("split", udf(lambda i: i.strip().split("|"),
[DataTypes.STRING()], DataTypes.ARRAY(DataTypes.STRING())))
t_env.register_function("split", split)
#split????????????2??????
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()],
result_type=DataTypes.STRING())
def get(array, index):
return array[index]
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())],
result_type=DataTypes.STRING())
def convert(array):
return get(array, 0)
t_env.register_function("convert", convert)
t_env.connect(FileSystem().path('/tmp/output')) \
.with_format(OldCsv()
.field('b',
DataTypes.STRING())) \
.with_schema(Schema()
.field('b',
DataTypes.STRING())) \
.create_temporary_table('mySink')
t_env.from_elements(elements)\
.alias('line')\
.select('split(line)')\
.alias('array')\
.select('convert(array) as b')\
.insert_into('mySink')\
.t_env.execute("convert_job")
????????
TypeError: 'UserDefinedFunctionWrapper' object is not callable