如果只是聚合的结果,像sum这种函数,可以先减去原来的值,然后再加上更新后的值。但如果是count(distinct)呢?还是需要把具体的每个值都存起来把。

Benchao Li <libenc...@apache.org> 于2020年9月10日周四 下午3:26写道:

> sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。
> 当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。
>
> last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。
>
> lec ssmi <shicheng31...@gmail.com> 于2020年9月10日周四 下午2:35写道:
>
> > 上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗?
> > 感觉底层和 last_value() group by id是一样的。
> >
> > Benchao Li <libenc...@apache.org> 于2020年9月10日周四 上午10:34写道:
> >
> > >
> > >
> >
> 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。
> > > 如果还有自己的binlog格式,也可以自定义format来实现。
> > >
> > > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为
> > > 1. append / update_after 消息会累加到聚合指标上
> > > 2. delete / update_before 消息会从聚合指标上进行retract
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html
> > > [2]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
> > >
> > > 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道:
> > >
> > > > 请问第1点是有实际的案例使用了么?
> > > > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录?
> > > > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的
> > > > 谢谢.
> > > >
> > > >
> > > >
> > > >
> > > > ------------------&nbsp;原始邮件&nbsp;------------------
> > > > 发件人:
> > > >                                                   "user-zh"
> > > >                                                                     <
> > > > libenc...@apache.org&gt;;
> > > > 发送时间:&nbsp;2020年9月9日(星期三) 中午1:09
> > > > 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
> > > >
> > > > 主题:&nbsp;Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
> > > >
> > > >
> > > >
> > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩:
> > > > 1. 首先版本是1.11+, 可以直接用binlog
> > > > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink
> > > > &nbsp; 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by
> > > > yyy这种,那这个sum指标会自动做好这件事。
> > > > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1]
> > > 将append数据流转成retract数据流,这样下游再用同样的
> > > > &nbsp; 聚合逻辑,效果也是一样的。
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> > > >
> > > >
> > > > xuzh <x...@chyjr.com&gt; 于2020年9月8日周二 下午5:56写道:
> > > >
> > > > &gt; 场景:
> > > > &gt; &amp;nbsp; &amp;nbsp;实时统计每天的GMV,但是订单金额是会修改的。
> > > > &gt; &amp;nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。
> > > > &gt; &amp;nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka
> > ,GMV实时统计为1000.
> > > > &gt; &amp;nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka.
> > > > &gt; 这时如果不减去上午已经统计的金额。那么总金额就是错的。&amp;nbsp;&amp;nbsp;
> > > > &gt; 请问是不是根据 update /delete
> > > 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。
> > > > &gt;
> > > > &gt;
> > > > &gt; &amp;nbsp; 刚入坑实时处理,请大神赐教
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>

回复