Hi

??????????????
DataStream broad = env.readFrom(...).broadcast(...);
DataStream firstSource = env.readFrom(...);
DataStream secondSource = env.readFrom(...);

DataStream union = firstSource.union(secondSource);
IterativeStream iterativeStream = union.keyBy(...).process(...).iterate();

DataStream result = iterativeStream.closeWith(
        iterativeStream
                .keyBy(...)
                .connect(broad)
                .process(...));
result.addSink(...);


??????????????????????????????????????Thanks all




------------------ Original Message ------------------
From: "user-zh" <[email protected]>;
Sent: Wednesday, August 26, 2020, 7:18
To: "user-zh" <[email protected]>;

Subject: Re: ????????????checkpoint????



Hi
   ?????????????? barrier ??????????????????????????????????????????????????????????
barrier????????????????????????????????????????????????????????????????????????????
Best,
Congxian


On Wed, Aug 26, 2020 at 11:43, Robert.Zhang <[email protected]> wrote:

> Hi Congxian,
>
> ????????????????????????????iteration source??barrier??????????????????
> ??barrier??????????????????????????????????operator??????barrier??????checkpoint??????????????
> ??????????????????????????????????????????????????????????????????????????operator????????
> ??????????????????????????barrier??checkpoint??????????????
>
> ????????????????????????????????????????????????????????checkpoint????????
>
> ---Original Message---
> *From:* "Congxian Qiu" <[email protected]>
> *Sent:* Tuesday, August 25, 2020, 5:33
> *To:* "user-zh" <[email protected]>;
> *Subject:* Re: ????????????checkpoint????
&gt;
> Hi
>    ???? checkpoint ?????????????????????????????????????????? source ???????????????????????????????????????? snapshot
> ?? source???? CPU ??????????????????????????????????????????????????????source ?????? JM ?? rpc ????????
> snapshot???????????????????????????????????? barrier ????????????
> Best,
> Congxian
>
> On Tue, Aug 25, 2020 at 12:58, Robert.Zhang <[email protected]> wrote:
>
> > ????????????????????checkpoint ????????????web?????? iteration source??checkpoint??????????????
> > ??????????????iterative stream??checkpoint??????????????????????loop??????????????????????????checkpoint????????????????????????
> > ??????????Chandy-Lamport????????????????operator??barrier????????????????????
> > ????????????????????barrier????????????????????????????????????
> >
> > ---Original Message---
> > From: "Congxian Qiu" <[email protected]>
> > Sent: Monday, August 24, 2020, 8:21
> > To: "user-zh" <[email protected]>;
> > Subject: Re: ????????????checkpoint????
&gt; &gt;
&gt; &gt;
> > Hi
> >    ?????? "Exceeded checkpoint tolerable failure threshold" ???????? checkpoint ?????????????????????????????????????????????? checkpoint ??????????????????[1] ??????????????
> >    ?????????????????????? unalign checkpoint??????????????????????????????????????
> >
> > [1] https://zhuanlan.zhihu.com/p/87131964
> > Best,
> > Congxian
&gt; &gt;
&gt; &gt;
> > On Fri, Aug 21, 2020 at 6:31, Robert.Zhang <[email protected]> wrote:
&gt; &gt;
> > > Hello all,
> > > ????????????????????iterative stream job
> > > ????checkpoint??????????????????????????????????????checkpoint????????????
> > > ????state ????????????k??????????????????????org.apache.flink.util.FlinkRuntimeException:
> > > Exceeded checkpoint tolerable failure threshold.??????
> > >
> > > ??????????
> > > env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE, true);
> > > CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > > checkpointConfig.setCheckpointTimeout(600000);
> > > checkpointConfig.setMinPauseBetweenCheckpoints(60000);
> > > checkpointConfig.setMaxConcurrentCheckpoints(4);
> > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > > checkpointConfig.setPreferCheckpointForRecovery(true);
> > > checkpointConfig.setTolerableCheckpointFailureNumber(2);
> > > checkpointConfig.enableUnalignedCheckpoints();
> > >
> > > ??????????????????????????????????????????????????????????????
