Re: flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 Jiabao Sun
Hi,

是的,目前来说会 block 住。
flush + apply schema change 一般来说不会持续太长时间,
且 schema 变更一般来说是低频事件,即使 block 也不会有太大性能影响。

Best,
Jiabao


> 2023年12月28日 12:57,casel.chen  写道:
> 
> 
> 
> 
> 感谢解惑!
> 还有一个问题:如果一个 pipeline 涉及多张表数据同步,而只有一个表出现 schema 变更的话,其他表的数据处理也会 block 住吗?
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2023-12-28 01:16:40,"Jiabao Sun"  写道:
>> Hi,
>> 
>>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>>> 还要发送一次SchemaChangeEvent呢?
>> 
>> Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 
>> writer,参考 DorisEventSerializer
>> 
>>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>>> upstream的呢?
>> 被 block 的原因是 responseFuture没有 
>> complete,在SchemaOperator.sendRequestToCoordinator 使用 responseFuture.get() 
>> 在没有完成时会 block 住。 
>> 只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 
>> waitFlushSuccess的responseFuture 标记为 complete。
>> 参考 
>> SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150.
>> 
>> 保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。
>> 
>> Best,
>> Jiabao
>> 
>> [1] 
>> https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit
>> 
>>> 2023年12月27日 22:14,casel.chen  写道:
>>> 
>>> 看了infoq介绍flink cdc 3.0文章 
>>> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
>>> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
>>> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, 
>>> d6 其中d代表数据变更,s代表schema变更
>>> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
>>> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
>>> 而Task2处理 d3, d4, d5, s3, d6
>>> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?
>>> 
>>> 
>>> SchemaOperator代码中
>>> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
>>> schemaChangeEvent) {
>>>   // The request will need to send a FlushEvent or block until flushing 
>>> finished
>>>   SchemaChangeResponse response = requestSchemaChange(tableId, 
>>> schemaChangeEvent);
>>>   if (response.isShouldSendFlushEvent()) {
>>>   LOG.info(
>>>   "Sending the FlushEvent for table {} in subtask {}.",
>>>   tableId,
>>>   getRuntimeContext().getIndexOfThisSubtask());
>>>   output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>>>   output.collect(new StreamRecord<>(schemaChangeEvent));
>>>   // The request will block until flushing finished in each sink 
>>> writer
>>>   requestReleaseUpstream();
>>>   }
>>>   }
>>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>>> 还要发送一次SchemaChangeEvent呢?
>>> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
>>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>>> upstream的呢?
>>> 求指教,谢谢!
>>> 



Re:Re: flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 casel.chen



感谢解惑!
还有一个问题:如果一个 pipeline 涉及多张表数据同步,而只有一个表出现 schema 变更的话,其他表的数据处理也会 block 住吗?








在 2023-12-28 01:16:40,"Jiabao Sun"  写道:
>Hi,
>
>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>> 还要发送一次SchemaChangeEvent呢?
>
>Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 
>writer,参考 DorisEventSerializer
>
>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>> upstream的呢?
>被 block 的原因是 responseFuture没有 
>complete,在SchemaOperator.sendRequestToCoordinator 使用 responseFuture.get() 
>在没有完成时会 block 住。 
>只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 
>waitFlushSuccess的responseFuture 标记为 complete。
>参考 
>SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150.
>
>保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。
>
>Best,
>Jiabao
>
>[1] 
>https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit
>
>> 2023年12月27日 22:14,casel.chen  写道:
>> 
>> 看了infoq介绍flink cdc 3.0文章 
>> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
>> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
>> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, 
>> d6 其中d代表数据变更,s代表schema变更
>> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
>> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
>> 而Task2处理 d3, d4, d5, s3, d6
>> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?
>> 
>> 
>> SchemaOperator代码中
>> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
>> schemaChangeEvent) {
>>// The request will need to send a FlushEvent or block until flushing 
>> finished
>>SchemaChangeResponse response = requestSchemaChange(tableId, 
>> schemaChangeEvent);
>>if (response.isShouldSendFlushEvent()) {
>>LOG.info(
>>"Sending the FlushEvent for table {} in subtask {}.",
>>tableId,
>>getRuntimeContext().getIndexOfThisSubtask());
>>output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>>output.collect(new StreamRecord<>(schemaChangeEvent));
>>// The request will block until flushing finished in each sink 
>> writer
>>requestReleaseUpstream();
>>}
>>}
>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>> 还要发送一次SchemaChangeEvent呢?
>> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>> upstream的呢?
>> 求指教,谢谢!
>> 


Re: flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 Jiabao Sun
Hi,

> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
> 还要发送一次SchemaChangeEvent呢?

Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 
writer,参考 DorisEventSerializer

> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
> upstream的呢?
被 block 的原因是 responseFuture没有 complete,在SchemaOperator.sendRequestToCoordinator 
使用 responseFuture.get() 在没有完成时会 block 住。 
只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 
waitFlushSuccess的responseFuture 标记为 complete。
参考 
SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150.

保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。

Best,
Jiabao

[1] 
https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit

> 2023年12月27日 22:14,casel.chen  写道:
> 
> 看了infoq介绍flink cdc 3.0文章 
> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, 
> d6 其中d代表数据变更,s代表schema变更
> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
> 而Task2处理 d3, d4, d5, s3, d6
> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?
> 
> 
> SchemaOperator代码中
> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
> schemaChangeEvent) {
>// The request will need to send a FlushEvent or block until flushing 
> finished
>SchemaChangeResponse response = requestSchemaChange(tableId, 
> schemaChangeEvent);
>if (response.isShouldSendFlushEvent()) {
>LOG.info(
>"Sending the FlushEvent for table {} in subtask {}.",
>tableId,
>getRuntimeContext().getIndexOfThisSubtask());
>output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>output.collect(new StreamRecord<>(schemaChangeEvent));
>// The request will block until flushing finished in each sink 
> writer
>requestReleaseUpstream();
>}
>}
> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
> 还要发送一次SchemaChangeEvent呢?
> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
> upstream的呢?
> 求指教,谢谢!
> 



flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 casel.chen
看了infoq介绍flink cdc 3.0文章 
https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, d6 
其中d代表数据变更,s代表schema变更
这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
而Task2处理 d3, d4, d5, s3, d6
这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?


SchemaOperator代码中
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
schemaChangeEvent) {
// The request will need to send a FlushEvent or block until flushing 
finished
SchemaChangeResponse response = requestSchemaChange(tableId, 
schemaChangeEvent);
if (response.isShouldSendFlushEvent()) {
LOG.info(
"Sending the FlushEvent for table {} in subtask {}.",
tableId,
getRuntimeContext().getIndexOfThisSubtask());
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
output.collect(new StreamRecord<>(schemaChangeEvent));
// The request will block until flushing finished in each sink 
writer
requestReleaseUpstream();
}
}
为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
还要发送一次SchemaChangeEvent呢?
当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
upstream的呢?
求指教,谢谢!