感谢 Benchao 和  Leonard 的回复

我理解错误的地方在于,我以为 Left join 是 append 模式的,对于左边的数据来说,join 上一条就会 emit
出去,但是什么情况下会产生 react 消息呢?

Leonard Xu <[email protected]> 于2020年5月27日周三 下午3:50写道:

> Hi
> Kafka 的 sink 是 appendStreamSink,所以query只能是append的, 因为Kafka 的
> sink无法处理retract消息。
> 你SQL中的 left join 是个双流join,会产生retract消息,编译时发现query和sink不匹配会报你贴的错误,
>
> 通常看到的业务场景是写入kafka的数据是不用去更新的,kafka主要用作一个消息队列,如果是需要更新
> 结果的数据,一般是存到db里,jdbc,hbase, es 这几个 connector 的sink都是upsertSink。
>
>
> 祝好,
> Leonard Xu
>
>
>
> > 在 2020年5月27日,10:23,Benchao Li <[email protected]> 写道:
> >
> > 而且你的SQL里面有一部分是会产生retract的:
> > 这里用的是regular left join,这种join类型是会产生retract结果的。
> >
> >                           | FROM (
> >                           |    SELECT `database`, `table`,
> > `transaction_type`, `transaction_id`,
> >                           |    `merchant_id`, `event_time`, `status`,
> > `reference_id`
> >                           |    FROM main_table
> >                           |    LEFT JOIN merchant_table
> >                           |    ON main_table.reference_id =
> > merchant_table.transaction_sn
> >                           | )
> >
> >
> > macia kk <[email protected]> 于2020年5月27日周三 上午1:20写道:
> >
> >> Hi,各位大佬,谁有空帮我看下这个问题
> >>
> >> Source: Kafka
> >> SinkL Kafka
> >>
> >> 主要逻辑是 *main_table*  left join *merchatn_table* 以后,使用 FIRST_VALUE 函数取第一条
> >> transaction_id,我这个模式应该是 append 模式,但是结果好像不是
> >>
> >> Error
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: The main
> >> method caused an error: AppendStreamTableSink requires that Table has
> >> only insert changes.
> >>    at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >>    at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >>    at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> Code
> >>
> >>   val main_column = "`database`, `table`, `transaction_type`,
> >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> >> `status`"
> >>    val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> >>    bsTableEnv.createTemporaryView("main_table", main_table)
> >>
> >>    val merchant_column = "transaction_sn, user_id"
> >>    val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column
> >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> >> 'wallet_id_merchant_db%' ")
> >>    bsTableEnv.createTemporaryView("merchant_table", merchant_table)
> >>
> >>    bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> >>                           | SELECT `database`, `table`,
> >> `transaction_type`,
> >>                           |   `merchant_id`, `event_time`, `status`,
> >>                           |    FIRST_VALUE(`transaction_id`) OVER
> >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> >> PRECEDING)
> >>                           | FROM (
> >>                           |    SELECT `database`, `table`,
> >> `transaction_type`, `transaction_id`,
> >>                           |    `merchant_id`, `event_time`, `status`,
> >> `reference_id`
> >>                           |    FROM main_table
> >>                           |    LEFT JOIN merchant_table
> >>                           |    ON main_table.reference_id =
> >> merchant_table.transaction_sn
> >>                           | )
> >>                           |""".stripMargin)
> >>
> >
> >
> > --
> >
> > Best,
> > Benchao Li
>
>

回复