你的理解是对的。你可以尝试下用time windowed join[1],这个不管是什么join类型,结果都是append的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

macia kk <[email protected]> 于2020年5月27日周三 下午6:56写道:

> 感谢 Benchao
>
> 原来如此,我之前用的是 spark structured Streming, 可能模式跟Flink不太一样,它会缓存没有 join 到的数据,直到
> watermark 结束之后才 emit
>
> Flink 新的数据进来跟右边的缓存数据 join, 没有 join 到先发 null,但是这个数据还会缓存, 后边右边如果有新的数据可以 join
> 到左边已经发出去的这条数据,会产生 retract. (我的理解)
>
>
> 那我这种情况有别的解决方案吗?因为我的 Sink (Kafka) 下游是 Druid, 数据会直接 index 后作为查询,不支持 retract
> 场景。
>
>
>
>
> Benchao Li <[email protected]> 于2020年5月27日周三 下午6:32写道:
>
> > 产生retract消息的场景有很多,暂时还没有一篇文档来介绍这个,我大概列举几个典型的场景吧:
> > 1. regular group by,因为聚合的结果是实时下发的,所以更新了聚合结果就会retract老的聚合结果
> > 2. 非inner/anti 的join(不包括time interval
> > join),这种原因是如果当前join不上,会发送null,但是后面可能对面可能又会有数据进来,导致下发的null需要被retract
> > 3. 取latest的去重
> > 4. topn,排名变化需要更新结果
> > 5. window + emit,提前emit的结果需要retract来更新
> >
> > macia kk <[email protected]> 于2020年5月27日周三 下午6:19写道:
> >
> > > 感谢 Benchao 和  Leonard 的回复
> > >
> > > 我理解错误的地方在于,我以为 Left join 是 append 模式的,对于左边的数据来说,join 上一条就会 emit
> > > 出去,但是什么情况下会产生 react 消息呢?
> > >
> > > Leonard Xu <[email protected]> 于2020年5月27日周三 下午3:50写道:
> > >
> > > > Hi
> > > > Kafka 的 sink 是 appendStreamSink,所以query只能是append的, 因为Kafka 的
> > > > sink无法处理retract消息。
> > > > 你SQL中的 left join 是个双流join,会产生retract消息,编译时发现query和sink不匹配会报你贴的错误,
> > > >
> > > > 通常看到的业务场景是写入kafka的数据是不用去更新的,kafka主要用作一个消息队列,如果是需要更新
> > > > 结果的数据,一般是存到db里,jdbc,hbase, es 这几个 connector 的sink都是upsertSink。
> > > >
> > > >
> > > > 祝好,
> > > > Leonard Xu
> > > >
> > > >
> > > >
> > > > > 在 2020年5月27日,10:23,Benchao Li <[email protected]> 写道:
> > > > >
> > > > > 而且你的SQL里面有一部分是会产生retract的:
> > > > > 这里用的是regular left join,这种join类型是会产生retract结果的。
> > > > >
> > > > >                           | FROM (
> > > > >                           |    SELECT `database`, `table`,
> > > > > `transaction_type`, `transaction_id`,
> > > > >                           |    `merchant_id`, `event_time`,
> `status`,
> > > > > `reference_id`
> > > > >                           |    FROM main_table
> > > > >                           |    LEFT JOIN merchant_table
> > > > >                           |    ON main_table.reference_id =
> > > > > merchant_table.transaction_sn
> > > > >                           | )
> > > > >
> > > > >
> > > > > macia kk <[email protected]> 于2020年5月27日周三 上午1:20写道:
> > > > >
> > > > >> Hi,各位大佬,谁有空帮我看下这个问题
> > > > >>
> > > > >> Source: Kafka
> > > > >> SinkL Kafka
> > > > >>
> > > > >> 主要逻辑是 *main_table*  left join *merchatn_table* 以后,使用 FIRST_VALUE
> > > 函数取第一条
> > > > >> transaction_id,我这个模式应该是 append 模式,但是结果好像不是
> > > > >>
> > > > >> Error
> > > > >>
> > > > >> org.apache.flink.client.program.ProgramInvocationException: The
> main
> > > > >> method caused an error: AppendStreamTableSink requires that Table
> > has
> > > > >> only insert changes.
> > > > >>    at
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > > > >>    at
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > > > >>    at
> > > > >>
> > >
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> Code
> > > > >>
> > > > >>   val main_column = "`database`, `table`, `transaction_type`,
> > > > >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> > > > >> `status`"
> > > > >>    val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> > > > >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> > > > >>    bsTableEnv.createTemporaryView("main_table", main_table)
> > > > >>
> > > > >>    val merchant_column = "transaction_sn, user_id"
> > > > >>    val merchant_table = bsTableEnv.sqlQuery(s"SELECT
> > $merchant_column
> > > > >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> > > > >> 'wallet_id_merchant_db%' ")
> > > > >>    bsTableEnv.createTemporaryView("merchant_table",
> merchant_table)
> > > > >>
> > > > >>    bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> > > > >>                           | SELECT `database`, `table`,
> > > > >> `transaction_type`,
> > > > >>                           |   `merchant_id`, `event_time`,
> `status`,
> > > > >>                           |    FIRST_VALUE(`transaction_id`) OVER
> > > > >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> > > > >> PRECEDING)
> > > > >>                           | FROM (
> > > > >>                           |    SELECT `database`, `table`,
> > > > >> `transaction_type`, `transaction_id`,
> > > > >>                           |    `merchant_id`, `event_time`,
> > `status`,
> > > > >> `reference_id`
> > > > >>                           |    FROM main_table
> > > > >>                           |    LEFT JOIN merchant_table
> > > > >>                           |    ON main_table.reference_id =
> > > > >> merchant_table.transaction_sn
> > > > >>                           | )
> > > > >>                           |""".stripMargin)
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > >
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li

回复