Re: flink cdc 3.0 schema evolution原理是怎样的?
Hi, 是的,目前来说会 block 住。 flush + apply schema change 一般来说不会持续太长时间, 且 schema 变更一般来说是低频事件,即使 block 也不会有太大性能影响。 Best, Jiabao > 2023年12月28日 12:57,casel.chen 写道: > > > > > 感谢解惑! > 还有一个问题:如果一个 pipeline 涉及多张表数据同步,而只有一个表出现 schema 变更的话,其他表的数据处理也会 block 住吗? > > > > > > > > > 在 2023-12-28 01:16:40,"Jiabao Sun" 写道: >> Hi, >> >>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); >>> 还要发送一次SchemaChangeEvent呢? >> >> Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 >> writer,参考 DorisEventSerializer >> >>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release >>> upstream的呢? >> 被 block 的原因是 responseFuture没有 >> complete,在SchemaOperator.sendRequestToCoordinator 使用 responseFuture.get() >> 在没有完成时会 block 住。 >> 只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 >> waitFlushSuccess的responseFuture 标记为 complete。 >> 参考 >> SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150. >> >> 保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。 >> >> Best, >> Jiabao >> >> [1] >> https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit >> >>> 2023年12月27日 22:14,casel.chen 写道: >>> >>> 看了infoq介绍flink cdc 3.0文章 >>> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. >>> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。 >>> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, >>> d6 其中d代表数据变更,s代表schema变更 >>> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。 >>> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, >>> 而Task2处理 d3, d4, d5, s3, d6 >>> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢? >>> >>> >>> SchemaOperator代码中 >>> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent >>> schemaChangeEvent) { >>> // The request will need to send a FlushEvent or block until flushing >>> finished >>> SchemaChangeResponse response = requestSchemaChange(tableId, >>> schemaChangeEvent); >>> if (response.isShouldSendFlushEvent()) { >>> LOG.info( >>> "Sending the FlushEvent for table {} in subtask {}.", >>> tableId, >>> getRuntimeContext().getIndexOfThisSubtask()); >>> output.collect(new StreamRecord<>(new FlushEvent(tableId))); >>> output.collect(new StreamRecord<>(schemaChangeEvent)); >>> // The request will block until flushing finished in each sink >>> writer >>> requestReleaseUpstream(); >>> } >>> } >>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); >>> 还要发送一次SchemaChangeEvent呢? >>> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么? >>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release >>> upstream的呢? >>> 求指教,谢谢! >>>
Re:Re: flink cdc 3.0 schema evolution原理是怎样的?
感谢解惑! 还有一个问题:如果一个 pipeline 涉及多张表数据同步,而只有一个表出现 schema 变更的话,其他表的数据处理也会 block 住吗? 在 2023-12-28 01:16:40,"Jiabao Sun" 写道: >Hi, > >> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); >> 还要发送一次SchemaChangeEvent呢? > >Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 >writer,参考 DorisEventSerializer > >> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release >> upstream的呢? >被 block 的原因是 responseFuture没有 >complete,在SchemaOperator.sendRequestToCoordinator 使用 responseFuture.get() >在没有完成时会 block 住。 >只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 >waitFlushSuccess的responseFuture 标记为 complete。 >参考 >SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150. > >保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。 > >Best, >Jiabao > >[1] >https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit > >> 2023年12月27日 22:14,casel.chen 写道: >> >> 看了infoq介绍flink cdc 3.0文章 >> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. >> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。 >> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, >> d6 其中d代表数据变更,s代表schema变更 >> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。 >> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, >> 而Task2处理 d3, d4, d5, s3, d6 >> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢? >> >> >> SchemaOperator代码中 >> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent >> schemaChangeEvent) { >>// The request will need to send a FlushEvent or block until flushing >> finished >>SchemaChangeResponse response = requestSchemaChange(tableId, >> schemaChangeEvent); >>if (response.isShouldSendFlushEvent()) { >>LOG.info( >>"Sending the FlushEvent for table {} in subtask {}.", >>tableId, >>getRuntimeContext().getIndexOfThisSubtask()); >>output.collect(new StreamRecord<>(new FlushEvent(tableId))); >>output.collect(new StreamRecord<>(schemaChangeEvent)); >>// The request will block until flushing finished in each sink >> writer >>requestReleaseUpstream(); >>} >>} >> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); >> 还要发送一次SchemaChangeEvent呢? >> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么? >> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release >> upstream的呢? >> 求指教,谢谢! >>
Re: flink cdc 3.0 schema evolution原理是怎样的?
Hi, > 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); > 还要发送一次SchemaChangeEvent呢? Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 writer,参考 DorisEventSerializer > 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release > upstream的呢? 被 block 的原因是 responseFuture没有 complete,在SchemaOperator.sendRequestToCoordinator 使用 responseFuture.get() 在没有完成时会 block 住。 只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 waitFlushSuccess的responseFuture 标记为 complete。 参考 SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150. 保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。 Best, Jiabao [1] https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit > 2023年12月27日 22:14,casel.chen 写道: > > 看了infoq介绍flink cdc 3.0文章 > https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. > evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。 > 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, > d6 其中d代表数据变更,s代表schema变更 > 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。 > 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, > 而Task2处理 d3, d4, d5, s3, d6 > 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢? > > > SchemaOperator代码中 > private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent > schemaChangeEvent) { >// The request will need to send a FlushEvent or block until flushing > finished >SchemaChangeResponse response = requestSchemaChange(tableId, > schemaChangeEvent); >if (response.isShouldSendFlushEvent()) { >LOG.info( >"Sending the FlushEvent for table {} in subtask {}.", >tableId, >getRuntimeContext().getIndexOfThisSubtask()); >output.collect(new StreamRecord<>(new FlushEvent(tableId))); >output.collect(new StreamRecord<>(schemaChangeEvent)); >// The request will block until flushing finished in each sink > writer >requestReleaseUpstream(); >} >} > 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); > 还要发送一次SchemaChangeEvent呢? > 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么? > 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release > upstream的呢? > 求指教,谢谢! >
flink cdc 3.0 schema evolution原理是怎样的?
看了infoq介绍flink cdc 3.0文章 https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, d6 其中d代表数据变更,s代表schema变更 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 而Task2处理 d3, d4, d5, s3, d6 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢? SchemaOperator代码中 private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) { // The request will need to send a FlushEvent or block until flushing finished SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); if (response.isShouldSendFlushEvent()) { LOG.info( "Sending the FlushEvent for table {} in subtask {}.", tableId, getRuntimeContext().getIndexOfThisSubtask()); output.collect(new StreamRecord<>(new FlushEvent(tableId))); output.collect(new StreamRecord<>(schemaChangeEvent)); // The request will block until flushing finished in each sink writer requestReleaseUpstream(); } } 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 还要发送一次SchemaChangeEvent呢? 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么? 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release upstream的呢? 求指教,谢谢!