Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx closed pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one URL: https://github.com/apache/flink/pull/21635 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on PR #21635: URL: https://github.com/apache/flink/pull/21635#issuecomment-1884161683 Thanks, everyone, for the detailed review. Please let me know if you have any other comments. I will merge the PR if there are no further comments within two days.
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on PR #21635: URL: https://github.com/apache/flink/pull/21635#issuecomment-1879974970 @1996fanrui @Zakelly Thanks for the review. I have addressed the remaining minor comments. The Python CI failure has been occurring since yesterday (see other [failed cases](https://dev.azure.com/apache-flink/apache-flink/_build?definitionId=2&_a=summary)) and is not related to this PR. @curcur Could you also help review? Thanks a lot!
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on PR #21635: URL: https://github.com/apache/flink/pull/21635#issuecomment-1879934580 @flinkbot run azure
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
1996fanrui commented on code in PR #21635: URL: https://github.com/apache/flink/pull/21635#discussion_r1427799403

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java: ##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
      * program's serializer re-serializes the data, thus converting the format during the restore
      * operation.
      *
+     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
      * @param newSerializer the new serializer to check.
      * @return the serializer compatibility result.
      */
-    TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-            TypeSerializer newSerializer);
+    @Deprecated
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializer newSerializer) {
+        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+    }
+
+    /**
+     * Checks current serializer's compatibility to read data written by the prior serializer.
+     *
+     * When a checkpoint/savepoint is restored, this method checks whether the serialization
+     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+     * serialization format is compatible, that the program's serializer needs to reconfigure itself
+     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+     * that the format is outright incompatible, or that a migration needed. In the latter case, the
+     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+     * program's serializer re-serializes the data, thus converting the format during the restore
+     * operation.
+     *
+     * This method must be implemented to clarify the compatibility. See FLIP-263 for more
+     * details.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot to check.
+     * @return the serializer compatibility result.
+     */
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializerSnapshot oldSerializerSnapshot) {
+        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
> `TypeSerializerSnapshot` and `TypeSerializer` are public APIs, which means they may also be used in user code. Users would then still have to modify their code, which may be a breaking change.

Thanks for the clarification!
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
1996fanrui commented on code in PR #21635: URL: https://github.com/apache/flink/pull/21635#discussion_r1427798414

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java: ##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
      * program's serializer re-serializes the data, thus converting the format during the restore
      * operation.
      *
+     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
      * @param newSerializer the new serializer to check.
      * @return the serializer compatibility result.
      */
-    TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-            TypeSerializer newSerializer);
+    @Deprecated
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializer newSerializer) {
+        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+    }
+
+    /**
+     * Checks current serializer's compatibility to read data written by the prior serializer.
+     *
+     * When a checkpoint/savepoint is restored, this method checks whether the serialization
+     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+     * serialization format is compatible, that the program's serializer needs to reconfigure itself
+     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+     * that the format is outright incompatible, or that a migration needed. In the latter case, the
+     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+     * program's serializer re-serializes the data, thus converting the format during the restore
+     * operation.
+     *
+     * This method must be implemented to clarify the compatibility. See FLIP-263 for more
+     * details.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot to check.
+     * @return the serializer compatibility result.
+     */
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(

Review Comment:
That makes sense!
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on PR #21635: URL: https://github.com/apache/flink/pull/21635#issuecomment-1857189693

> Hi @masteryhx, sorry for the late reply!
>
> After reading your PR, I finally figured out why the default implementations of the old method and the new method should call each other. I think we could use a trick to check whether either the old or the new method has been implemented in a specific `TypeSerializerSnapshot` before it is called. This answer may provide some ideas: https://stackoverflow.com/a/2315467 WDYT?

Thanks for the advice. It's a good way to avoid an infinite loop when users implement neither method. I will update the PR to support this.
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on code in PR #21635: URL: https://github.com/apache/flink/pull/21635#discussion_r1427497404

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java: ##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
      * program's serializer re-serializes the data, thus converting the format during the restore
      * operation.
      *
+     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
      * @param newSerializer the new serializer to check.
      * @return the serializer compatibility result.
      */
-    TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-            TypeSerializer newSerializer);
+    @Deprecated
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializer newSerializer) {
+        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+    }
+
+    /**
+     * Checks current serializer's compatibility to read data written by the prior serializer.
+     *
+     * When a checkpoint/savepoint is restored, this method checks whether the serialization
+     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+     * serialization format is compatible, that the program's serializer needs to reconfigure itself
+     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+     * that the format is outright incompatible, or that a migration needed. In the latter case, the
+     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+     * program's serializer re-serializes the data, thus converting the format during the restore
+     * operation.
+     *
+     * This method must be implemented to clarify the compatibility. See FLIP-263 for more
+     * details.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot to check.
+     * @return the serializer compatibility result.
+     */
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializerSnapshot oldSerializerSnapshot) {
+        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
> IIUC, Flink just calls the new resolveSchemaCompatibility(TypeSerializerSnapshot) method in the future, right?

Yes, Flink itself can guarantee this. But IIUC, `TypeSerializerSnapshot` and `TypeSerializer` are public APIs, which means they may also be used in user code. Users would then still have to modify their code, which may be a breaking change.

> BTW, the code comment should guide users or developers to implement the new resolveSchemaCompatibility(TypeSerializerSnapshot) method in the future.

Thanks for the advice. That's a good idea.
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
1996fanrui commented on code in PR #21635: URL: https://github.com/apache/flink/pull/21635#discussion_r1427479694

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java: ##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
      * program's serializer re-serializes the data, thus converting the format during the restore
      * operation.
      *
+     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
      * @param newSerializer the new serializer to check.
      * @return the serializer compatibility result.
      */
-    TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-            TypeSerializer newSerializer);
+    @Deprecated
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializer newSerializer) {
+        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+    }
+
+    /**
+     * Checks current serializer's compatibility to read data written by the prior serializer.
+     *
+     * When a checkpoint/savepoint is restored, this method checks whether the serialization
+     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+     * serialization format is compatible, that the program's serializer needs to reconfigure itself
+     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+     * that the format is outright incompatible, or that a migration needed. In the latter case, the
+     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+     * program's serializer re-serializes the data, thus converting the format during the restore
+     * operation.
+     *
+     * This method must be implemented to clarify the compatibility. See FLIP-263 for more
+     * details.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot to check.
+     * @return the serializer compatibility result.
+     */
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializerSnapshot oldSerializerSnapshot) {
+        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
After thinking more about it, I'm wondering whether we need a compatibility implementation for the old `resolveSchemaCompatibility(TypeSerializer)` method at all. IIUC, Flink will only call the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method in the future, right?

If so, we can add a default implementation for the old `resolveSchemaCompatibility(TypeSerializer)` method that throws `UnsupportedOperationException`. We need the default implementation so that users and developers no longer have to implement the old method in the future.

For the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method, your current implementation is fine. All old implementation classes remain supported as long as `resolveSchemaCompatibility(TypeSerializerSnapshot)` calls the old one.

BTW, the code comment should guide users and developers to implement the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method in the future. WDYT?
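
The proposal in the comment above can be sketched roughly as follows. This is only an illustration under simplifying assumptions: `ProposalSnapshot`, `ProposalSerializer`, `ProposalCompat` and the two implementing classes are hypothetical stand-ins for the Flink types, not the actual API. The old method gets a default that throws, and the new method's default falls back to the old snapshot's old-style check.

```java
/** Hypothetical, simplified stand-ins for the Flink types; not the real API. */
enum ProposalCompat { COMPATIBLE_AS_IS, INCOMPATIBLE }

interface ProposalSerializer {
    ProposalSnapshot snapshotConfiguration();
}

interface ProposalSnapshot {
    ProposalSerializer restoreSerializer();

    /** Old method: the default throws, so new implementations need not provide it. */
    @Deprecated
    default ProposalCompat resolveSchemaCompatibility(ProposalSerializer newSerializer) {
        throw new UnsupportedOperationException(
                "Implement resolveSchemaCompatibility(ProposalSnapshot) instead");
    }

    /** New method: the default falls back to the old snapshot's old-style check. */
    default ProposalCompat resolveSchemaCompatibility(ProposalSnapshot oldSnapshot) {
        return oldSnapshot.resolveSchemaCompatibility(restoreSerializer());
    }
}

/** A legacy implementation that only overrides the old method. */
class LegacyProposalSnapshot implements ProposalSnapshot {
    @Override
    public ProposalSerializer restoreSerializer() {
        return () -> this;
    }

    @Override
    @Deprecated
    public ProposalCompat resolveSchemaCompatibility(ProposalSerializer newSerializer) {
        return ProposalCompat.COMPATIBLE_AS_IS;
    }
}

/** An implementation that overrides neither method. */
class BareProposalSnapshot implements ProposalSnapshot {
    @Override
    public ProposalSerializer restoreSerializer() {
        return () -> this;
    }
}
```

In this sketch a legacy snapshot keeps working when the framework calls only the new method, while a snapshot that implements neither method fails fast with `UnsupportedOperationException` instead of recursing.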
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
Zakelly commented on PR #21635: URL: https://github.com/apache/flink/pull/21635#issuecomment-1856242365 Hi @masteryhx, sorry for the late reply! After reading your PR, I finally figured out why the default implementations of the old method and the new method should call each other. I think we could use a trick to check whether either the old or the new method has been implemented in a specific `TypeSerializerSnapshot` before it is called. This answer may provide some ideas: https://stackoverflow.com/a/2315467 WDYT?
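
One way to perform the check suggested above is plain reflection: `Class.getMethod` returns the most specific public declaration, and `Method.getDeclaringClass` tells whether that declaration is still the interface default. The following is a minimal sketch, not the code that went into the PR; `SketchSnapshot`, `resolveOld`, `resolveNew` and `OverrideCheck` are hypothetical names chosen for illustration.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for TypeSerializerSnapshot; names are illustrative only. */
interface SketchSnapshot {
    /** Stand-in for the old resolveSchemaCompatibility(TypeSerializer). */
    default String resolveOld(Object newSerializer) {
        return "default-old";
    }

    /** Stand-in for the new resolveSchemaCompatibility(TypeSerializerSnapshot). */
    default String resolveNew(SketchSnapshot oldSnapshot) {
        return "default-new";
    }
}

/** Overrides only the old method. */
class OldOnlySnapshot implements SketchSnapshot {
    @Override
    public String resolveOld(Object newSerializer) {
        return "custom-old";
    }
}

/** Overrides neither method. */
class NeitherSnapshot implements SketchSnapshot {}

final class OverrideCheck {
    private OverrideCheck() {}

    /** Returns true if the class (or a superclass) overrides the named interface default. */
    static boolean overrides(Class<? extends SketchSnapshot> type, String name, Class<?> param) {
        try {
            Method method = type.getMethod(name, param);
            // If the method is still declared by the interface, nobody overrode it.
            return method.getDeclaringClass() != SketchSnapshot.class;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

A default implementation could consult such a check before delegating, and throw a descriptive exception when neither method is overridden instead of looping.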
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on PR #21635: URL: https://github.com/apache/flink/pull/21635#issuecomment-1855044542

> Hi @masteryhx, thanks for the contribution and update!
>
> I have reviewed this PR and left some comments. Please take a look in your free time, thanks~
>
> Also, the schema compatibility code is part of Flink's infrastructure and is used by many components. I had not read or changed the related code before this PR, so I'm not an expert in this area. I have reviewed it, but it would be great to have more committers who are familiar with this area help review. That would help keep the change bug-free.

Thanks a lot for the detailed review. Some key problems in the comments still remain to be discussed. I'll update the PR once we are on the same page about them. Also pinging @Zakelly and @fredia to help review and discuss. Thanks a lot for your time!
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on code in PR #21635: URL: https://github.com/apache/flink/pull/21635#discussion_r1426111500

## flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java: ##
@@ -1244,8 +1244,8 @@ void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
                         testKeyedValueStateUpgrade(
                                 initialAccessDescriptor, newAccessDescriptorAfterRestore))
                 .satisfiesAnyOf(
-                        e -> assertThat(e).isInstanceOf(IllegalStateException.class),
-                        e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
+                        e -> assertThat(e).isInstanceOf(StateMigrationException.class),
+                        e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
     }

Review Comment:
As discussed in [#21635(comment)](https://github.com/apache/flink/pull/21635#discussion_r1426110489), I'd also like to add more state-level tests in a separate PR after the adjustment. WDYT?
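
For readers unfamiliar with the assertion in the hunk above: it accepts the thrown exception if it either is an instance of the expected type or has a direct cause of that type. A minimal sketch of the same condition using only the JDK is shown below; `SketchMigrationException` is a hypothetical stand-in for `StateMigrationException`, and `ThrowableChecks` is an illustrative helper, not part of Flink or AssertJ.

```java
/** Hypothetical stand-in for StateMigrationException. */
class SketchMigrationException extends Exception {
    SketchMigrationException(String message) {
        super(message);
    }
}

final class ThrowableChecks {
    private ThrowableChecks() {}

    /** True if the throwable itself, or its direct cause, is an instance of the given type. */
    static boolean isOrIsCausedBy(Throwable thrown, Class<? extends Throwable> type) {
        // Class.isInstance(null) is false, so a missing cause is handled naturally.
        return type.isInstance(thrown) || type.isInstance(thrown.getCause());
    }
}
```

The two branches correspond to the two lambdas passed to `satisfiesAnyOf` in the test: a state backend may throw the migration exception directly or wrap it in another exception.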
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on code in PR #21635: URL: https://github.com/apache/flink/pull/21635#discussion_r1426110489

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java: ##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
      * program's serializer re-serializes the data, thus converting the format during the restore
      * operation.
      *
+     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
      * @param newSerializer the new serializer to check.
      * @return the serializer compatibility result.
      */
-    TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-            TypeSerializer newSerializer);
+    @Deprecated
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializer newSerializer) {
+        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+    }
+
+    /**
+     * Checks current serializer's compatibility to read data written by the prior serializer.
+     *
+     * When a checkpoint/savepoint is restored, this method checks whether the serialization
+     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+     * serialization format is compatible, that the program's serializer needs to reconfigure itself
+     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+     * that the format is outright incompatible, or that a migration needed. In the latter case, the
+     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+     * program's serializer re-serializes the data, thus converting the format during the restore
+     * operation.
+     *
+     * This method must be implemented to clarify the compatibility. See FLIP-263 for more
+     * details.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot to check.
+     * @return the serializer compatibility result.
+     */
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(

Review Comment:
Yes, you're right. These are all the root classes that call `resolveSchemaCompatibility` in the state module. So I'd like to prepare a separate PR to adjust them, as I mentioned before. It should not affect the correctness of current usage. WDYT?
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1426108037

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java: ##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
      * program's serializer re-serializes the data, thus converting the format during the restore
      * operation.
      *
+     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
      * @param newSerializer the new serializer to check.
      * @return the serializer compatibility result.
      */
-    TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-            TypeSerializer newSerializer);
+    @Deprecated
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializer newSerializer) {
+        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+    }
+
+    /**
+     * Checks current serializer's compatibility to read data written by the prior serializer.
+     *
+     * When a checkpoint/savepoint is restored, this method checks whether the serialization
+     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+     * serialization format is compatible, that the program's serializer needs to reconfigure itself
+     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+     * that the format is outright incompatible, or that a migration needed. In the latter case, the
+     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+     * program's serializer re-serializes the data, thus converting the format during the restore
+     * operation.
+     *
+     * This method must be implemented to clarify the compatibility. See FLIP-263 for more
+     * details.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot to check.
+     * @return the serializer compatibility result.
+     */
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializerSnapshot oldSerializerSnapshot) {
+        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
Thanks for the detailed review. IIUC, as long as users have implemented one of the two methods (the old one or the new one), it should work well with the current PR. Sorry, maybe I missed your point. The case you described is a nested serializer, right? Whichever method `data1` and `data2` implement, the call should be one-sided; otherwise the logic would be wrong even without this PR. The reason why we provide two default methods is that users could not use a mix of migrated and non-migrated methods, as you can see in the discussion [#21635(comment)](https://github.com/apache/flink/pull/21635#discussion_r1086625857). Just cc @Zakelly, as we also talked about this problem offline.
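The mutual delegation between the two default methods in the diff above can be sketched in isolation. This is a minimal illustration, not Flink code: `Serializer`, `Snapshot`, `Compatibility`, `SimpleSerializer`, `LegacySnapshot`, `MigratedSnapshot` and the string `format` field are hypothetical, simplified stand-ins for the Flink types, and only the shape of the two defaults is taken from the diff.

```java
// Hypothetical, simplified stand-ins for the Flink types; only the two
// default methods mirror the shape of the diff under review.
public class MutualDefaultsSketch {

    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    interface Serializer {
        String format();
        Snapshot snapshotConfiguration();
    }

    interface Snapshot {
        Serializer restoreSerializer();

        // Old entry point: invoked on the OLD snapshot with the new serializer.
        @Deprecated
        default Compatibility resolveSchemaCompatibility(Serializer newSerializer) {
            return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
        }

        // New entry point: invoked on the NEW snapshot with the old snapshot.
        default Compatibility resolveSchemaCompatibility(Snapshot oldSnapshot) {
            return oldSnapshot.resolveSchemaCompatibility(restoreSerializer());
        }
    }

    static Compatibility check(String oldFormat, String newFormat) {
        return oldFormat.equals(newFormat)
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.INCOMPATIBLE;
    }

    static final class SimpleSerializer implements Serializer {
        private final String format;
        private final boolean migrated;

        SimpleSerializer(String format, boolean migrated) {
            this.format = format;
            this.migrated = migrated;
        }

        @Override
        public String format() { return format; }

        @Override
        public Snapshot snapshotConfiguration() {
            return migrated ? new MigratedSnapshot(format) : new LegacySnapshot(format);
        }
    }

    // Not yet migrated: overrides only the deprecated method.
    static final class LegacySnapshot implements Snapshot {
        private final String format;

        LegacySnapshot(String format) { this.format = format; }

        @Override
        public Serializer restoreSerializer() { return new SimpleSerializer(format, false); }

        @Override
        @Deprecated
        public Compatibility resolveSchemaCompatibility(Serializer newSerializer) {
            return check(format, newSerializer.format());
        }
    }

    // Migrated: overrides only the new method.
    static final class MigratedSnapshot implements Snapshot {
        private final String format;

        MigratedSnapshot(String format) { this.format = format; }

        @Override
        public Serializer restoreSerializer() { return new SimpleSerializer(format, true); }

        @Override
        public Compatibility resolveSchemaCompatibility(Snapshot oldSnapshot) {
            return check(oldSnapshot.restoreSerializer().format(), format);
        }
    }

    public static void main(String[] args) {
        // New entry point on a non-migrated class: falls back to the old override.
        Snapshot oldLegacy = new LegacySnapshot("v1");
        System.out.println(new LegacySnapshot("v1").resolveSchemaCompatibility(oldLegacy));

        // Old entry point on a migrated class: falls forward to the new override.
        Snapshot oldMigrated = new MigratedSnapshot("v1");
        System.out.println(
                oldMigrated.resolveSchemaCompatibility(new SimpleSerializer("v1", true)));

        // New entry point on a migrated class: handled directly.
        System.out.println(new MigratedSnapshot("v2").resolveSchemaCompatibility(oldMigrated));
    }
}
```

In this sketch each default reaches the method that the class actually overrides, so either entry point works as long as both sides implement the same one. A class that overrides neither method, or a non-migrated new snapshot checked against a migrated old one, bounces between the two defaults until the stack overflows, which is one way to read the "one-sided" remark above.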
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1422192659

## flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java: ##
@@ -1244,8 +1244,8 @@ void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
                                 testKeyedValueStateUpgrade(
                                         initialAccessDescriptor, newAccessDescriptorAfterRestore))
                 .satisfiesAnyOf(
-                        e -> assertThat(e).isInstanceOf(IllegalStateException.class),
-                        e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
+                        e -> assertThat(e).isInstanceOf(StateMigrationException.class),
+                        e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
     }

Review Comment:
Would you mind adding a test to check that all state backends call the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` instead of the old method?

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java: ##
@@ -289,6 +292,38 @@ protected void readOuterSnapshot(
             int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader)
             throws IOException {}

+    /**
+     * Checks the schema compatibility of the given old serializer snapshot based on the outer
+     * snapshot.
+     *
+     * The base implementation of this method assumes that the outer serializer only has nested
+     * serializers and no extra information, and therefore the result of the check is {@link
+     * OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains some
+     * extra information that has been persisted as part of the serializer snapshot, this must be
+     * overridden. Note that this method and the corresponding methods {@link
+     * #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView,
+     * ClassLoader)} needs to be implemented.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot, which contains the old outer
+     *     information to check against.
+     * @return a {@link OuterSchemaCompatibility} indicating whether or the new serializer's outer

Review Comment:
```suggestion
     * @return a {@link OuterSchemaCompatibility} indicating whether the new serializer's outer
```

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java: ##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
      * program's serializer re-serializes the data, thus converting the format during the restore
      * operation.
      *
+     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
      * @param newSerializer the new serializer to check.
      * @return the serializer compatibility result.
      */
-    TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-            TypeSerializer newSerializer);
+    @Deprecated
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializer newSerializer) {
+        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+    }
+
+    /**
+     * Checks current serializer's compatibility to read data written by the prior serializer.
+     *
+     * When a checkpoint/savepoint is restored, this method checks whether the serialization
+     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+     * serialization format is compatible, that the program's serializer needs to reconfigure itself
+     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+     * that the format is outright incompatible, or that a migration needed. In the latter case, the
+     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+     * program's serializer re-serializes the data, thus converting the format during the restore
+     * operation.
+     *
+     * This method must be implemented to clarify the compatibility. See FLIP-263 for more
+     * details.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot to check.
+     * @return the serializer compatibility result.
+     */
+    default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+            TypeSerializerSnapshot oldSerializerSnapshot) {
+        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
I'm not sure whether we should add a default implementation for the new
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1827061892

> Thanks @masteryhx for the contribution!
>
> I didn't finish the review, and left some minor comments. Please take a look in your free time, thanks~

Thanks a lot for your time! I have updated the PR by adding two commits. I will rebase some commits after the PR is approved. Please review it again in your free time. Thanks a lot again!
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1405573786

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java: ##
@@ -34,21 +34,21 @@ public class CompositeTypeSerializerUtil {
      * can be used by legacy snapshot classes, which have a newer implementation implemented as a
      * {@link CompositeTypeSerializerSnapshot}.
      *
-     * @param newSerializer the new serializer to check for compatibility.
+     * @param legacySerializerSnapshot the legacy serializer snapshot to check for compatibility.
      * @param newCompositeSnapshot an instance of the new snapshot class to delegate compatibility
      *     checks to. This instance should already contain the outer snapshot information.
      * @param legacyNestedSnapshots the nested serializer snapshots of the legacy composite
      *     snapshot.
      * @return the result compatibility.
      */
     public static TypeSerializerSchemaCompatibility delegateCompatibilityCheckToNewSnapshot(
-            TypeSerializer newSerializer,
-            CompositeTypeSerializerSnapshot newCompositeSnapshot,
+            TypeSerializerSnapshot legacySerializerSnapshot,

Review Comment:
Here `legacy` means it is deprecated, and the code is only used for the temporary compatibility check, as before. That is different from the regular compatibility check from the old/previous one to the new one. I have renamed `previous` to `old` in some classes.
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1402925601

## docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ##
@@ -442,4 +445,23 @@ migrate from the old abstractions. The steps to do this is as follows:
    `TypeSerializerConfigSnapshot` implementation as will as the
    `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).

+## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.18

Review Comment:
```suggestion
## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.19
```
This section also contains a number of other occurrences of 1.18.

## docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ##
@@ -345,12 +345,15 @@ public final class GenericArraySerializerSnapshot extends CompositeTypeSerial
         this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
     }
-    @Override
-    protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
-        return (this.componentClass == newSerializer.getComponentClass())
-            ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
-            : OuterSchemaCompatibility.INCOMPATIBLE;
-    }
+	@Override
+	protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
+			TypeSerializerSnapshot oldSerializerSnapshot) {
+		GenericArraySerializerSnapshot oldGenericArraySerializerSnapshot =
+				(GenericArraySerializerSnapshot) oldSerializerSnapshot;
+		return (this.componentClass == oldGenericArraySerializerSnapshot.componentClass)
+				? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+				: OuterSchemaCompatibility.INCOMPATIBLE;
+	}

Review Comment:
Same comment: we should use 4 spaces instead of tabs.

## docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ##
@@ -342,12 +342,15 @@ public final class GenericArraySerializerSnapshot extends CompositeTypeSerial
         this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
     }
-    @Override
-    protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
-        return (this.componentClass == newSerializer.getComponentClass())
-            ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
-            : OuterSchemaCompatibility.INCOMPATIBLE;
-    }
+	@Override
+	protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
+			TypeSerializerSnapshot oldSerializerSnapshot) {
+		GenericArraySerializerSnapshot oldGenericArraySerializerSnapshot =
+				(GenericArraySerializerSnapshot) oldSerializerSnapshot;
+		return (this.componentClass == oldGenericArraySerializerSnapshot.componentClass)
+				? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+				: OuterSchemaCompatibility.INCOMPATIBLE;
+	}

Review Comment:
Please keep the code style consistent. We should use 4 spaces instead of tabs.

## flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java: ##
@@ -34,21 +34,21 @@ public class CompositeTypeSerializerUtil {
      * can be used by legacy snapshot classes, which have a newer implementation implemented as a
      * {@link CompositeTypeSerializerSnapshot}.
      *
-     * @param newSerializer the new serializer to check for compatibility.
+     * @param legacySerializerSnapshot the legacy serializer snapshot to check for compatibility.
      * @param newCompositeSnapshot an instance of the new snapshot class to delegate compatibility
      *     checks to. This instance should already contain the outer snapshot information.
      * @param legacyNestedSnapshots the nested serializer snapshots of the legacy composite
      *     snapshot.
      * @return the result compatibility.
      */
     public static TypeSerializerSchemaCompatibility delegateCompatibilityCheckToNewSnapshot(
-            TypeSerializer newSerializer,
-            CompositeTypeSerializerSnapshot newCompositeSnapshot,
+            TypeSerializerSnapshot legacySerializerSnapshot,

Review Comment:
This uses `legacy` as the name prefix, while the caller uses `old`. I see that most of your changes use `old` or `previous` as the prefix; should we unify them?
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1803157270

Rebased. @Zakelly, could you also help take a look? Thanks a lot!