Versions:
pyflink==1.0
apache-flink==1.11.2

Code:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_string(
    "python.fn-execution.memory.managed", 'true')


class SplitStr(TableFunction):
    def eval(self, data):
        for row in data:
            yield row[0], row[1]
splitStr = udtf(
    SplitStr(),
    DataTypes.ARRAY(
        DataTypes.ROW(
            [
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("id", DataTypes.STRING())
            ]
        )
    ),
    DataTypes.ROW(
        [
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("id", DataTypes.STRING())
        ]
    )
)
t_env.register_function("splitStr", splitStr)


t_env.sql_update("""
CREATE TABLE mySource (
    id varchar,
    data array<ROW<name STRING, age STRING>>
) WITH (
    'connector' = 'kafka',
    'topic' = 'mytesttopic',
    'properties.bootstrap.servers' = '172.17.0.2:9092',
    'properties.group.id' = 'flink-test-cxy',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
""")
t_env.sql_update("""
CREATE TABLE mysqlsink (
    id varchar,
    name varchar,
    age varchar
) WITH (
    'connector' = 'print'
)
""")
t_env.sql_update(
    "insert into mysqlsink select id, name, age from mySource, "
    "LATERAL TABLE(splitStr(data)) as T(name, age)")
t_env.execute("test")


The final error:

TypeError: Invalid result_type: result_type should be DataType but contains RowField(name, VARCHAR)

It is raised at:

File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py", line 264, in __init__


def __init__(self, func, input_types, result_types, deterministic=None,
             name=None):
    super(UserDefinedTableFunctionWrapper, self).__init__(
        func, input_types, deterministic, name)

    if not isinstance(result_types, collections.Iterable):
        result_types = [result_types]

    for result_type in result_types:
        if not isinstance(result_type, DataType):
            raise TypeError(
                "Invalid result_type: result_type should be DataType but contains {}".format(
                    result_type))

    self._result_types = result_types
    self._judtf_placeholder = None


At a breakpoint you can see that result_types ends up being the array of FIELDs inside the ROW, which is why the check fails. Is this a bug?
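The flattening can be reproduced without Flink. A ROW type is itself a DataType, but it is also iterable over its fields, so the `isinstance(result_types, collections.Iterable)` check in the wrapper passes and the for-loop then sees the individual RowFields, which are not DataTypes. A minimal sketch with stand-in classes (hypothetical `RowType`/`RowField` mimics, not the real pyflink classes):

```python
from collections.abc import Iterable


class DataType:
    """Stand-in for pyflink's DataType base class."""


class RowField:
    """Stand-in for pyflink's RowField: a (name, type) pair, NOT a DataType."""
    def __init__(self, name, data_type):
        self.name = name
        self.data_type = data_type


class RowType(DataType):
    """Stand-in ROW type: a DataType that is also iterable over its fields."""
    def __init__(self, fields):
        self.fields = fields

    def __iter__(self):
        return iter(self.fields)


def check_result_types(result_types):
    # Same shape as the check in UserDefinedTableFunctionWrapper.__init__
    if not isinstance(result_types, Iterable):
        result_types = [result_types]
    for result_type in result_types:
        if not isinstance(result_type, DataType):
            raise TypeError(
                "Invalid result_type: result_type should be DataType "
                "but contains {}".format(result_type))
    return result_types


row = RowType([RowField("name", "STRING"), RowField("id", "STRING")])

# Passing the ROW type itself: it is Iterable, so the wrapper never reaches
# the "wrap it in a list" branch, and the loop iterates over RowFields.
try:
    check_result_types(row)
    raised = False
except TypeError:
    raised = True

# Passing a flat list of DataTypes instead satisfies the check.
flat = check_result_types([DataType(), DataType()])
```

This matches what the breakpoint shows: the ROW is never wrapped in a one-element list, so iteration goes straight into its fields.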


Separately, if I instead create the udtf above like this:

splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()])

it runs fine, but those types obviously do not match my actual data.
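That observation points at the workaround: in this PyFlink version, `result_types` is expected to be a flat list with one DataType per output column, not a single ROW type wrapping them. A minimal sketch of the corrected registration, guarded so it degrades gracefully where pyflink is not installed (the STRING/STRING pair is what matches the data in this post; `input_types` is omitted here for brevity):

```python
try:
    from pyflink.table import DataTypes
    from pyflink.table.udf import udtf, TableFunction
except ImportError:
    DataTypes = None  # pyflink not available in this environment

splitStr = None
if DataTypes is not None:
    class SplitStr(TableFunction):
        def eval(self, data):
            for row in data:
                yield row[0], row[1]

    # One DataType per output column of the table function,
    # instead of a single DataTypes.ROW([...]) wrapper.
    splitStr = udtf(
        SplitStr(),
        result_types=[DataTypes.STRING(), DataTypes.STRING()],
    )
```

With this shape the wrapper's type check iterates over DataTypes rather than RowFields, so registration succeeds and the column types match the real data.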
