Re: Reducing Checkpoint Count for Chain Operator

2023-02-02 Thread Talat Uyarer via user
Hi Schwalbe,


>- There is no way to have only one file unless you lower the
>parallelism to 1 (= only one subtask)
>
>
Even with a parallelism of 1, there are multiple checkpoint files for
chained operators.


>- So which files do you see: 1 “_metadata” + multiple data files (or
>just one)?
>
>
Yes, per checkpoint we have a folder named with the checkpoint number. That
folder has one metadata file and one file per operator, identified by vertex
UUID.
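
A folder layout like the one described above can be checked with a short
script. The following is only a minimal sketch, assuming a local directory
that holds one "_metadata" file plus data files; the function names
(summarize_checkpoint, demo) and the sample file names (op-a, op-b) are
made up for illustration and are not part of Flink.

```python
import os
import tempfile


def summarize_checkpoint(chk_dir):
    """Return (metadata_bytes, [(relative_path, bytes), ...]) for one checkpoint folder."""
    metadata_bytes = 0
    data_files = []
    for root, _dirs, files in os.walk(chk_dir):
        for name in files:
            path = os.path.join(root, name)
            size = os.path.getsize(path)
            if name == "_metadata" and root == chk_dir:
                # The single metadata file at the top of the checkpoint folder.
                metadata_bytes = size
            else:
                # Everything else is counted as a data file.
                data_files.append((os.path.relpath(path, chk_dir), size))
    return metadata_bytes, sorted(data_files)


def demo():
    # Build a throwaway folder that mimics the layout described in the thread.
    # Folder and file names here are hypothetical.
    with tempfile.TemporaryDirectory() as base:
        chk = os.path.join(base, "chk-42")
        os.makedirs(chk)
        for name, size in [("_metadata", 100), ("op-a", 2000), ("op-b", 3000)]:
            with open(os.path.join(chk, name), "wb") as f:
                f.write(b"\0" * size)
        return summarize_checkpoint(chk)


if __name__ == "__main__":
    meta, files = demo()
    print("_metadata bytes:", meta)
    for rel, size in files:
        print(rel, size)
```

Pointing summarize_checkpoint at a real checkpoint folder would show how
many data files there are and how large each one is, which is the
information asked for in the question above.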


>- The idea of having multiple files is to allow multiple threads to be
>able to store checkpoints at the same time, and when restarting from a
>checkpoint to consume from more files potentially distributed to multiple
>physical hard drives (more I/O capacity)
>
>
Yes, I am well aware of why we have multiple files for operators. But having
a file per operator, when all of them run in the same thread in the operator
chain, is redundant and increases checkpoint size. I believe the operator chain
driver could handle checkpointing all at once. It would reduce the total
size of the checkpoint, because all chained operators use the same memory. If
object reuse is enabled, they use exactly the same objects.

> Still (out of curiosity) why would you want to have everything in a single
> file?


I don't want a single file for the whole checkpoint. I want one file per
operator chain, rather than one file per operator in the chain. When you have
huge parallelism, the checkpoint size can grow to gigabytes, and because of
that size we cannot checkpoint as frequently as we would like. Operator
chaining is a really good optimization in terms of memory usage, but it is
still lacking in terms of checkpointing. Today we are using Dataflow. Dataflow
has a similar behavior, with checkpoint support, which they call pipeline
fusion. [1]

[1]
https://cloud.google.com/dataflow/docs/pipeline-lifecycle#fusion_optimization
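
To make the file-count argument concrete, here is a rough sketch of the
arithmetic. It assumes the simplified model described in this message: every
stateful operator in every subtask writes exactly one data file, and each
checkpoint has one "_metadata" file. The function name and its parameters
are hypothetical. It counts files only; whether the total number of bytes
would shrink depends on what each operator actually stores.

```python
def files_per_checkpoint(parallelism, stateful_operators_per_chain, per_chain=False):
    """Count files written for one checkpoint under the simplified model.

    per_chain=False: one data file per stateful operator per subtask.
    per_chain=True:  one data file per operator chain per subtask
                     (the behavior requested in this thread, hypothetical).
    """
    writers = 1 if per_chain else stateful_operators_per_chain
    # One _metadata file plus the data files of all subtasks.
    return 1 + parallelism * writers


if __name__ == "__main__":
    print(files_per_checkpoint(100, 7))                  # one file per operator
    print(files_per_checkpoint(100, 7, per_chain=True))  # one file per chain
```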

Thanks

RE: Reducing Checkpoint Count for Chain Operator

2023-02-02 Thread Schwalbe Matthias
Hi Talat Uyarer,


  *   There is no way to have only one file unless you lower the parallelism to
      1 (= only one subtask)
  *   So which files do you see: 1 “_metadata” + multiple data files (or just
      one)?
  *   The idea of having multiple files is to allow multiple threads to store
      checkpoints at the same time and, when restarting from a checkpoint, to
      consume from more files, potentially distributed across multiple physical
      hard drives (more I/O capacity)
  *   So in general it is good to have multiple files

Still (out of curiosity) why would you want to have everything in a single file?

Sincere greetings

Thias



Re: Reducing Checkpoint Count for Chain Operator

2023-02-02 Thread Talat Uyarer via user
Hi Schwalbe, weijie,

Thanks for your reply.


>- Each state primitive/per subtask stores state into a separate file
>
>
In this picture you can see an operator chain:
https://nightlies.apache.org/flink/flink-docs-master/fig/tasks_chains.svg

Source and Map are in the same chain. Today Flink creates two files for
that operator chain. With an OperatorChain, all of its operators run on the
same machine, in the same thread, for memory optimization. However, Flink
still creates a separate file per operator. Our question is whether there is
a way to have one file instead of multiple files.

Thanks




RE: Reducing Checkpoint Count for Chain Operator

2023-02-01 Thread Schwalbe Matthias
Hi Kishore,


Having followed this thread for a while, there still seems to be quite a bit
of confusion of concepts. In order to help resolve your original question we
would need to know:

  *   What makes your observation a problem to be solved?
  *   You write that you have no shuffling. Does that mean you don’t use any
      keyBy() or rebalance()?
  *   How do you determine that there are 7 checkpoints, one for each operator?
  *   In general, please share a bit more detail about how you configure state
      primitives: which kinds, also operator state, on all operators, etc.

In general (as Weijie said) checkpointing works like this (simplified):

  *   The JobManager creates a checkpoint marker/barrier at a configured
      interval
  *   For synchronous checkpointing this flows along with the events through
      the chain of tasks
  *   For asynchronous checkpointing, the checkpointing marker is sent directly
      to the subtasks
  *   A single checkpoint looks like this:
      *   Each state primitive, per subtask, stores its state in a separate file
      *   At the end the JobManager writes a “_metadata” file for the checkpoint
          metadata and for state that is too small to end up in a separate file
      *   i.e. each checkpoint run generates only one checkpoint (multiple
          files), not 7
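
The layout of a single checkpoint described in the list above can be
sketched as follows. This is a toy model, not Flink code: the function
plan_checkpoint, the inline_threshold parameter and the operator names are
assumptions made for illustration. It only shows the idea that large state
goes to its own file while small state ends up inside "_metadata".

```python
def plan_checkpoint(state_sizes, inline_threshold):
    """Split state into separate files vs. state inlined in _metadata.

    state_sizes maps (operator, subtask_index) -> bytes of state.
    inline_threshold is a hypothetical size limit in bytes: state smaller
    than this is stored inside the _metadata file instead of its own file.
    """
    separate, inlined = {}, {}
    for key, size in state_sizes.items():
        if size == 0:
            continue  # stateless operator: nothing to write
        if size >= inline_threshold:
            separate[key] = size
        else:
            inlined[key] = size
    return separate, inlined


if __name__ == "__main__":
    sizes = {("source", 0): 50, ("map", 0): 0, ("sink", 0): 5000}
    separate, inlined = plan_checkpoint(sizes, inline_threshold=1024)
    print("separate files:", separate)
    print("inlined in _metadata:", inlined)
```

In this model all entries belong to the same checkpoint, regardless of how
many files are written.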

Hope we shed a little light on this

Best regards

Thias




Re: Reducing Checkpoint Count for Chain Operator

2023-02-01 Thread Kishore Pola
Hi Weijie,

In our case we have 7 operators. All 7 operators are executed as one chain
within a single StreamTask. As the checkpoint barrier passes through all the
operators, 7 checkpoints are stored, so our checkpoint size goes up 7 times.
We are investigating whether we can checkpoint only the start operator (Kafka
source) or the end operator (BQ sink); that would be enough for us, and the
checkpoint size would come down. Hence the question: when the operators are
executed in the same StreamTask as one chain, is it possible to checkpoint at
the operator-chain or single-operator level?

Thanks,
Kishore




Re: Reducing Checkpoint Count for Chain Operator

2023-02-01 Thread weijie guo
Hi Talat,

In Flink, a checkpoint barrier is injected at the source and then passes
through all operators in turn. Each stateful operator takes its checkpoint as
the barrier passes; state is managed at operator granularity, not at operator
chain granularity. So what would be the benefit of checkpointing at the
granularity of the operator chain?
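
The barrier flow described above can be illustrated with a small toy
simulation. This is not Flink code: the Operator class, run_barrier and the
operator names are hypothetical, and the model ignores parallelism and
asynchronous snapshots. It only shows that each stateful operator snapshots
its own state as the barrier passes, while stateless operators contribute
nothing.

```python
class Operator:
    """A toy operator; state=None means the operator is stateless."""

    def __init__(self, name, state=None):
        self.name = name
        self.state = state

    def snapshot(self, checkpoint_id):
        if self.state is None:
            return None  # nothing to store for a stateless operator
        # Copy the state so later changes do not affect the snapshot.
        return {"checkpoint": checkpoint_id,
                "operator": self.name,
                "state": dict(self.state)}


def run_barrier(chain, checkpoint_id):
    """Pass one barrier through the chain in order and collect snapshots."""
    snapshots = []
    for op in chain:
        snap = op.snapshot(checkpoint_id)
        if snap is not None:
            snapshots.append(snap)
    return snapshots


if __name__ == "__main__":
    chain = [Operator("kafka-source", {"offset": 10}),
             Operator("map"),
             Operator("sink", {"pending": 2})]
    for snap in run_barrier(chain, checkpoint_id=1):
        print(snap)
```

All snapshots carry the same checkpoint id: one barrier yields one
checkpoint made of several per-operator pieces.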

Best regards,

Weijie




Re: Reducing Checkpoint Count for Chain Operator

2023-02-01 Thread Talat Uyarer via user
Hi Weijie,

Thanks for replying back.

Our job is a streaming job. The OperatorChain contains all operators that
are executed as one chain within a single StreamTask, but each operator
creates its own checkpoint at checkpointing time. Rather than creating a
checkpoint per operator, can I have one checkpoint per OperatorChain? That is
my question.

Thanks



Re: Reducing Checkpoint Count for Chain Operator

2023-02-01 Thread weijie guo
Hi Talat,

Can you elaborate on what it means to create one checkpoint object per
chained operator rather than for all operators? If you mean checkpointing
independently for each task, this is not supported.


Best regards,

Weijie


On Wed, Feb 1, 2023 at 15:34, Talat Uyarer via user wrote:

> Hi,
>
> We have a job that reads from Kafka and writes to some endpoints. The
> job does not have any shuffling steps. I implemented it with multiple
> steps. Flink chained those operators into one operator at submission time.
> However, I see that all operators are doing checkpointing.
>
> Is there any way to create one checkpoint object per operator chain rather
> than one for every operator?
>
> Thanks
>