版本:
pyflink==1.0
apache-flink==1.11.2
代码如下:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
'true')
class SplitStr(TableFunction):
def eval(self, data):
for row in data:
yield row[0], row[1]
splitStr = udtf(
SplitStr(),
DataTypes.ARRAY(
DataTypes.ROW(
[
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("id", DataTypes.STRING())
]
)
),
DataTypes.ROW(
[
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("id", DataTypes.STRING())
]
)
)
t_env.register_function("splitStr", splitStr)
t_env.sql_update("""
CREATE TABLE mySource (
id varchar,
data array<ROW<name STRING,age STRING>>
) WITH (
'connector' = 'kafka',
'topic' = 'mytesttopic',
'properties.bootstrap.servers' = '172.17.0.2:9092',
'properties.group.id' = 'flink-test-cxy',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
""")
t_env.sql_update("""
CREATE TABLE mysqlsink (
id varchar
,name varchar
,age varchar
)
with (
'connector' = 'print'
)
""")
t_env.sql_update("insert into mysqlsink select id,name,age from mySource
,LATERAL TABLE(splitStr(data)) as T(name, age)")
t_env.execute("test")
最终报错
TypeError: Invalid result_type: result_type should be DataType but contains
RowField(name, VARCHAR)
报错的地方是
File
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py",
line 264, in __init__
def __init__(self, func, input_types, result_types, deterministic=None,
name=None):
super(UserDefinedTableFunctionWrapper, self).__init__(
func, input_types, deterministic, name)
if not isinstance(result_types, collections.Iterable):
result_types = [result_types]
for result_type in result_types:
if not isinstance(result_type, DataType):
raise TypeError(
"Invalid result_type: result_type should be DataType but contains {}".format(
result_type))
self._result_types = result_types
self._judtf_placeholder = None
断点中可以看到result_types是对应着ROW里面的FIELD数组,所以报错了,这个是bug吗
另外的,假如我在
上面在创建udtf的时候,如果这样写
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(),
DataTypes.BIGINT()])
却可以正常运行,但是显然类型跟我实际运行的不对应