谢谢 解答。也就是假如 A->B 这样一个 graph。在一次checkpoint 中 A 调用  snapshot 往下游发的数据,在B 执行
notifyCheckpointComplete 与 A    snapshot 下发的数据到达B   这2者没有必然的先后顺序。

另外就是 如果没有先后顺序,有没有什么办法 或者是在 B执行 某某操作前 能确保 这次 checkpoint 中 A  snapshot  发出的数据
到达了B.

 我的场景是 有3个核心算子  start->proccess->submit . 其中 start和 submit 并行度为1, proccess
并行度为N, start  会开启一个事务 编号    proccess  用这个事务 编号
去做预处理(赞一批处理一次,并把这一次处理结果下发,给下游做事务提交),  submit  收到上游批处理的结果 用 同样的事务编号去提交


Congxian Qiu <[email protected]> 于2020年8月17日周一 上午10:42写道:

> Hi
>     上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete
> 之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。
> Best,
> Congxian
>
>
> key lou <[email protected]> 于2020年8月16日周日 下午9:27写道:
>
> > 各位大佬:
> >    在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
> > 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
> > 如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
> > 在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
> > 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
> > notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
> >  collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
> > 的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
> > 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
> >
> > public class FlinkCheckpointTest {
> >     public static void main(String[] args) throws Exception {
> >         StreamExecutionEnvironment steamEnv =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >         steamEnv.enableCheckpointing(1000L*2);
> >         steamEnv
> >             .addSource(new FSource()).setParallelism(4)
> >             .transform("开始事务", Types.STRING,new
> FStart()).setParallelism(1)
> >             .process(new FCombine()).name("事务预处理").setParallelism(4)
> >             .addSink(new FSubmit()).name("提交事务").setParallelism(1)
> >         ;
> >         steamEnv.execute("test");
> >     }
> >
> >    static class FSource extends RichParallelSourceFunction<String>{
> >         @Override
> >         public void run(SourceContext<String> sourceContext) throws
> > Exception {
> >             int I =0;
> >             while (true){
> >                 I = I + 1;
> >                 sourceContext.collect("thread " +
> > Thread.currentThread().getId() +"-" +I);
> >                 Thread.sleep(1000);
> >             }
> >         }
> >         @Override
> >         public void cancel() {}
> >     }
> >
> >     static class FStart extends AbstractStreamOperator<String>
> > implements OneInputStreamOperator<String,String>{
> >        volatile Long ckid = 0L;
> >         @Override
> >         public void processElement(StreamRecord<String> streamRecord)
> > throws Exception {
> >             log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> >             output.collect(streamRecord);
> >         }
> >         @Override
> >         public void prepareSnapshotPreBarrier(long checkpointId)
> > throws Exception {
> >             log("开启事务: " + checkpointId);
> >             ckid = checkpointId;
> >             super.prepareSnapshotPreBarrier(checkpointId);
> >         }
> >     }
> >
> >     static class FCombine extends ProcessFunction<String,String>
> > implements CheckpointedFunction {
> >         List ls = new ArrayList<String>();
> >         Collector<String> collector =null;
> >         volatile Long ckid = 0L;
> >
> >         @Override
> >         public void snapshotState(FunctionSnapshotContext
> > functionSnapshotContext) throws Exception {
> >             StringBuffer sb = new StringBuffer();
> >             ls.forEach(x->{sb.append(x).append(";");});
> >             log("批处理 " + functionSnapshotContext.getCheckpointId() +
> > ": 时收到数据:" + sb.toString());
> >             Thread.sleep(5*1000);
> >             collector.collect(sb.toString());
> >             ls.clear();
> >             Thread.sleep(5*1000);
> >             //Thread.sleep(20*1000);
> >         }
> >         @Override
> >         public void initializeState(FunctionInitializationContext
> > functionInitializationContext) throws Exception {        }
> >         @Override
> >         public void processElement(String s, Context context,
> > Collector<String> out) throws Exception {
> >             if(StringUtils.isNotBlank(s)){
> >                 ls.add(s);
> >             }
> >             log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> > ckid);
> >             if(collector ==null){
> >                 collector = out;
> >             }
> >         }
> >     }
> >
> >     static class FSubmit extends RichSinkFunction<String> implements
> > /*  CheckpointedFunction,*/ CheckpointListener {
> >         List ls = new ArrayList<String>();
> >         volatile Long ckid = 0L;
> >         @Override
> >         public void notifyCheckpointComplete(long l) throws Exception {
> >             ckid = l;
> >             StringBuffer sb = new StringBuffer();
> >             ls.forEach(x->{sb.append(x).append("||");});
> >             log("submit checkpoint " + l + " over data:list size" +
> > ls.size()+ "; detail" + sb.toString());
> >             ls.clear();
> >         }
> >         @Override
> >         public void invoke(String value, Context context) throws
> Exception
> > {
> >             if(StringUtils.isNotBlank(value)){
> >                 ls.add(value);
> >             }
> >             log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" +
> > ckid);
> >         }
> >     }
> >     public static void log(String s){
> >         String name = Thread.currentThread().getName();
> >         System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
> > Date())+":"+name + ":" + s);
> >     }
> > }
> >
>

回复