看了infoq介绍flink cdc 3.0文章 
https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, d6 
其中d代表数据变更,s代表schema变更
这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
而Task2处理 d3, d4, d5, s3, d6
这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?


SchemaOperator代码中
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
schemaChangeEvent) {
        // The request will need to send a FlushEvent or block until flushing 
finished
        SchemaChangeResponse response = requestSchemaChange(tableId, 
schemaChangeEvent);
        if (response.isShouldSendFlushEvent()) {
            LOG.info(
                    "Sending the FlushEvent for table {} in subtask {}.",
                    tableId,
                    getRuntimeContext().getIndexOfThisSubtask());
            output.collect(new StreamRecord<>(new FlushEvent(tableId)));
            output.collect(new StreamRecord<>(schemaChangeEvent));
            // The request will block until flushing finished in each sink 
writer
            requestReleaseUpstream();
        }
    }
为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
还要发送一次SchemaChangeEvent呢?
当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
upstream的呢?
求指教,谢谢!

回复