Hi
??????????????
DataStream broad = env.readFrom(...).broadcast();
DataStream firstSource = env.readFrom(...);
DataStream secondSource = env.readFrom(...);
DataStream union = firstSource.union(secondSource);
// Head of the iteration: keyed processing of the unioned sources.
IterativeStream iterativeStream = union.keyBy(...).process(...).iterate();
// Feedback edge: the loop body is connected with the broadcast stream.
DataStream result = iterativeStream.closeWith(
    iterativeStream
        .keyBy(...)
        .connect(broad)
        .process(...));
result.addSink(...);
??????????????????????????????????????Thanks all
------------------ Original Message ------------------
From: "user-zh" <[email protected]>;
Sent: Wednesday, August 26, 2020, 7:18 PM
To: "user-zh" <[email protected]>;
Subject: Re: ????????????checkpoint????
Hi
?????????????? barrier
??????????????????????????????????????????????????????????
barrier????????????????????????????????????????????????????????????????????????????
Best,
Congxian
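
As background for the barrier discussion in this thread, here is a minimal, Flink-free sketch of barrier alignment at an operator with several input channels, such as the ones created by `union` or `connect` in the job above. The class `AligningOperator` and all of its methods are hypothetical names made up for illustration, and the "snapshot" is reduced to a record count. It models only the general idea (hold back records from channels that already delivered the barrier, snapshot once every channel has delivered it), not Flink's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model of aligned-checkpoint barrier handling (hypothetical, not Flink code). */
final class AligningOperator {
    // blocked[i] is true once channel i has delivered the current barrier.
    private final boolean[] blocked;
    // Records that arrived on a blocked channel; they belong to the next epoch.
    private final Deque<String> heldBack = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();
    // One entry per completed alignment: how many records were processed before it.
    private final List<Integer> snapshotSizes = new ArrayList<>();
    private int barriersSeen = 0;

    AligningOperator(int channels) {
        blocked = new boolean[channels];
    }

    void onRecord(int channel, String record) {
        if (blocked[channel]) {
            heldBack.add(record);
        } else {
            processed.add(record);
        }
    }

    void onBarrier(int channel) {
        if (blocked[channel]) {
            return; // duplicate barrier on the same channel, ignored in this sketch
        }
        blocked[channel] = true;
        barriersSeen++;
        if (barriersSeen == blocked.length) {
            // All inputs are aligned: take the "snapshot", then unblock everything.
            snapshotSizes.add(processed.size());
            Arrays.fill(blocked, false);
            barriersSeen = 0;
            while (!heldBack.isEmpty()) {
                processed.add(heldBack.poll());
            }
        }
    }

    List<String> processed() { return processed; }
    List<Integer> snapshotSizes() { return snapshotSizes; }
    int heldBackCount() { return heldBack.size(); }
}
```

In this model, if one channel never delivers its barrier, `snapshotSizes()` stays empty and records keep piling up in the held-back queue. That is the toy equivalent of a checkpoint that cannot complete before its timeout.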
Robert.Zhang <[email protected]> wrote on Wed, Aug 26, 2020 at 11:43 AM:
> Hi Congxian,
>
> ????????????????????????????iteration source??barrier??????????????????
>
??barrier??????????????????????????????????operator??????barrier??????checkpoint??????????????
>
??????????????????????????????????????????????????????????????????????????operator????????
> ??????????????????????????barrier??checkpoint??????????????
>
>
>
> ????????????????????????????????????????????????????????checkpoint????????
>
>
>
>
>
> ---Original Message---
> *From:* "Congxian Qiu"<[email protected]>
> *Sent:* Tuesday, August 25, 2020, ????5:33
> *To:* "user-zh"<[email protected]>;
> *Subject:* Re: ????????????checkpoint????
>
> Hi
> ???? checkpoint
?????????????????????????????????????????? source
???????????????????????????????????????? snapshot
> ?? source???? CPU
??????????????????????????????????????????????????????source ?????? JM ?? rpc
????????
> snapshot???????????????????????????????????? barrier ????????????
> Best,
> Congxian
>
>
> Robert.Zhang <[email protected]> wrote on Tue, Aug 25, 2020 at ????12:58:
>
> > ????????????????????checkpoint ????????????web?????? iteration
source??checkpoint??????????????
> > ??????????????iterative
> >
stream??checkpoint??????????????????????loop??????????????????????????checkpoint????????????????????????
> >
??????????chandylamport????????????????operator??barrier????????????????????
> > ????????????????????barrier????????????????????????????????????
> >
> > ---Original Message---
> > From: "Congxian Qiu"<[email protected]>
> > Sent: Monday, August 24, 2020, ????8:21
> > To: "user-zh"<[email protected]>;
> > Subject: Re: ????????????checkpoint????
> >
> >
> > Hi
> >    ?????? "Exceeded checkpoint tolerable failure threshold" ????????
> > checkpoint
> > ?????????????????????????????????????????????? checkpoint
??????????????????[1] ??????????????
> >    ?????????????????????? unalign checkpoint??????????????????????????????????????
> >
> > [1] https://zhuanlan.zhihu.com/p/87131964
> > Best,
> > Congxian
> >
> >
> > Robert.Zhang <[email protected]> wrote on Fri, Aug 21, 2020 at ????6:31:
> >
> > > Hello all,
> > > ????????????????????iterative stream job
> > > ????checkpoint??????????????????????????????????????checkpoint????????????
> > > ????state
> > > ????????????k??????????????????????org.apache.flink.util.FlinkRuntimeException:
> > > Exceeded checkpoint tolerable failure threshold.??????
> > >
> > >
> > > ??????????
> > > env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE, true);
> > > CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > > checkpointConfig.setCheckpointTimeout(600000);
> > > checkpointConfig.setMinPauseBetweenCheckpoints(60000);
> > > checkpointConfig.setMaxConcurrentCheckpoints(4);
> > > checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > > checkpointConfig.setPreferCheckpointForRecovery(true);
> > > checkpointConfig.setTolerableCheckpointFailureNumber(2);
> > > checkpointConfig.enableUnalignedCheckpoints();
> > >
??????????????????????????????????????????????????????????????
>
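
To make the error in the original message easier to reason about, here is a minimal, Flink-free sketch of a tolerable-failure threshold like the one configured with `setTolerableCheckpointFailureNumber(2)`. The class `FailureThreshold` and its methods are hypothetical names for illustration. The sketch assumes the counter tracks consecutive failures and resets when a checkpoint completes; Flink's real bookkeeping is internal and may differ between versions.

```java
/** Toy model of a tolerable-checkpoint-failure counter (hypothetical, not Flink code). */
final class FailureThreshold {
    private final int tolerableFailures;
    private int consecutiveFailures = 0;

    FailureThreshold(int tolerableFailures) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
    }

    /**
     * Records one failed checkpoint (for example, one that timed out).
     * Returns true when the threshold is exceeded, i.e. the job would be failed.
     */
    boolean onCheckpointFailure() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }

    /** A completed checkpoint resets the counter (an assumption of this sketch). */
    void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    int consecutiveFailures() {
        return consecutiveFailures;
    }
}
```

Under these assumptions, with a threshold of 2, the first two failed checkpoints in a row are tolerated and the third one trips the threshold. A single completed checkpoint in between starts the count over.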