目前在学习使用pyflink的Table api,请教一个pyflink udf函数报错问题:


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())

def drop_fields(message, *fields):
      import json
      message = json.loads(message)
      for field in fields:
            message.pop(field)
      return  json.dumps(message)




st_env \
      .form_path("source") \
      .select("drop_fields(message,'x')") \
      .insert_into("sink")


message 格式:
{“a”:"1","x","2"}


报错参数类型不匹配:
Actual:(java.lang.String, java.lang.String)
Expected:(org.apache.flink.table.dataformat.BinaryString)


新手入门,请多指教,感谢。

回复