Hi Neha,

The HOP window will increase the size of the checkpoint. I'm sorry, but I'm
not very familiar with the HOP window.

If the configuration is correct and you want to confirm whether the HOP
window is the cause, I think you can submit a Flink job that uses regular
aggregation operators instead of the HOP window, and observe whether its
checkpoint and savepoint sizes meet expectations.
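To make the HOP cost concrete: in HOP(time_attr, slide, size) the query uses a 5-minute slide and a 30-minute size. Each record therefore falls into 30 / 5 = 6 overlapping windows, and conceptually each open window holds its own aggregate per key until it fires.

The Java sketch below mirrors the usual sliding-window assignment loop. The class and method names are hypothetical, not a Flink API. Flink's SQL runtime may organize window state differently (for example by panes), so read it as an illustration of the window semantics, not of Flink's storage layout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Flink API: lists the hopping windows that a
// processing-time timestamp (in ms) falls into. Windows are `sizeMs` long and
// a new one starts every `slideMs`, assuming a zero window offset.
class HopWindows {

    // Returns the [start, end) bounds, in ms, of every window containing ts.
    static List<long[]> assign(long ts, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that contains ts (assumes ts >= 0).
        long lastStart = ts - (ts % slideMs);
        // Walk back one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // HOP(proctime, INTERVAL '5' MINUTE, INTERVAL '30' MINUTE):
        // slide = 5 minutes, size = 30 minutes.
        List<long[]> windows = assign(42 * minute, 30 * minute, 5 * minute);
        System.out.println(windows.size() + " windows"); // 6 windows
        for (long[] w : windows) {
            System.out.println("[" + w[0] / minute + ", " + w[1] / minute + ") min");
        }
    }
}
```

A plain GROUP BY zoneId aggregate would instead keep one accumulator per key. That is why comparing against a job without HOP windows is a useful control experiment.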

Best,
Shammon FY

On Tue, Jul 18, 2023 at 8:25 PM Neha . <neh...@swiggy.in> wrote:

> Hi Shammon,
>
> These configs exist in the Flink Web UI. We have set
> exeEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime). Do
> you think this can cause issues for HOP(proctime, some interval,
> some interval), such as state not being released for checkpoints?
> I am really confused about why savepoints behave as expected but
> checkpoints do not.
>
> On Tue, Jul 18, 2023 at 6:56 AM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi Neha,
>>
>> I think you can first check whether the options `state.backend` and
>> `state.backend.incremental` you mentioned above exist under
>> `JobManager` -> `Configuration` in the Flink Web UI. If they do not exist,
>> you may be using the wrong conf file.
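The check suggested above can be sketched in Java. The sketch parses flat "key: value" lines, such as those in flink-conf.yaml or copied from the JobManager configuration page. It then reports whether both keys carry the intended values.

The class and method names are hypothetical, and the key names are the Flink 1.16 ones used in this thread. A state backend configured in the job code itself may still override these cluster-level settings.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical checker, not part of Flink: parses flat "key: value" lines
// and reports whether incremental RocksDB checkpointing is configured.
class IncrementalCheckpointConfigCheck {

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new HashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int sep = line.indexOf(':'); // first colon separates key from value
            if (sep < 0) {
                continue; // not a "key: value" line
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 1).trim());
        }
        return conf;
    }

    static boolean incrementalRocksDbEnabled(Map<String, String> conf) {
        // A missing key yields null, and equalsIgnoreCase(null) is false.
        return "rocksdb".equalsIgnoreCase(conf.get("state.backend"))
                && "true".equalsIgnoreCase(conf.get("state.backend.incremental"));
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "# copied from the running cluster",
                "state.backend: rocksdb",
                "state.backend.incremental: true");
        System.out.println(incrementalRocksDbEnabled(parse(lines))); // true
    }
}
```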
>>
>> Best,
>> Shammon FY
>>
>>
>> On Mon, Jul 17, 2023 at 5:04 PM Neha . <neh...@swiggy.in> wrote:
>>
>>> Hi Shammon,
>>>
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>>
>>> This is already set in flink-conf. Is there anything else that should be
>>> taken care of for incremental checkpointing? Is there any related bug
>>> in Flink 1.16.1? We recently migrated to Flink 1.16.1 from Flink
>>> 1.13.6.
>>> What could cause incremental checkpointing to stop working?
>>>
>>> On Mon, Jul 17, 2023 at 11:35 AM Shammon FY <zjur...@gmail.com> wrote:
>>>
>>>> Hi Neha,
>>>>
>>>> I noticed that the `Checkpointed Data Size` is always equal to the `Full
>>>> Checkpoint Data Size`. I think the job is using full checkpoints instead of
>>>> incremental checkpoints; you can check it.
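The reasoning above can be written as a small heuristic over the two sizes shown per checkpoint.

With incremental checkpointing, the first checkpoint typically uploads all state anyway, so only later checkpoints are informative. The class and method names are hypothetical.

Equal sizes could in principle also appear if compaction rewrote every SST file between two checkpoints. Treat the heuristic as a hint, not proof.

```java
// Hypothetical heuristic, not a Flink API: given the "Checkpointed Data Size"
// and "Full Checkpoint Data Size" of consecutive completed checkpoints (bytes),
// guess whether the job is taking full rather than incremental checkpoints.
class CheckpointSizeHeuristic {

    static boolean looksLikeFullCheckpoints(long[] checkpointed, long[] full) {
        if (checkpointed.length != full.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        // The first checkpoint uploads everything even when incremental
        // checkpointing works, so it says nothing; require at least one more.
        if (checkpointed.length < 2) {
            return false;
        }
        for (int i = 1; i < checkpointed.length; i++) {
            if (checkpointed[i] < full[i]) {
                return false; // this checkpoint uploaded only part of its state
            }
        }
        return true; // every later checkpoint re-uploaded its full state
    }

    public static void main(String[] args) {
        long[] full = {100, 120, 150};
        System.out.println(looksLikeFullCheckpoints(new long[] {100, 120, 150}, full)); // true
        System.out.println(looksLikeFullCheckpoints(new long[] {100, 30, 45}, full));   // false
    }
}
```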
>>>>
>>>> Best,
>>>> Shammon FY
>>>>
>>>> On Mon, Jul 17, 2023 at 10:25 AM Neha . <neh...@swiggy.in> wrote:
>>>>
>>>>> Hello Shammon,
>>>>>
>>>>> Thank you for your assistance.
>>>>> I have already enabled incremental checkpointing; I'm attaching a
>>>>> screenshot. Can you please elaborate on what makes you think it is not
>>>>> enabled? It might hint towards the issue. The problem is that the checkpoint
>>>>> size is not going down and keeps increasing, while the savepoint size shows
>>>>> the expected behavior of going up and down with the throughput peaks.
>>>>>
>>>>> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>>>>>
>>>>>
>>>>> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY <zjur...@gmail.com> wrote:
>>>>>
>>>>>> Hi Neha,
>>>>>>
>>>>>> I think it is normal for the data size of a savepoint to be smaller
>>>>>> than the full data size of a checkpoint. Flink uses RocksDB to store
>>>>>> checkpointed data. RocksDB is LSM-tree-based storage in which the same key
>>>>>> can have multiple versioned records, while a savepoint traverses all keys
>>>>>> and stores only one record per key.
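The size difference described above can be illustrated with a toy LSM model in Java. State lives in immutable files of (key, value) records. An update appends a new version and a delete appends a tombstone.

A checkpoint-like copy keeps every record until compaction removes them. A savepoint-like traversal keeps only the newest live value per key.

This is a deliberately simplified assumption: real RocksDB uses levels, sequence numbers and background compaction, and Flink's savepoint format differs. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an LSM store, not RocksDB's real file format: state lives in
// immutable "SST files" of (key, value) records, ordered oldest to newest.
// An update appends a new version; a delete appends a tombstone (value null).
class LsmSizeModel {

    static final class Entry {
        final String key;
        final String value; // null marks a tombstone (deleted key)

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Checkpoint-like size: every record in every file, including old
    // versions and tombstones that compaction has not yet removed.
    static int checkpointRecords(List<List<Entry>> sstFiles) {
        int count = 0;
        for (List<Entry> file : sstFiles) {
            count += file.size();
        }
        return count;
    }

    // Savepoint-like size: iterate all keys, keep only the newest live value.
    static int savepointRecords(List<List<Entry>> sstFiles) {
        Map<String, String> latest = new HashMap<>();
        for (List<Entry> file : sstFiles) {
            for (Entry e : file) {
                if (e.value == null) {
                    latest.remove(e.key); // deleted: nothing to write
                } else {
                    latest.put(e.key, e.value); // newer version replaces older
                }
            }
        }
        return latest.size();
    }

    public static void main(String[] args) {
        List<List<Entry>> sstFiles = new ArrayList<>();
        sstFiles.add(List.of(new Entry("zone-1", "count=3"), new Entry("zone-2", "count=1")));
        sstFiles.add(List.of(new Entry("zone-1", "count=4")));
        // Hypothetical: a fired window clears zone-1's state; zone-2 gets a new version.
        sstFiles.add(List.of(new Entry("zone-1", null), new Entry("zone-2", "count=2")));
        System.out.println(checkpointRecords(sstFiles)); // 5
        System.out.println(savepointRecords(sstFiles));  // 1
    }
}
```

In this model, clearing state shrinks the savepoint-like size right away. The checkpoint-like size only shrinks once compaction drops the old versions and tombstones.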
>>>>>>
>>>>>> But I noticed that you did not enable incremental checkpoints, so each
>>>>>> checkpoint saves the full data. You can refer to [1] for more detail
>>>>>> and turn it on, which will reduce the data size of the checkpoint.
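The saving described above comes from reusing files across checkpoints. The toy Java model below assumes a simplified mechanism: each checkpoint references all current SST files, but only files not uploaded by the previous checkpoint are uploaded again.

The two numbers correspond roughly to the Web UI's "Checkpointed Data Size" and "Full Checkpoint Data Size". Flink's real implementation tracks shared files through its own registry. The names here are hypothetical.

```java
import java.util.Map;
import java.util.Set;

// Toy model of incremental checkpointing with hypothetical names: each
// checkpoint references the SST files currently in the local RocksDB
// instance, but only files the previous checkpoint did not already
// upload need to be uploaded again.
class IncrementalUploadModel {

    // Bytes that must be uploaded now: files new since the previous checkpoint.
    static long uploadedBytes(Map<String, Long> currentFiles, Set<String> previousFiles) {
        long bytes = 0;
        for (Map.Entry<String, Long> f : currentFiles.entrySet()) {
            if (!previousFiles.contains(f.getKey())) {
                bytes += f.getValue();
            }
        }
        return bytes;
    }

    // Bytes referenced by the checkpoint as a whole (shared plus new files).
    static long fullBytes(Map<String, Long> currentFiles) {
        long bytes = 0;
        for (long size : currentFiles.values()) {
            bytes += size;
        }
        return bytes;
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of("000011.sst", "000012.sst");
        Map<String, Long> current = Map.of(
                "000011.sst", 100L, "000012.sst", 200L, "000013.sst", 50L);
        System.out.println(uploadedBytes(current, previous)); // 50
        System.out.println(fullBytes(current));               // 350
    }
}
```

Compaction writes new SST files, so a checkpoint taken after heavy compaction can upload much more than the logical change in state.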
>>>>>>
>>>>>> [1]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>>>>>>
>>>>>> Best,
>>>>>> Shammon FY
>>>>>>
>>>>>>
>>>>>> On Sun, Jul 16, 2023 at 2:30 PM Neha . <neh...@swiggy.in> wrote:
>>>>>>
>>>>>>> Hello Shammon FY,
>>>>>>>
>>>>>>> It is a production issue for me. Can you please take a look if
>>>>>>> anything can be done?
>>>>>>>
>>>>>>> ---------- Forwarded message ---------
>>>>>>> From: Neha . <neh...@swiggy.in>
>>>>>>> Date: Fri, Jul 14, 2023 at 4:06 PM
>>>>>>> Subject: Checkpoint size smaller than Savepoint size
>>>>>>> To: <user@flink.apache.org>
>>>>>>>
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> According to Flink's documentation, checkpoints are designed to be
>>>>>>> lightweight. However, in my Flink pipeline, I have observed that the
>>>>>>> savepoint sizes are smaller than the checkpoint sizes. Is this expected
>>>>>>> behavior? What are the possible scenarios that can lead to this
>>>>>>> situation?
>>>>>>>
>>>>>>> Additionally, I have noticed that the checkpoint size in my
>>>>>>> datastream pipeline continues to grow while the savepoint size behaves
>>>>>>> as expected. Could this be attributed to the usage of Common Table
>>>>>>> Expressions (CTEs) in Flink SQL?
>>>>>>>
>>>>>>> Flink version: 1.16.1
>>>>>>> Incremental checkpointing is enabled.
>>>>>>> StateBackend: RocksDB
>>>>>>> Time Characteristic: Ingestion
>>>>>>>
>>>>>>> SQL:
>>>>>>>
>>>>>>> SELECT
>>>>>>>   *
>>>>>>> from
>>>>>>>   (
>>>>>>>     With Actuals as (
>>>>>>>       SELECT
>>>>>>>         clientOrderId,
>>>>>>>         Cast(
>>>>>>>           ValueFromKeyCacheUDF(
>>>>>>>             concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>>>>>>           ) as INT
>>>>>>>         ) as zoneId,
>>>>>>>         cityId,
>>>>>>>         case
>>>>>>>           when status = 'ASSIGNED' then 1
>>>>>>>           else 0
>>>>>>>         end as acceptance_flag,
>>>>>>>         unicast.proctime
>>>>>>>       FROM
>>>>>>>         order
>>>>>>>         INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
>>>>>>>         AND order.proctime BETWEEN unicast.proctime - interval '70' minute
>>>>>>>         AND unicast.proctime + interval '10' minute
>>>>>>>         and unicast.status in ('ASSIGNED', 'REJECTED')
>>>>>>>     ),
>>>>>>>     zone_agg as (
>>>>>>>       select
>>>>>>>         zoneId,
>>>>>>>         (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
>>>>>>>         avg(cityId) as cityId,
>>>>>>>         COUNT(*) as `unicast_count`,
>>>>>>>         proctime() as proctime
>>>>>>>       from
>>>>>>>         Actuals
>>>>>>>       group by
>>>>>>>         HOP(
>>>>>>>           proctime(),
>>>>>>>           interval '5' minute,
>>>>>>>           interval '30' minute
>>>>>>>         ),
>>>>>>>         zoneId
>>>>>>>     ),
>>>>>>>     city_agg as(
>>>>>>>       select
>>>>>>>         cityId,
>>>>>>>         sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
>>>>>>>         proctime() as proctime
>>>>>>>       from
>>>>>>>         Actuals
>>>>>>>       group by
>>>>>>>         HOP(
>>>>>>>           proctime(),
>>>>>>>           interval '5' minute,
>>>>>>>           interval '30' minute
>>>>>>>         ),
>>>>>>>         cityId
>>>>>>>     ),
>>>>>>>     final as (
>>>>>>>       select
>>>>>>>         zone_agg.zoneId,
>>>>>>>         zone_agg.cityId,
>>>>>>>         avg(zone_agg.unicast_count) as unicast_count,
>>>>>>>         avg(zone_agg.zone_quotient) as zone_quotient,
>>>>>>>         avg(city_agg.city_quotient) as city_quotient
>>>>>>>       from
>>>>>>>         city_agg
>>>>>>>         INNER join zone_agg on zone_agg.cityId = city_agg.cityId
>>>>>>>         AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60' minute
>>>>>>>         AND zone_agg.proctime
>>>>>>>       group by
>>>>>>>         HOP(
>>>>>>>           proctime(),
>>>>>>>           interval '5' minute,
>>>>>>>           interval '30' minute
>>>>>>>         ),
>>>>>>>         zone_agg.zoneId,
>>>>>>>         zone_agg.cityId
>>>>>>>     ),
>>>>>>>     new_final as (
>>>>>>>       select
>>>>>>>         'zoneid_de_acceptance_rate#' || cast(zoneId as varchar) as key,
>>>>>>>         zone_quotient,
>>>>>>>         city_quotient,
>>>>>>>         case
>>>>>>>           when unicast_count > 5 then zone_quotient
>>>>>>>           else city_quotient
>>>>>>>         end as `value`
>>>>>>>       from
>>>>>>>         final
>>>>>>>     )
>>>>>>>     select
>>>>>>>       key,
>>>>>>>       case
>>>>>>>         when new_final.`value` > 1 then 1
>>>>>>>         else new_final.`value`
>>>>>>>       end as `value`,
>>>>>>>       zone_quotient,
>>>>>>>       city_quotient
>>>>>>>     from
>>>>>>>       new_final
>>>>>>>   )
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> IMPORTANT NOTICE:  The contents of this email and any attachments
>>>>>>> are confidential in nature and intended solely for the addressee, and
>>>>>>> are subject to the terms and conditions of disclosure as further
>>>>>>> described here: https://www.scd.swiggy.in/nda. If you are not the
>>>>>>> intended recipient or you do not agree to the terms and conditions of
>>>>>>> disclosure, please delete this email immediately, and notify the sender
>>>>>>> by return email. In the event that you continue to access the
>>>>>>> information herein or act upon it in any manner, the terms and
>>>>>>> conditions shall be deemed accepted by you.
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
