Hi
notifyCheckpointComplete 是整个 checkpoint 完成后调用的(也就是所有算子都做完了 snapshot,而且
JM 也做完了一些其他的工作),你的需求看上去只是要在算子间做一些顺序操作,这个应该不需要依赖 notifyCheckpointComplete
的,你可以自己写一个逻辑,在 submit 收集到 N 个信号后再做相应的事情。
Best,
Congxian
key lou <[email protected]> 于2020年8月17日周一 上午11:42写道:
> 谢谢 解答。也就是假如 A->B 这样一个 graph。在一次checkpoint 中 A 调用 snapshot 往下游发的数据,在B 执行
> notifyCheckpointComplete 与 A snapshot 下发的数据到达B 这2者没有必然的先后顺序。
>
> 另外就是 如果没有先后顺序,有没有什么办法 或者是在 B执行 某某操作前 能确保 这次 checkpoint 中 A snapshot 发出的数据
> 到达了B.
>
> 我的场景是 有3个核心算子 start->proccess->submit . 其中 start和 submit 并行度为1, proccess
> 并行度为N, start 会开启一个事务 编号 proccess 用这个事务 编号
> 去做预处理(赞一批处理一次,并把这一次处理结果下发,给下游做事务提交), submit 收到上游批处理的结果 用 同样的事务编号去提交
>
>
> Congxian Qiu <[email protected]> 于2020年8月17日周一 上午10:42写道:
>
> > Hi
> > 上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete
> > 之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。
> > Best,
> > Congxian
> >
> >
> > key lou <[email protected]> 于2020年8月16日周日 下午9:27写道:
> >
> > > 各位大佬:
> > > 在如下代码中: FCombine 执行snapshot collect 发送数据之后如果不执行sleep 则 FSubmit
> > > 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
> > > 如果在 FCombine 执行snapshot collect 发送数据之后如果执行sleep,
> > > 在执行 notifyCheckpointComplete 方法时 则就可以收到 snapshot collect 发送的数据。
> > > 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
> > > notifyCheckpointComplete 之前就会收到 上游 snapshot collect 发送数据(因为 snapshot
> > > collect 在前,广播 barrier 在后,然后下游在收到了 barrier 才会执行 chekpoint
> > > 的相关方法,所以在执行相关方法前 上游 snapshot collect 发出的数据就应该已经到达了下游)。
> > > 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
> > >
> > > public class FlinkCheckpointTest {
> > > public static void main(String[] args) throws Exception {
> > > StreamExecutionEnvironment steamEnv =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > steamEnv.enableCheckpointing(1000L*2);
> > > steamEnv
> > > .addSource(new FSource()).setParallelism(4)
> > > .transform("开始事务", Types.STRING,new
> > FStart()).setParallelism(1)
> > > .process(new FCombine()).name("事务预处理").setParallelism(4)
> > > .addSink(new FSubmit()).name("提交事务").setParallelism(1)
> > > ;
> > > steamEnv.execute("test");
> > > }
> > >
> > > static class FSource extends RichParallelSourceFunction<String>{
> > > @Override
> > > public void run(SourceContext<String> sourceContext) throws
> > > Exception {
> > > int I =0;
> > > while (true){
> > > I = I + 1;
> > > sourceContext.collect("thread " +
> > > Thread.currentThread().getId() +"-" +I);
> > > Thread.sleep(1000);
> > > }
> > > }
> > > @Override
> > > public void cancel() {}
> > > }
> > >
> > > static class FStart extends AbstractStreamOperator<String>
> > > implements OneInputStreamOperator<String,String>{
> > > volatile Long ckid = 0L;
> > > @Override
> > > public void processElement(StreamRecord<String> streamRecord)
> > > throws Exception {
> > > log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> > > output.collect(streamRecord);
> > > }
> > > @Override
> > > public void prepareSnapshotPreBarrier(long checkpointId)
> > > throws Exception {
> > > log("开启事务: " + checkpointId);
> > > ckid = checkpointId;
> > > super.prepareSnapshotPreBarrier(checkpointId);
> > > }
> > > }
> > >
> > > static class FCombine extends ProcessFunction<String,String>
> > > implements CheckpointedFunction {
> > > List ls = new ArrayList<String>();
> > > Collector<String> collector =null;
> > > volatile Long ckid = 0L;
> > >
> > > @Override
> > > public void snapshotState(FunctionSnapshotContext
> > > functionSnapshotContext) throws Exception {
> > > StringBuffer sb = new StringBuffer();
> > > ls.forEach(x->{sb.append(x).append(";");});
> > > log("批处理 " + functionSnapshotContext.getCheckpointId() +
> > > ": 时收到数据:" + sb.toString());
> > > Thread.sleep(5*1000);
> > > collector.collect(sb.toString());
> > > ls.clear();
> > > Thread.sleep(5*1000);
> > > //Thread.sleep(20*1000);
> > > }
> > > @Override
> > > public void initializeState(FunctionInitializationContext
> > > functionInitializationContext) throws Exception { }
> > > @Override
> > > public void processElement(String s, Context context,
> > > Collector<String> out) throws Exception {
> > > if(StringUtils.isNotBlank(s)){
> > > ls.add(s);
> > > }
> > > log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> > > ckid);
> > > if(collector ==null){
> > > collector = out;
> > > }
> > > }
> > > }
> > >
> > > static class FSubmit extends RichSinkFunction<String> implements
> > > /* CheckpointedFunction,*/ CheckpointListener {
> > > List ls = new ArrayList<String>();
> > > volatile Long ckid = 0L;
> > > @Override
> > > public void notifyCheckpointComplete(long l) throws Exception {
> > > ckid = l;
> > > StringBuffer sb = new StringBuffer();
> > > ls.forEach(x->{sb.append(x).append("||");});
> > > log("submit checkpoint " + l + " over data:list size" +
> > > ls.size()+ "; detail" + sb.toString());
> > > ls.clear();
> > > }
> > > @Override
> > > public void invoke(String value, Context context) throws
> > Exception
> > > {
> > > if(StringUtils.isNotBlank(value)){
> > > ls.add(value);
> > > }
> > > log("收到数据 :" + value + " list zie:" + ls.size() +
> "..ckid:" +
> > > ckid);
> > > }
> > > }
> > > public static void log(String s){
> > > String name = Thread.currentThread().getName();
> > > System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
> > > Date())+":"+name + ":" + s);
> > > }
> > > }
> > >
> >
>