感谢 Benchao

原来如此,我之前用的是 spark structured Streming, 可能模式跟Flink不太一样,它会缓存没有 join 到的数据,直到
watermark 结束之后才 emit

Flink 新的数据进来跟右边的缓存数据 join, 没有 join 到先发 null,但是这个数据还会缓存, 后边右边如果有新的数据可以 join
到左边已经发出去的这条数据,会产生 retract. (我的理解)


那我这种情况有别的解决方案吗?因为我的 Sink (Kafka) 下游是 Druid, 数据会直接 index 后作为查询,不支持 retract
场景。




Benchao Li <[email protected]> 于2020年5月27日周三 下午6:32写道:

> 产生retract消息的场景有很多,暂时还没有一篇文档来介绍这个,我大概列举几个典型的场景吧:
> 1. regular group by,因为聚合的结果是实时下发的,所以更新了聚合结果就会retract老的聚合结果
> 2. 非inner/anti 的join(不包括time interval
> join),这种原因是如果当前join不上,会发送null,但是后面可能对面可能又会有数据进来,导致下发的null需要被retract
> 3. 取latest的去重
> 4. topn,排名变化需要更新结果
> 5. window + emit,提前emit的结果需要retract来更新
>
> macia kk <[email protected]> 于2020年5月27日周三 下午6:19写道:
>
> > 感谢 Benchao 和  Leonard 的回复
> >
> > 我理解错误的地方在于,我以为 Left join 是 append 模式的,对于左边的数据来说,join 上一条就会 emit
> > 出去,但是什么情况下会产生 react 消息呢?
> >
> > Leonard Xu <[email protected]> 于2020年5月27日周三 下午3:50写道:
> >
> > > Hi
> > > Kafka 的 sink 是 appendStreamSink,所以query只能是append的, 因为Kafka 的
> > > sink无法处理retract消息。
> > > 你SQL中的 left join 是个双流join,会产生retract消息,编译时发现query和sink不匹配会报你贴的错误,
> > >
> > > 通常看到的业务场景是写入kafka的数据是不用去更新的,kafka主要用作一个消息队列,如果是需要更新
> > > 结果的数据,一般是存到db里,jdbc,hbase, es 这几个 connector 的sink都是upsertSink。
> > >
> > >
> > > 祝好,
> > > Leonard Xu
> > >
> > >
> > >
> > > > 在 2020年5月27日,10:23,Benchao Li <[email protected]> 写道:
> > > >
> > > > 而且你的SQL里面有一部分是会产生retract的:
> > > > 这里用的是regular left join,这种join类型是会产生retract结果的。
> > > >
> > > >                           | FROM (
> > > >                           |    SELECT `database`, `table`,
> > > > `transaction_type`, `transaction_id`,
> > > >                           |    `merchant_id`, `event_time`, `status`,
> > > > `reference_id`
> > > >                           |    FROM main_table
> > > >                           |    LEFT JOIN merchant_table
> > > >                           |    ON main_table.reference_id =
> > > > merchant_table.transaction_sn
> > > >                           | )
> > > >
> > > >
> > > > macia kk <[email protected]> 于2020年5月27日周三 上午1:20写道:
> > > >
> > > >> Hi,各位大佬,谁有空帮我看下这个问题
> > > >>
> > > >> Source: Kafka
> > > >> SinkL Kafka
> > > >>
> > > >> 主要逻辑是 *main_table*  left join *merchatn_table* 以后,使用 FIRST_VALUE
> > 函数取第一条
> > > >> transaction_id,我这个模式应该是 append 模式,但是结果好像不是
> > > >>
> > > >> Error
> > > >>
> > > >> org.apache.flink.client.program.ProgramInvocationException: The main
> > > >> method caused an error: AppendStreamTableSink requires that Table
> has
> > > >> only insert changes.
> > > >>    at
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > > >>    at
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > > >>    at
> > > >>
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> Code
> > > >>
> > > >>   val main_column = "`database`, `table`, `transaction_type`,
> > > >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> > > >> `status`"
> > > >>    val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> > > >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> > > >>    bsTableEnv.createTemporaryView("main_table", main_table)
> > > >>
> > > >>    val merchant_column = "transaction_sn, user_id"
> > > >>    val merchant_table = bsTableEnv.sqlQuery(s"SELECT
> $merchant_column
> > > >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> > > >> 'wallet_id_merchant_db%' ")
> > > >>    bsTableEnv.createTemporaryView("merchant_table", merchant_table)
> > > >>
> > > >>    bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> > > >>                           | SELECT `database`, `table`,
> > > >> `transaction_type`,
> > > >>                           |   `merchant_id`, `event_time`, `status`,
> > > >>                           |    FIRST_VALUE(`transaction_id`) OVER
> > > >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> > > >> PRECEDING)
> > > >>                           | FROM (
> > > >>                           |    SELECT `database`, `table`,
> > > >> `transaction_type`, `transaction_id`,
> > > >>                           |    `merchant_id`, `event_time`,
> `status`,
> > > >> `reference_id`
> > > >>                           |    FROM main_table
> > > >>                           |    LEFT JOIN merchant_table
> > > >>                           |    ON main_table.reference_id =
> > > >> merchant_table.transaction_sn
> > > >>                           | )
> > > >>                           |""".stripMargin)
> > > >>
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>

回复