谢谢 解答。也就是假如 A->B 这样一个 graph。在一次checkpoint 中 A 调用 snapshot 往下游发的数据,在B 执行 notifyCheckpointComplete 与 A snapshot 下发的数据到达B 这2者没有必然的先后顺序。
另外就是 如果没有先后顺序,有没有什么办法 或者是在 B执行 某某操作前 能确保 这次 checkpoint 中 A snapshot 发出的数据 到达了B. 我的场景是 有3个核心算子 start->proccess->submit . 其中 start和 submit 并行度为1, proccess 并行度为N, start 会开启一个事务 编号 proccess 用这个事务 编号 去做预处理(赞一批处理一次,并把这一次处理结果下发,给下游做事务提交), submit 收到上游批处理的结果 用 同样的事务编号去提交 Congxian Qiu <[email protected]> 于2020年8月17日周一 上午10:42写道: > Hi > 上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete > 之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。 > Best, > Congxian > > > key lou <[email protected]> 于2020年8月16日周日 下午9:27写道: > > > 各位大佬: > > 在如下代码中: FCombine 执行snapshot collect 发送数据之后如果不执行sleep 则 FSubmit > > 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。 > > 如果在 FCombine 执行snapshot collect 发送数据之后如果执行sleep, > > 在执行 notifyCheckpointComplete 方法时 则就可以收到 snapshot collect 发送的数据。 > > 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行 > > notifyCheckpointComplete 之前就会收到 上游 snapshot collect 发送数据(因为 snapshot > > collect 在前,广播 barrier 在后,然后下游在收到了 barrier 才会执行 chekpoint > > 的相关方法,所以在执行相关方法前 上游 snapshot collect 发出的数据就应该已经到达了下游)。 > > 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。 > > > > public class FlinkCheckpointTest { > > public static void main(String[] args) throws Exception { > > StreamExecutionEnvironment steamEnv = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > steamEnv.enableCheckpointing(1000L*2); > > steamEnv > > .addSource(new FSource()).setParallelism(4) > > .transform("开始事务", Types.STRING,new > FStart()).setParallelism(1) > > .process(new FCombine()).name("事务预处理").setParallelism(4) > > .addSink(new FSubmit()).name("提交事务").setParallelism(1) > > ; > > steamEnv.execute("test"); > > } > > > > static class FSource extends RichParallelSourceFunction<String>{ > > @Override > > public void run(SourceContext<String> sourceContext) throws > > Exception { > > int I =0; > > while (true){ > > I = I + 1; > > sourceContext.collect("thread " + > > Thread.currentThread().getId() +"-" +I); > > Thread.sleep(1000); > > } > > } > > @Override > > public void cancel() {} > > } > > > > static class FStart extends AbstractStreamOperator<String> > > implements OneInputStreamOperator<String,String>{ > > volatile Long ckid = 0L; > > @Override > > public void processElement(StreamRecord<String> streamRecord) > > throws Exception { > > log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid); > > output.collect(streamRecord); > > } > > @Override > > public void prepareSnapshotPreBarrier(long checkpointId) > > throws Exception { > > log("开启事务: " + checkpointId); > > ckid = checkpointId; > > super.prepareSnapshotPreBarrier(checkpointId); > > } > > } > > > > static class FCombine extends ProcessFunction<String,String> > > implements CheckpointedFunction { > > List ls = new ArrayList<String>(); > > Collector<String> collector =null; > > volatile Long ckid = 0L; > > > > @Override > > public void snapshotState(FunctionSnapshotContext > > functionSnapshotContext) throws Exception { > > StringBuffer sb = new StringBuffer(); > > ls.forEach(x->{sb.append(x).append(";");}); > > log("批处理 " + functionSnapshotContext.getCheckpointId() + > > ": 时收到数据:" + sb.toString()); > > Thread.sleep(5*1000); > > collector.collect(sb.toString()); > > ls.clear(); > > Thread.sleep(5*1000); > > //Thread.sleep(20*1000); > > } > > @Override > > public void initializeState(FunctionInitializationContext > > functionInitializationContext) throws Exception { } > > @Override > > public void processElement(String s, Context context, > > Collector<String> out) throws Exception { > > if(StringUtils.isNotBlank(s)){ > > ls.add(s); > > } > > log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" + > > ckid); > > if(collector ==null){ > > collector = out; > > } > > } > > } > > > > static class FSubmit extends RichSinkFunction<String> implements > > /* CheckpointedFunction,*/ CheckpointListener { > > List ls = new ArrayList<String>(); > > volatile Long ckid = 0L; > > @Override > > public void notifyCheckpointComplete(long l) throws Exception { > > ckid = l; > > StringBuffer sb = new StringBuffer(); > > ls.forEach(x->{sb.append(x).append("||");}); > > log("submit checkpoint " + l + " over data:list size" + > > ls.size()+ "; detail" + sb.toString()); > > ls.clear(); > > } > > @Override > > public void invoke(String value, Context context) throws > Exception > > { > > if(StringUtils.isNotBlank(value)){ > > ls.add(value); > > } > > log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" + > > ckid); > > } > > } > > public static void log(String s){ > > String name = Thread.currentThread().getName(); > > System.out.println(new SimpleDateFormat("HH:mm:ss").format(new > > Date())+":"+name + ":" + s); > > } > > } > > >
