感谢 Benchao 和 Leonard 的回复 我理解错误的地方在于,我以为 Left join 是 append 模式的,对于左边的数据来说,join 上一条就会 emit 出去,但是什么情况下会产生 react 消息呢?
Leonard Xu <[email protected]> 于2020年5月27日周三 下午3:50写道: > Hi > Kafka 的 sink 是 appendStreamSink,所以query只能是append的, 因为Kafka 的 > sink无法处理retract消息。 > 你SQL中的 left join 是个双流join,会产生retract消息,编译时发现query和sink不匹配会报你贴的错误, > > 通常看到的业务场景是写入kafka的数据是不用去更新的,kafka主要用作一个消息队列,如果是需要更新 > 结果的数据,一般是存到db里,jdbc,hbase, es 这几个 connector 的sink都是upsertSink。 > > > 祝好, > Leonard Xu > > > > > 在 2020年5月27日,10:23,Benchao Li <[email protected]> 写道: > > > > 而且你的SQL里面有一部分是会产生retract的: > > 这里用的是regular left join,这种join类型是会产生retract结果的。 > > > > | FROM ( > > | SELECT `database`, `table`, > > `transaction_type`, `transaction_id`, > > | `merchant_id`, `event_time`, `status`, > > `reference_id` > > | FROM main_table > > | LEFT JOIN merchant_table > > | ON main_table.reference_id = > > merchant_table.transaction_sn > > | ) > > > > > > macia kk <[email protected]> 于2020年5月27日周三 上午1:20写道: > > > >> Hi,各位大佬,谁有空帮我看下这个问题 > >> > >> Source: Kafka > >> SinkL Kafka > >> > >> 主要逻辑是 *main_table* left join *merchatn_table* 以后,使用 FIRST_VALUE 函数取第一条 > >> transaction_id,我这个模式应该是 append 模式,但是结果好像不是 > >> > >> Error > >> > >> org.apache.flink.client.program.ProgramInvocationException: The main > >> method caused an error: AppendStreamTableSink requires that Table has > >> only insert changes. > >> at > >> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) > >> at > >> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) > >> at > >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) > >> > >> > >> > >> > >> > >> > >> > >> > >> Code > >> > >> val main_column = "`database`, `table`, `transaction_type`, > >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`, > >> `status`" > >> val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM > >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ") > >> bsTableEnv.createTemporaryView("main_table", main_table) > >> > >> val merchant_column = "transaction_sn, user_id" > >> val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column > >> FROM Keystats_airpay_consumer WHERE `table` LIKE > >> 'wallet_id_merchant_db%' ") > >> bsTableEnv.createTemporaryView("merchant_table", merchant_table) > >> > >> bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer > >> | SELECT `database`, `table`, > >> `transaction_type`, > >> | `merchant_id`, `event_time`, `status`, > >> | FIRST_VALUE(`transaction_id`) OVER > >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED > >> PRECEDING) > >> | FROM ( > >> | SELECT `database`, `table`, > >> `transaction_type`, `transaction_id`, > >> | `merchant_id`, `event_time`, `status`, > >> `reference_id` > >> | FROM main_table > >> | LEFT JOIN merchant_table > >> | ON main_table.reference_id = > >> merchant_table.transaction_sn > >> | ) > >> |""".stripMargin) > >> > > > > > > -- > > > > Best, > > Benchao Li > >
