Hi Yun and CongXian
Source??????????kafkaconsumer ??????????map??????????????????
????????????iteration head????processinput??????????????overwrite??
??????????????????????????????????
stream task????????????????mailbox??????????????
????????????????????????????
??????????????overwrite????????????stream task??????????
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2020??8??27??(??????) ????9:20
??????: "user-zh"<[email protected]>;
????: "Yun Gao"<[email protected]>;
????: Re: ?????? ????????????checkpoint????
Hi
?????????????????????????????? ???? ?????? checkpoint
????????????????????????????????????????????????????Yun
Gao??????????????????????????
Best,
Congxian
Yun Tang <[email protected]> ??2020??8??27?????? ????5:10??????
> Hi Robert
>
> ????????source
>
firstSource??secondSource??????????????????????????source????????????????[1][2]????checkpoint
> barrier????????????
>
????????jstack??????????????????????source????task??java??????????????????????????????
>
> [1]
>
https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L916
> [2]
>
https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L92
>
> ????
> ????
> ________________________________
> From: Robert.Zhang <[email protected]>
> Sent: Wednesday, August 26, 2020 22:17
> To: user-zh <[email protected]>
> Subject: ?????? ????????????checkpoint????
>
> Hi
>
>
> ??????????????
> DataStream broad=env.readFrom(...).broad;
> DataStream firstSource=env.readFrom(...);
> DataStream secondSource=env.readFrom(...);
>
>
> DataStream&nbsp; union=firstSource.union(secondSource);
> IterativeStream iterativeStream=union.keyby(...).process(...).iterate();
>
> DataStream&nbsp; result=iterativeStream.closeWith(
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; iterativeStream
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; .keyby(...)
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; .connect(broad)
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; .process(...));
> result.addSink(...);
>
>
> ??????????????????????????????????????Thanks all
>
>
>
>
> ------------------&nbsp;????????&nbsp;------------------
> ??????:
>
"user-zh"
>
<
> [email protected]&gt;;
> ????????:&nbsp;2020??8??26??(??????) ????7:18
> ??????:&nbsp;"user-zh"<[email protected]&gt;;
>
> ????:&nbsp;Re: ????????????checkpoint????
>
>
>
> Hi
> &nbsp;&nbsp; ?????????????? barrier
??????????????????????????????????????????????????????????
>
barrier????????????????????????????????????????????????????????????????????????????
> Best,
> Congxian
>
>
> Robert.Zhang <[email protected]&gt; ??2020??8??26?????? ????11:43??????
>
> &gt; Hi Congxian,
> &gt;
> &gt; ????????????????????????????iteration
source??barrier??????????????????
> &gt;
??barrier??????????????????????????????????operator??????barrier??????checkpoint??????????????
> &gt;
??????????????????????????????????????????????????????????????????????????operator????????
> &gt; ??????????????????????????barrier??checkpoint??????????????
> &gt;
> &gt;
> &gt;
> &gt;
????????????????????????????????????????????????????????checkpoint????????
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; ---????????---
> &gt; *??????:* "Congxian Qiu"<[email protected]&gt;
> &gt; *????????:* 2020??8??25??(????) ????5:33
> &gt; *??????:* "user-zh"<[email protected]&gt;;
> &gt; *????:* Re: ????????????checkpoint????
> &gt;
> &gt; Hi
> &gt;&nbsp;&nbsp;&nbsp; ???? checkpoint
?????????????????????????????????????????? source
> ???????????????????????????????????????? snapshot
> &gt; ?? source???? CPU
??????????????????????????????????????????????????????source ?????? JM ?? rpc
????????
> &gt; snapshot???????????????????????????????????? barrier ????????????
> &gt; Best,
> &gt; Congxian
> &gt;
> &gt;
> &gt; Robert.Zhang <[email protected]&gt; ??2020??8??25??????
????12:58??????
> &gt;
> &gt; &gt; ????????????????????checkpoint ????????????web??????
iteration
> source??checkpoint??????????????
> &gt; &gt; ??????????????iterative
> &gt; &gt;
>
stream??checkpoint??????????????????????loop??????????????????????????checkpoint????????????????????????
> &gt; &gt;
??????????chandylamport????????????????operator??barrier????????????????????
> &gt; &gt;
????????????????????barrier????????????????????????????????????
> &gt; &gt;
> &gt; &gt; ---????????---
> &gt; &gt; ??????: "Congxian Qiu"<[email protected]&amp;gt;
> &gt; &gt; ????????: 2020??8??24??(????) ????8:21
> &gt; &gt; ??????: "user-zh"<[email protected]&amp;gt;;
> &gt; &gt; ????: Re: ????????????checkpoint????
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; Hi
> &gt; &gt; &amp;nbsp;&amp;nbsp; ?????? ??Exceeded
checkpoint tolerable failure
> threshold?? ????????
> &gt; &gt; checkpoint
> &gt; &gt; ??????????????????????????????????????????????
checkpoint ??????????????????[1] ??????????????
> &gt; &gt; &amp;nbsp;&amp;nbsp; ??????????????????????
unalign
> checkpoint??????????????????????????????????????
> &gt; &gt;
> &gt; &gt; [1] https://zhuanlan.zhihu.com/p/87131964
> &gt; &gt; Best,
> &gt; &gt; Congxian
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; Robert.Zhang <[email protected]&amp;gt;
??2020??8??21?????? ????6:31??????
> &gt; &gt;
> &gt; &gt; &amp;gt; Hello all,
> &gt; &gt; &amp;gt; ????????????????????iterative stream job
> &gt; &gt; &amp;gt;
????checkpoint??????????????????????????????????????checkpoint????????????
> &gt; &gt; &amp;gt; ????state
> &gt;
????????????k??????????????????????org.apache.flink.util.FlinkRuntimeException:
> &gt; &gt; &amp;gt; Exceeded checkpoint tolerable failure
threshold.??????
> &gt; &gt; &amp;gt;
> &gt; &gt; &amp;gt;
> &gt; &gt; &amp;gt; ??????????
> &gt; &gt; &amp;gt; env.enableCheckpointing(10000,
> CheckpointingMode.EXACTLY_ONCE,
> &gt; true);
> &gt; &gt; &amp;gt; CheckpointConfig checkpointConfig =
> env.getCheckpointConfig();
> &gt; &gt; &amp;gt;
checkpointConfig.setCheckpointTimeout(600000);
> &gt; &gt; &amp;gt;
checkpointConfig.setMinPauseBetweenCheckpoints(60000);
> &gt; &gt; &amp;gt;
checkpointConfig.setMaxConcurrentCheckpoints(4);
> &gt; &gt; &amp;gt;
> &gt; &gt; &amp;gt;
> &gt; &gt;
> &gt;
>
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> &gt; &gt; &amp;gt;
checkpointConfig.setPreferCheckpointForRecovery(true);
> &gt; &gt; &amp;gt;
checkpointConfig.setTolerableCheckpointFailureNumber(2);
> &gt; &gt; &amp;gt;
checkpointConfig.enableUnalignedCheckpoints();
> &gt; &gt; &amp;gt;
> &gt; &gt; &amp;gt;
> &gt; &gt; &amp;gt;
??????????????????????????????????????????????????????????????
> &gt;
>