Why not use a day-level tumble window? It will clear the state for you automatically.
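
For example, a minimal sketch of your aggregation rewritten with a day-level group window (this assumes DeliveryTime can be declared as an event-time attribute with a watermark on dwd_order_info, and that your source supports event-time windowed aggregation):

    select
        -- the day boundary comes from the tumbling window instead of a current_date filter
        TUMBLE_START(doi.DeliveryTime, INTERVAL '1' DAY) as days,
        doi.UserId,
        count(doi.Code) as SendTime,
        sum(doi.PayAmount / 100) as SendCashcharge,
        sum(doi.PayAmount / 100 - ChargeAmount / 100 + UseBalance / 100) as SendCashuse,
        sum(doi.CashMoney / 100) as SendCash
    from dwd_order_info doi
    where doi.OrderType = 29 and doi.Status >= 50 and doi.Status <> 60
    group by TUMBLE(doi.DeliveryTime, INTERVAL '1' DAY), doi.UserId

Once a day's window fires, its state can be dropped, so results no longer accumulate across days and you should not need the daily restart.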

On Wed, 6 Jan 2021 at 13:53, 徐州州 <[email protected]> wrote:

> I'm working on a Flink real-time data warehouse project with a daily-report scenario, running in flink-on-yarn mode.
> If the job is never stopped, the second day's results are accumulated on top of the first day's. My read rule treats
> rows with TimeStamp > current_date as today's data, but as long as the cluster keeps running, day two's results still
> pile up on top of day one's. The only state-related setting in the code is env.setStateBackend(new
> MemoryStateBackend()), so at the moment I have to restart the job every day to release the state held in memory and
> avoid accumulating on top of yesterday's totals. My source is the upsert-kafka connector, and the SQL is written on
> top of the dwd layer. Below is the exact SQL I run; all the tables it uses come from the dwd-layer upsert-kafka sources.
>     select
>         TO_DATE(cast(doi.DeliveryTime as String), 'yyyy-MM-dd') as days,
>         doi.UserId,
>         count(doi.Code) as SendTime,
>         sum(doi.PayAmount / 100) as SendCashcharge,
>         sum(doi.PayAmount / 100 - ChargeAmount / 100 + UseBalance / 100) as SendCashuse,
>         sum(doi.CashMoney / 100) as SendCash
>     from dwd_order_info doi
>     where doi.DeliveryTime > cast(current_date as TIMESTAMP)
>         and doi.OrderType = 29 and doi.Status >= 50 and doi.Status <> 60
>     group by TO_DATE(cast(doi.DeliveryTime as String), 'yyyy-MM-dd'), doi.UserId
