pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

2020-05-31 文章
目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题: @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def drop_fields(message, *fields): import json message = json.loads(message) for field in fields: message.pop(field) return

pyflink Table Api连接 外部系统问题

2020-05-29 文章
目前在学习使用pyflink的Table api,请教一个问题: 1、Table Api连接kafka系统,能否把整条的kafka消息看成是一个table字段进行处理?比如,kafka topic连的消息为一个json字符串,把这个字符串整体当做是一个字段,这样可以方便使用 pyflink 的udf函数对消息进行处理转换等操作? 2、如果以上可行,连接kafka的数据格式如何设置,即with_format如何设置,目前官网这方便的资料较少。 新手入门,请多指教,感谢。