Hi, Neha.

Could you share some more information:

   1. Which state backend are you using? If it's RocksDB, is incremental
   checkpointing enabled? (See the sketch after this list.)
   2. Which specific operator is seeing the increase in checkpointed data
   size? (You can check how the checkpoint size of each subtask changes over
   time under Checkpoint History in the Flink UI.)
   3. Has there been any change in the data flow or the input data during
   this period?
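
For the first question, in case it helps, here is a minimal sketch of
enabling incremental checkpoints with the RocksDB state backend (this
assumes Flink 1.13+ and goes in your job's main(); adapt it to your
version and setup):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// "true" enables incremental checkpoints: each checkpoint uploads only
// the RocksDB SST files added since the previous one, not the full state.
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// Checkpoint every 60 seconds.
env.enableCheckpointing(60_000);

The same can be set in flink-conf.yaml with state.backend: rocksdb and
state.backend.incremental: true. Knowing whether this is on matters for
reading the size graphs, because with incremental checkpoints the reported
checkpointed data size is usually the delta of each checkpoint rather than
the full state size.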


On Fri, Jun 23, 2023 at 2:01 PM neha goyal <nehagoy...@gmail.com> wrote:

> Hello,
>
> I have set env.getConfig().set("table.exec.state.ttl", "180 s") on my
> table environment. Even after that, I can see continuous growth in the
> savepoint size.
>
> I am attaching screenshots of the job graph and the savepoint metric.
> I am also including the query that I am running on Kafka streams. It is
> a lengthy query; any help would be highly appreciated.
>
> SELECT
>   *
> from
>   (
>     With Actuals as (
>       SELECT
>         a1.orderId,
>         a1.zoneId,
>         a3.cityId,
>         case
>           when a2.status = 'delivered' then round(
>             CAST(
>               (
>                 Cast(a2.server_time_stamp AS BIGINT) -
>                 Cast(a1.server_time_stamp AS BIGINT)
>               ) AS DOUBLE
>             ) / CAST(60000 AS DOUBLE),
>             4
>           )
>           when CAST(
>             CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT))
>               AS DOUBLE
>           ) / CAST(60 * 1000 AS DOUBLE) > cast(lmTime as DOUBLE) then CAST(
>             CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT))
>               AS DOUBLE
>           ) / CAST(60 * 1000 AS DOUBLE)
>           else null
>         end AS P2D_inclusive,
>         case
>           when a2.status = 'delivered' then round(
>             CAST(
>               (
>                 Cast(a2.server_time_stamp AS BIGINT) -
>                 Cast(a1.server_time_stamp AS BIGINT)
>               ) AS DOUBLE
>             ) / CAST(60000 AS DOUBLE),
>             4
>           )
>           else null
>         end as P2D_exclusive,
>         cast(lmTime as DOUBLE) as PP2D,
>         case
>           when a2.status = 'delivered' then exp(
>             (
>               (
>                 Cast(a2.server_time_stamp AS BIGINT) - CURRENT_TIMESTAMP
>               ) /(60 * 1000)
>             ) / 100
>           )
>           else 1
>         end as recency_wt,
>         case
>           when a2.status = 'delivered' then 1
>           else 0
>         end as delivered_flag,
>         case
>           when a2.status = 'delivered' then a2.proctime
>           else a1.proctime
>         end as proctime
>       FROM
>         (
>           select
>             distinct orderId,
>             zoneId,
>             server_time_stamp,
>             proctime
>           from
>             my_streamtable
>           where
>             status = 'pickedup'
>         ) a1
>         LEFT JOIN (
>           select
>             distinct orderId,
>             zoneId,
>             status,
>             server_time_stamp,
>             proctime
>           from
>             my_streamtable
>           where
>             status = 'delivered'
>         ) a2 ON a1.orderId = a2.orderId
>         AND a2.proctime BETWEEN a1.proctime - interval '60' minute
>         AND a1.proctime + interval '60' minute
>         INNER JOIN (
>           select
>             distinct orderId,
>             cityId,
>             lmTime,
>             proctime
>           from
>             my_streamtable2
>           where
>             orderId is not null
>         ) a3 ON cast(a1.orderId as VARCHAR) = cast(a3.orderId as VARCHAR)
>         AND a3.proctime BETWEEN a1.proctime - interval '60' minute
>         AND a1.proctime + interval '60' minute
>     ),
>     zone_count as(
>       select
>         zoneId,
>         proctime() as proctime,
>         COUNT(orderId) as counts_inclusive,
>         sum(delivered_flag) as counts_exclusive,
>         AVG(cityId) as cityId
>       from
>         Actuals
>       where
>         P2D_inclusive is not null
>       group by
>         HOP(
>           proctime(),
>           interval '5' minute,
>           interval '60' minute
>         ),
>         zoneId
>     ),
>     zone_agg as (
>       select
>         zoneId,
>         sum(recency_wt * (P2D_inclusive - PP2D)) / sum(recency_wt)
>           as zone_quotient_inclusive,
>         sum(recency_wt * (P2D_exclusive - PP2D)) / sum(recency_wt)
>           as zone_quotient_exclusive,
>         avg(cityId) as cityId,
>         proctime() as proctime
>       from
>         Actuals
>       where
>         P2D_inclusive is not null
>       group by
>         HOP(
>           proctime(),
>           interval '5' minute,
>           interval '60' minute
>         ),
>         zoneId
>     ),
>     city_agg as(
>       select
>         cityId,
>         sum(recency_wt * (P2D_inclusive - PP2D)) / sum(recency_wt)
>           as city_quotient_inclusive,
>         sum(recency_wt * (P2D_inclusive - PP2D)) / sum(recency_wt)
>           as city_quotient_exclusive,
>         proctime() as proctime
>       from
>         Actuals
>       where
>         P2D_inclusive is not null
>       group by
>         HOP(
>           proctime(),
>           interval '5' minute,
>           interval '60' minute
>         ),
>         cityId
>     ),
>     final as (
>       select
>         zone_count.zoneId,
>         zone_count.cityId,
>         avg(zone_count.counts_inclusive) as counts_inclusive,
>         avg(zone_count.counts_exclusive) as counts_exclusive,
>         avg(zone_agg.zone_quotient_inclusive) as zone_quotient_inclusive,
>         avg(city_agg.city_quotient_inclusive) as city_quotient_inclusive,
>         avg(zone_agg.zone_quotient_exclusive) as zone_quotient_exclusive,
>         avg(city_agg.city_quotient_exclusive) as city_quotient_exclusive
>       from
>         zone_count
>         INNER join zone_agg on zone_count.zoneId = zone_agg.zoneId
>         AND zone_agg.proctime
>           BETWEEN zone_count.proctime - interval '60' minute
>         AND zone_count.proctime
>         INNER join city_agg on zone_count.cityId = city_agg.cityId
>         AND city_agg.proctime
>           BETWEEN zone_count.proctime - interval '60' minute
>         AND zone_count.proctime
>       group by
>         HOP(
>           proctime(),
>           interval '5' minute,
>           interval '60' minute
>         ),
>         zone_count.zoneId,
>         zone_count.cityId
>     ),
>     new_final as (
>       select
>         'lm_feedback#' || cast(zoneId as varchar) as key,
>         case
>           when counts_inclusive > 5
>           and zone_quotient_inclusive > zone_quotient_exclusive
>             then zone_quotient_inclusive
>           when counts_exclusive > 5 then zone_quotient_exclusive
>           when city_quotient_inclusive > city_quotient_exclusive
>             then city_quotient_inclusive
>           else city_quotient_exclusive
>         end as value
>       from
>         final
>     )
>     select
>       key,
>       case
>         when new_final.value > 30 then 30
>         else new_final.value
>       end as value,
>       CURRENT_TIMESTAMP AS win_end,
>       CURRENT_TIMESTAMP - 1800000 AS win_start,
>       CURRENT_TIMESTAMP AS time_stamp
>     from
>       new_final
>   )
>


-- 
Best,
Hangxiang.
