Hi Kafka 的 sink 是 appendStreamSink,所以query只能是append的, 因为Kafka 的 sink无法处理retract消息。 你SQL中的 left join 是个双流join,会产生retract消息,编译时发现query和sink不匹配会报你贴的错误,
通常看到的业务场景是写入kafka的数据是不用去更新的,kafka主要用作一个消息队列,如果是需要更新 结果的数据,一般是存到db里,jdbc,hbase, es 这几个 connector 的sink都是upsertSink。 祝好, Leonard Xu > 在 2020年5月27日,10:23,Benchao Li <[email protected]> 写道: > > 而且你的SQL里面有一部分是会产生retract的: > 这里用的是regular left join,这种join类型是会产生retract结果的。 > > | FROM ( > | SELECT `database`, `table`, > `transaction_type`, `transaction_id`, > | `merchant_id`, `event_time`, `status`, > `reference_id` > | FROM main_table > | LEFT JOIN merchant_table > | ON main_table.reference_id = > merchant_table.transaction_sn > | ) > > > macia kk <[email protected]> 于2020年5月27日周三 上午1:20写道: > >> Hi,各位大佬,谁有空帮我看下这个问题 >> >> Source: Kafka >> SinkL Kafka >> >> 主要逻辑是 *main_table* left join *merchatn_table* 以后,使用 FIRST_VALUE 函数取第一条 >> transaction_id,我这个模式应该是 append 模式,但是结果好像不是 >> >> Error >> >> org.apache.flink.client.program.ProgramInvocationException: The main >> method caused an error: AppendStreamTableSink requires that Table has >> only insert changes. >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) >> at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) >> at >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) >> >> >> >> >> >> >> >> >> Code >> >> val main_column = "`database`, `table`, `transaction_type`, >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`, >> `status`" >> val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ") >> bsTableEnv.createTemporaryView("main_table", main_table) >> >> val merchant_column = "transaction_sn, user_id" >> val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column >> FROM Keystats_airpay_consumer WHERE `table` LIKE >> 'wallet_id_merchant_db%' ") >> bsTableEnv.createTemporaryView("merchant_table", merchant_table) >> >> bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer >> | SELECT `database`, `table`, >> `transaction_type`, >> | `merchant_id`, `event_time`, `status`, >> | FIRST_VALUE(`transaction_id`) OVER >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED >> PRECEDING) >> | FROM ( >> | SELECT `database`, `table`, >> `transaction_type`, `transaction_id`, >> | `merchant_id`, `event_time`, `status`, >> `reference_id` >> | FROM main_table >> | LEFT JOIN merchant_table >> | ON main_table.reference_id = >> merchant_table.transaction_sn >> | ) >> |""".stripMargin) >> > > > -- > > Best, > Benchao Li
