Hi Yun and CongXian

Source??????????kafkaconsumer ??????????map??????????????????


????????????iteration head????processinput??????????????overwrite??
??????????????????????????????????
stream task????????????????mailbox??????????????
????????????????????????????
??????????????overwrite????????????stream task??????????






------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<[email protected]&gt;;
????????:&nbsp;2020??8??27??(??????) ????9:20
??????:&nbsp;"user-zh"<[email protected]&gt;;
????:&nbsp;"Yun Gao"<[email protected]&gt;;
????:&nbsp;Re: ?????? ????????????checkpoint????



Hi
&nbsp;&nbsp; ?????????????????????????????? ???? ?????? checkpoint 
????????????????????????????????????????????????????Yun
Gao??????????????????????????

Best,
Congxian


Yun Tang <[email protected]&gt; ??2020??8??27?????? ????5:10??????

&gt; Hi Robert
&gt;
&gt; ????????source
&gt; 
firstSource??secondSource??????????????????????????source????????????????[1][2]????checkpoint
&gt; barrier????????????
&gt; 
????????jstack??????????????????????source????task??java??????????????????????????????
&gt;
&gt; [1]
&gt; 
https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L916
&gt; [2]
&gt; 
https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L92
&gt;
&gt; ????
&gt; ????
&gt; ________________________________
&gt; From: Robert.Zhang <[email protected]&gt;
&gt; Sent: Wednesday, August 26, 2020 22:17
&gt; To: user-zh <[email protected]&gt;
&gt; Subject: ?????? ????????????checkpoint????
&gt;
&gt; Hi
&gt;
&gt;
&gt; ??????????????
&gt; DataStream broad=env.readFrom(...).broad;
&gt; DataStream firstSource=env.readFrom(...);
&gt; DataStream secondSource=env.readFrom(...);
&gt;
&gt;
&gt; DataStream&amp;nbsp; union=firstSource.union(secondSource);
&gt; IterativeStream iterativeStream=union.keyby(...).process(...).iterate();
&gt;
&gt; DataStream&amp;nbsp; result=iterativeStream.closeWith(
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; iterativeStream
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; .keyby(...)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; .connect(broad)
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; .process(...));
&gt; result.addSink(...);
&gt;
&gt;
&gt; ??????????????????????????????????????Thanks all
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;????????&amp;nbsp;------------------
&gt; ??????:
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "user-zh"
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 <
&gt; [email protected]&amp;gt;;
&gt; ????????:&amp;nbsp;2020??8??26??(??????) ????7:18
&gt; ??????:&amp;nbsp;"user-zh"<[email protected]&amp;gt;;
&gt;
&gt; ????:&amp;nbsp;Re: ????????????checkpoint????
&gt;
&gt;
&gt;
&gt; Hi
&gt; &amp;nbsp;&amp;nbsp; ?????????????? barrier 
??????????????????????????????????????????????????????????
&gt; 
barrier????????????????????????????????????????????????????????????????????????????
&gt; Best,
&gt; Congxian
&gt;
&gt;
&gt; Robert.Zhang <[email protected]&amp;gt; ??2020??8??26?????? ????11:43??????
&gt;
&gt; &amp;gt; Hi Congxian,
&gt; &amp;gt;
&gt; &amp;gt; ????????????????????????????iteration 
source??barrier??????????????????
&gt; &amp;gt; 
??barrier??????????????????????????????????operator??????barrier??????checkpoint??????????????
&gt; &amp;gt; 
??????????????????????????????????????????????????????????????????????????operator????????
&gt; &amp;gt; ??????????????????????????barrier??checkpoint??????????????
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 
????????????????????????????????????????????????????????checkpoint????????
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; ---????????---
&gt; &amp;gt; *??????:* "Congxian Qiu"<[email protected]&amp;gt;
&gt; &amp;gt; *????????:* 2020??8??25??(????) ????5:33
&gt; &amp;gt; *??????:* "user-zh"<[email protected]&amp;gt;;
&gt; &amp;gt; *????:* Re: ????????????checkpoint????
&gt; &amp;gt;
&gt; &amp;gt; Hi
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; ???? checkpoint 
?????????????????????????????????????????? source
&gt; ???????????????????????????????????????? snapshot
&gt; &amp;gt; ?? source???? CPU 
??????????????????????????????????????????????????????source ?????? JM ?? rpc 
????????
&gt; &amp;gt; snapshot???????????????????????????????????? barrier ????????????
&gt; &amp;gt; Best,
&gt; &amp;gt; Congxian
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; Robert.Zhang <[email protected]&amp;gt; ??2020??8??25?????? 
????12:58??????
&gt; &amp;gt;
&gt; &amp;gt; &amp;gt; ????????????????????checkpoint ????????????web?????? 
iteration
&gt; source??checkpoint??????????????
&gt; &amp;gt; &amp;gt; ??????????????iterative
&gt; &amp;gt; &amp;gt;
&gt; 
stream??checkpoint??????????????????????loop??????????????????????????checkpoint????????????????????????
&gt; &amp;gt; &amp;gt; 
??????????chandylamport????????????????operator??barrier????????????????????
&gt; &amp;gt; &amp;gt; 
????????????????????barrier????????????????????????????????????
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; ---????????---
&gt; &amp;gt; &amp;gt; ??????: "Congxian Qiu"<[email protected]&amp;amp;gt;
&gt; &amp;gt; &amp;gt; ????????: 2020??8??24??(????) ????8:21
&gt; &amp;gt; &amp;gt; ??????: "user-zh"<[email protected]&amp;amp;gt;;
&gt; &amp;gt; &amp;gt; ????: Re: ????????????checkpoint????
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Hi
&gt; &amp;gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; ?????? ??Exceeded 
checkpoint tolerable failure
&gt; threshold?? ????????
&gt; &amp;gt; &amp;gt; checkpoint
&gt; &amp;gt; &amp;gt; ?????????????????????????????????????????????? 
checkpoint ??????????????????[1] ??????????????
&gt; &amp;gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp; ?????????????????????? 
unalign
&gt; checkpoint??????????????????????????????????????
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; [1] https://zhuanlan.zhihu.com/p/87131964
&gt; &amp;gt; &amp;gt; Best,
&gt; &amp;gt; &amp;gt; Congxian
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; Robert.Zhang <[email protected]&amp;amp;gt; 
??2020??8??21?????? ????6:31??????
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt; Hello all,
&gt; &amp;gt; &amp;gt; &amp;amp;gt; ????????????????????iterative stream job
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 
????checkpoint??????????????????????????????????????checkpoint????????????
&gt; &amp;gt; &amp;gt; &amp;amp;gt; ????state
&gt; &amp;gt; 
????????????k??????????????????????org.apache.flink.util.FlinkRuntimeException:
&gt; &amp;gt; &amp;gt; &amp;amp;gt; Exceeded checkpoint tolerable failure 
threshold.??????
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt; ??????????
&gt; &amp;gt; &amp;gt; &amp;amp;gt; env.enableCheckpointing(10000,
&gt; CheckpointingMode.EXACTLY_ONCE,
&gt; &amp;gt; true);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; CheckpointConfig checkpointConfig =
&gt; env.getCheckpointConfig();
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 
checkpointConfig.setCheckpointTimeout(600000);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 
checkpointConfig.setMinPauseBetweenCheckpoints(60000);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 
checkpointConfig.setMaxConcurrentCheckpoints(4);
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt;
&gt; &amp;gt;
&gt; 
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 
checkpointConfig.setPreferCheckpointForRecovery(true);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 
checkpointConfig.setTolerableCheckpointFailureNumber(2);
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 
checkpointConfig.enableUnalignedCheckpoints();
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt; &amp;gt; &amp;amp;gt; 
??????????????????????????????????????????????????????????????
&gt; &amp;gt;
&gt;

回复