Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2024-01-14 Thread via GitHub


masteryhx closed pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one
URL: https://github.com/apache/flink/pull/21635


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2024-01-09 Thread via GitHub


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1884161683

   Thanks, all, for the detailed review.
   Please let me know if you have any other comments.
   I will merge it if there are no further comments within the next two days.





Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2024-01-06 Thread via GitHub


llkj1 commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1879978170

   Thank you for contacting us.
   
   We’ve received your support request and will get back to you in one to two business days. Your case number is 102195427714.
   
   For additional information on development-related topics, visit:
   https://developer.apple.com/support/
   
   Best regards, 
   
   Apple Developer Program Support
  
   
   
   Copyright (c) 2024 Apple Inc. All rights reserved.
   
   Contact Us
   https://developer.apple.com/contact/
   
   Developer
   https://developer.apple.com/
   
   My Apple ID
   https://appleid.apple.com
   
   Privacy Policy
   https://www.apple.com/privacy/
   





Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2024-01-06 Thread via GitHub


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1879974970

   @1996fanrui @Zakelly Thanks for the review. I have addressed the remaining minor comments.
   The Python CI failure has been occurring since yesterday (see other [failed cases](https://dev.azure.com/apache-flink/apache-flink/_build?definitionId=2&_a=summary)) and is not related to this PR.
   @curcur Could you also help review this? Thanks a lot!





Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2024-01-06 Thread via GitHub


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1879934580

   @flinkbot run azure





Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-15 Thread via GitHub


1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1427799403


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
  * program's serializer re-serializes the data, thus converting the format during the restore
  * operation.
  *
+ * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+ * #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
  * @param newSerializer the new serializer to check.
  * @return the serializer compatibility result.
  */
-TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-TypeSerializer newSerializer);
+@Deprecated
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializer newSerializer) {
+return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+}
+
+/**
+ * Checks current serializer's compatibility to read data written by the prior serializer.
+ *
+ * When a checkpoint/savepoint is restored, this method checks whether the serialization
+ * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+ * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+ * serialization format is compatible, that the program's serializer needs to reconfigure itself
+ * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+ * that the format is outright incompatible, or that a migration needed. In the latter case, the
+ * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+ * program's serializer re-serializes the data, thus converting the format during the restore
+ * operation.
+ *
+ * This method must be implemented to clarify the compatibility. See FLIP-263 for more
+ * details.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot to check.
+ * @return the serializer compatibility result.
+ */
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializerSnapshot oldSerializerSnapshot) {
+return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
   > `TypeSerializerSnapshot` and `TypeSerializer` are public APIs, which means they may also be used in user code. Users would then still have to modify their code, which would be a breaking change.
   
   Thanks for the clarification!






Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-15 Thread via GitHub


1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1427798414


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(

Review Comment:
   That makes sense!






Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-14 Thread via GitHub


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1857189693

   > Hi @masteryhx , sorry for the late reply!
   > 
   > After reading your PR, I finally figured out why the default implementations of the old and new methods call each other. I think we could use a trick to check whether the specific `TypeSerializerSnapshot` has implemented the old or the new method before calling it. https://stackoverflow.com/a/2315467 may provide some ideas. WDYT?
   
   Thanks for the advice.
   It's a good way to avoid an infinite loop when users implement neither method.
   I will update the PR to support this.
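A minimal sketch of the infinite loop being discussed, assuming heavily simplified, hypothetical stand-ins (`Serializer`, `Snapshot`, and a `String` result) rather than Flink's real generic `TypeSerializer`, `TypeSerializerSnapshot`, and `TypeSerializerSchemaCompatibility`. Each default method delegates to the other, mirroring the defaults in the quoted diff: a legacy class that overrides only the old method is still resolved through the new entry point, while a class that overrides neither recurses until a `StackOverflowError`.

```java
// Hypothetical, simplified stand-ins for TypeSerializer / TypeSerializerSnapshot.
// Not Flink's API: no generics, and a String replaces TypeSerializerSchemaCompatibility.
interface Serializer {
    Snapshot snapshotConfiguration();
}

interface Snapshot {
    Serializer restoreSerializer();

    // Old (deprecated in the PR) entry point: called on the OLD snapshot with the NEW serializer.
    default String resolve(Serializer newSerializer) {
        return newSerializer.snapshotConfiguration().resolve(this);
    }

    // New entry point: called on the NEW snapshot with the OLD snapshot.
    default String resolve(Snapshot oldSnapshot) {
        return oldSnapshot.resolve(restoreSerializer());
    }
}

// Legacy implementation: overrides only the old method.
class LegacySnapshot implements Snapshot {
    public Serializer restoreSerializer() {
        return () -> this;
    }

    @Override
    public String resolve(Serializer newSerializer) {
        return "COMPATIBLE_AS_IS";
    }
}

// Broken implementation: overrides neither method, so only the defaults run.
class DefaultsOnlySnapshot implements Snapshot {
    public Serializer restoreSerializer() {
        return () -> this;
    }
}

class DelegationDemo {
    // New entry point; the old snapshot is legacy, so its override answers.
    static String legacyViaNewEntryPoint() {
        return new DefaultsOnlySnapshot().resolve(new LegacySnapshot());
    }

    // Neither side overrides anything: the two defaults call each other forever.
    static boolean defaultsOnlyOverflows() {
        try {
            new DefaultsOnlySnapshot().resolve(new DefaultsOnlySnapshot());
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(legacyViaNewEntryPoint()); // COMPATIBLE_AS_IS
        System.out.println(defaultsOnlyOverflows()); // true
    }
}
```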





Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-14 Thread via GitHub


masteryhx commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1427497404


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializerSnapshot oldSerializerSnapshot) {
+return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
   > IIUC, Flink will only call the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method in the future, right?
   
   Yes, Flink can guarantee this.
   But IIUC, `TypeSerializerSnapshot` and `TypeSerializer` are public APIs, which means they may also be used in user code. Users would then still have to modify their code, which would be a breaking change.
   
   > BTW, the code comment should guide users or developers to implement the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method in the future.
   
   Thanks for the advice. That's a good idea.
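A sketch of the compatibility concern raised here, using hypothetical, simplified stand-ins (not Flink's API; the version numbers and result strings are illustrative only). Because the deprecated method keeps a delegating default, user code that still calls the old entry point keeps working after a snapshot class migrates to overriding only the new method.

```java
// Hypothetical, simplified stand-ins (not Flink's API): no generics, String result.
interface Serializer {
    Snapshot snapshotConfiguration();
}

interface Snapshot {
    Serializer restoreSerializer();

    // Old (deprecated) entry point keeps a delegating default.
    default String resolve(Serializer newSerializer) {
        return newSerializer.snapshotConfiguration().resolve(this);
    }

    // New entry point delegates back to the old one for legacy classes.
    default String resolve(Snapshot oldSnapshot) {
        return oldSnapshot.resolve(restoreSerializer());
    }
}

// Migrated implementation: overrides only the NEW method.
class MigratedSnapshot implements Snapshot {
    private final int version;

    MigratedSnapshot(int version) {
        this.version = version;
    }

    public Serializer restoreSerializer() {
        return () -> this;
    }

    @Override
    public String resolve(Snapshot oldSnapshot) {
        if (!(oldSnapshot instanceof MigratedSnapshot)) {
            return "INCOMPATIBLE";
        }
        int oldVersion = ((MigratedSnapshot) oldSnapshot).version;
        return oldVersion == version ? "COMPATIBLE_AS_IS" : "COMPATIBLE_AFTER_MIGRATION";
    }
}

class OldCallerDemo {
    // Hypothetical user code still written against the old entry point:
    // the old snapshot (version 1) is asked about a new serializer (version 2).
    static String oldStyleCall() {
        Snapshot oldSnapshot = new MigratedSnapshot(1);
        Serializer newSerializer = new MigratedSnapshot(2).restoreSerializer();
        return oldSnapshot.resolve(newSerializer);
    }

    public static void main(String[] args) {
        System.out.println(oldStyleCall()); // COMPATIBLE_AFTER_MIGRATION
    }
}
```

The old call lands in the default, which forwards to the new snapshot's override, so the migrated class never has to implement the deprecated method.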
   






Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-14 Thread via GitHub


1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1427479694


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializerSnapshot oldSerializerSnapshot) {
+return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
   After thinking more about it, I'm wondering whether we need a compatibility implementation for the old `resolveSchemaCompatibility(TypeSerializer)` method at all.
   
   IIUC, Flink will only call the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method in the future, right?
   
   If so, we can give the old `resolveSchemaCompatibility(TypeSerializer)` method a default implementation that throws `UnsupportedOperationException`. A default implementation is needed so that users and developers no longer have to implement the old method.
   
   For the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method, your current implementation is fine: all existing implementation classes remain supported, because `resolveSchemaCompatibility(TypeSerializerSnapshot)` delegates to the old method.
   
   BTW, the code comment should guide users and developers to implement the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method in the future.
   
   WDYT?
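A sketch of the alternative proposed here, again with hypothetical, simplified stand-ins rather than Flink's classes, and using Java's `UnsupportedOperationException`. The new method still falls back to the old one, so legacy classes keep working and a class that implements neither fails fast instead of looping; the trade-off is that a caller using the old entry point on a migrated class now gets an exception.

```java
// Hypothetical, simplified stand-ins (not Flink's API): no generics, String result.
interface Serializer {
    Snapshot snapshotConfiguration();
}

interface Snapshot {
    Serializer restoreSerializer();

    // Proposed alternative: the old (deprecated) method no longer delegates.
    default String resolve(Serializer newSerializer) {
        throw new UnsupportedOperationException("Implement resolve(Snapshot) instead.");
    }

    // New entry point still falls back to the old method for legacy classes.
    default String resolve(Snapshot oldSnapshot) {
        return oldSnapshot.resolve(restoreSerializer());
    }
}

// Legacy class: overrides only the old method.
class LegacySnapshot implements Snapshot {
    public Serializer restoreSerializer() {
        return () -> this;
    }

    @Override
    public String resolve(Serializer newSerializer) {
        return "COMPATIBLE_AS_IS";
    }
}

// Migrated class: overrides only the new method.
class MigratedSnapshot implements Snapshot {
    public Serializer restoreSerializer() {
        return () -> this;
    }

    @Override
    public String resolve(Snapshot oldSnapshot) {
        return "COMPATIBLE_AS_IS";
    }
}

// Class that overrides neither method.
class DefaultsOnlySnapshot implements Snapshot {
    public Serializer restoreSerializer() {
        return () -> this;
    }
}

class ProposalDemo {
    static boolean throwsUnsupported(Runnable call) {
        try {
            call.run();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    // Call through the new entry point: legacy classes still work.
    static String legacyViaNewEntryPoint() {
        return new LegacySnapshot().resolve(new LegacySnapshot());
    }

    // Neither method implemented: fails fast instead of recursing.
    static boolean defaultsOnlyFailsFast() {
        return throwsUnsupported(
                () -> new DefaultsOnlySnapshot().resolve(new DefaultsOnlySnapshot()));
    }

    // User code calling the old entry point on a migrated class now breaks.
    static boolean oldEntryPointOnMigratedFails() {
        return throwsUnsupported(
                () -> new MigratedSnapshot().resolve(new MigratedSnapshot().restoreSerializer()));
    }

    public static void main(String[] args) {
        System.out.println(legacyViaNewEntryPoint()); // COMPATIBLE_AS_IS
        System.out.println(defaultsOnlyFailsFast()); // true
        System.out.println(oldEntryPointOnMigratedFails()); // true
    }
}
```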






Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-14 Thread via GitHub


Zakelly commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1856242365

   Hi @masteryhx , sorry for the late reply!
   
   After reading your PR, I finally figured out why the default implementations of the old and new methods call each other. I think we could use a trick to check whether the specific `TypeSerializerSnapshot` has implemented the old or the new method before calling it. https://stackoverflow.com/a/2315467 may provide some ideas. WDYT?
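One common reflection technique for such a check, sketched with hypothetical, simplified stand-ins and a hypothetical `Overrides.overrides` helper (not Flink code, and not necessarily what the PR ended up doing): `Class.getMethod` returns the most specific public method, so if its declaring class is still the interface, the concrete class has not overridden the default. Each default below only delegates when the other side really implements its method, so a missing implementation fails fast instead of recursing.

```java
import java.lang.reflect.Method;

// Hypothetical, simplified stand-ins (not Flink's API): no generics, String result.
interface Serializer {
    Snapshot snapshotConfiguration();
}

final class Overrides {
    private Overrides() {}

    // True if `type` has its own implementation of the named public method, i.e. the
    // method resolved by Class.getMethod is no longer the default declared in `iface`.
    static boolean overrides(Class<?> type, Class<?> iface, String name, Class<?>... params) {
        try {
            Method m = type.getMethod(name, params);
            return m.getDeclaringClass() != iface;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}

interface Snapshot {
    Serializer restoreSerializer();

    // Old entry point: delegate only if the new snapshot really implements the new method.
    default String resolve(Serializer newSerializer) {
        Snapshot newSnapshot = newSerializer.snapshotConfiguration();
        if (!Overrides.overrides(newSnapshot.getClass(), Snapshot.class, "resolve", Snapshot.class)) {
            throw new UnsupportedOperationException("No resolve implementation to delegate to.");
        }
        return newSnapshot.resolve(this);
    }

    // New entry point: delegate only if the old snapshot really implements the old method.
    default String resolve(Snapshot oldSnapshot) {
        if (!Overrides.overrides(oldSnapshot.getClass(), Snapshot.class, "resolve", Serializer.class)) {
            throw new UnsupportedOperationException("No resolve implementation to delegate to.");
        }
        return oldSnapshot.resolve(restoreSerializer());
    }
}

// Legacy class: overrides only the old method.
class LegacySnapshot implements Snapshot {
    public Serializer restoreSerializer() {
        return () -> this;
    }

    @Override
    public String resolve(Serializer newSerializer) {
        return "COMPATIBLE_AS_IS";
    }
}

// Class that overrides neither method.
class DefaultsOnlySnapshot implements Snapshot {
    public Serializer restoreSerializer() {
        return () -> this;
    }
}

class DetectionDemo {
    static String legacyResolved() {
        return new LegacySnapshot().resolve(new LegacySnapshot());
    }

    static boolean defaultsOnlyFailsFast() {
        try {
            new DefaultsOnlySnapshot().resolve(new DefaultsOnlySnapshot());
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(legacyResolved()); // COMPATIBLE_AS_IS
        System.out.println(defaultsOnlyFailsFast()); // true
    }
}
```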





Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-13 Thread via GitHub


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1855044542

   > Hi @masteryhx , thanks for the contribution and update!
   > 
   > I have reviewed this PR and left some comments. Please take a look in your free time, thanks~
   > 
   > Also, the `Schema Compatibility`-related code is part of Flink’s infrastructure and is used by many components. I had not read or changed the related code before this PR, so I'm not an expert in this area. I have reviewed it, but it would be great to have more committers who are familiar with this area help review. That would help keep it bug-free.
   
   Thanks a lot for the detailed review.
   There are still some key issues in the comments that we need to discuss.
   I'll update the PR once we agree on them.
   Also pinging @Zakelly @fredia to help review and discuss. Thanks a lot for your time!





Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-13 Thread via GitHub


masteryhx commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1426111500


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java:
##
@@ -1244,8 +1244,8 @@ void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
 testKeyedValueStateUpgrade(
 initialAccessDescriptor, newAccessDescriptorAfterRestore))
 .satisfiesAnyOf(
-e -> assertThat(e).isInstanceOf(IllegalStateException.class),
-e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
+e -> assertThat(e).isInstanceOf(StateMigrationException.class),
+e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
 }

Review Comment:
   As discussed in [#21635(comment)](https://github.com/apache/flink/pull/21635#discussion_r1426110489), I'd also like to add more state-level tests after the adjustment, in a separate PR.
   WDYT?
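For readers less familiar with AssertJ, the `satisfiesAnyOf(isInstanceOf(...), hasCauseInstanceOf(...))` assertion in the test diff accepts either the thrown exception itself or its direct cause. A plain-Java sketch of the same acceptance rule, with a hypothetical local stand-in for Flink's `StateMigrationException` so the block compiles on its own (`ExceptionChecks.isOrDirectlyCausedBy` is likewise an illustrative name, not a Flink utility):

```java
// Hypothetical local stand-in for Flink's StateMigrationException, so this compiles alone.
class StateMigrationException extends Exception {
    StateMigrationException(String message) {
        super(message);
    }
}

final class ExceptionChecks {
    private ExceptionChecks() {}

    // Same acceptance rule as satisfiesAnyOf(isInstanceOf(type), hasCauseInstanceOf(type)):
    // the throwable itself, or its direct cause (not deeper causes), must match.
    // Class.isInstance(null) is false, so a missing cause simply does not match.
    static boolean isOrDirectlyCausedBy(Throwable t, Class<? extends Throwable> type) {
        return type.isInstance(t) || type.isInstance(t.getCause());
    }

    public static void main(String[] args) {
        Exception direct = new StateMigrationException("serializer incompatible");
        Exception wrapped = new RuntimeException(direct);
        System.out.println(isOrDirectlyCausedBy(direct, StateMigrationException.class)); // true
        System.out.println(isOrDirectlyCausedBy(wrapped, StateMigrationException.class)); // true
        System.out.println(
                isOrDirectlyCausedBy(new IllegalStateException(), StateMigrationException.class)); // false
    }
}
```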






Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-13 Thread via GitHub


masteryhx commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1426110489


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(

Review Comment:
   Yes, you're right.
   These are all root classes that call `resolveSchemaCompatibility` in the state module.
   So I'd like to prepare a separate PR to adjust them, as I mentioned before.
   It should not affect the correctness of current usage.
   WDYT?
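For readers following the thread, here is a minimal, self-contained sketch of the two mutually delegating default methods in the diff above. `Serializer`, `Snapshot`, `Compat`, `LegacySnapshot`, `MigratedSnapshot`, and `check` are hypothetical, simplified stand-ins, not Flink's real `TypeSerializer`/`TypeSerializerSnapshot` API. The sketch shows that a restoring caller can always call the new method on the new snapshot, whether a snapshot class overrides only the deprecated method or only the new one.

```java
// Hypothetical, simplified stand-ins for TypeSerializer / TypeSerializerSnapshot:
// only the delegation between the two default methods is modeled.
public class DefaultDelegationSketch {

    enum Compat { COMPATIBLE_AS_IS, INCOMPATIBLE }

    interface Serializer {
        Snapshot snapshotConfiguration();
    }

    interface Snapshot {
        Serializer restoreSerializer();

        // Deprecated direction: called on the OLD snapshot with the NEW serializer.
        default Compat resolveSchemaCompatibility(Serializer newSerializer) {
            return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
        }

        // New direction: called on the NEW snapshot with the OLD snapshot.
        default Compat resolveSchemaCompatibility(Snapshot oldSnapshot) {
            return oldSnapshot.resolveSchemaCompatibility(restoreSerializer());
        }
    }

    // Not yet migrated: overrides only the deprecated method.
    static final class LegacySnapshot implements Snapshot {
        final int version;

        LegacySnapshot(int version) {
            this.version = version;
        }

        @Override
        public Serializer restoreSerializer() {
            return () -> this; // the restored serializer's snapshot is this snapshot
        }

        @Override
        public Compat resolveSchemaCompatibility(Serializer newSerializer) {
            Snapshot newSnapshot = newSerializer.snapshotConfiguration();
            return (newSnapshot instanceof LegacySnapshot
                            && ((LegacySnapshot) newSnapshot).version == version)
                    ? Compat.COMPATIBLE_AS_IS
                    : Compat.INCOMPATIBLE;
        }
    }

    // Migrated: overrides only the new method.
    static final class MigratedSnapshot implements Snapshot {
        final int version;

        MigratedSnapshot(int version) {
            this.version = version;
        }

        @Override
        public Serializer restoreSerializer() {
            return () -> this;
        }

        @Override
        public Compat resolveSchemaCompatibility(Snapshot oldSnapshot) {
            return (oldSnapshot instanceof MigratedSnapshot
                            && ((MigratedSnapshot) oldSnapshot).version == version)
                    ? Compat.COMPATIBLE_AS_IS
                    : Compat.INCOMPATIBLE;
        }
    }

    // How a restoring caller would use the API: ask the NEW snapshot about the OLD one.
    static Compat check(Snapshot newSnapshot, Snapshot oldSnapshot) {
        return newSnapshot.resolveSchemaCompatibility(oldSnapshot);
    }

    public static void main(String[] args) {
        System.out.println(check(new LegacySnapshot(1), new LegacySnapshot(1)));
        System.out.println(check(new MigratedSnapshot(2), new MigratedSnapshot(1)));
    }
}
```

In this model, the legacy pair reaches its overridden deprecated method through the new method's default and resolves to `COMPATIBLE_AS_IS`, while the migrated pair with different versions resolves to `INCOMPATIBLE` directly.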






Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-13 Thread via GitHub


masteryhx commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1426108037


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
  * program's serializer re-serializes the data, thus converting the format during the restore
  * operation.
  *
+ * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+ * #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
  * @param newSerializer the new serializer to check.
  * @return the serializer compatibility result.
  */
-TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-TypeSerializer newSerializer);
+@Deprecated
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializer newSerializer) {
+return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+}
+
+/**
+ * Checks current serializer's compatibility to read data written by the prior serializer.
+ *
+ * When a checkpoint/savepoint is restored, this method checks whether the serialization
+ * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+ * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+ * serialization format is compatible, that the program's serializer needs to reconfigure itself
+ * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+ * that the format is outright incompatible, or that a migration needed. In the latter case, the
+ * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+ * program's serializer re-serializes the data, thus converting the format during the restore
+ * operation.
+ *
+ * This method must be implemented to clarify the compatibility. See FLIP-263 for more
+ * details.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot to check.
+ * @return the serializer compatibility result.
+ */
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializerSnapshot oldSerializerSnapshot) {
+return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
   Thanks for the detailed review.
   IIUC, as long as users have implemented one of the methods (the old one or the new one), it should work well with the current PR.
   Sorry, maybe I missed your point. The case you described is a nested serializer, right?
   Whichever method `data1` and `data2` implement, the call should be one-sided; otherwise the logic would be wrong even without this PR.
   
   The reason we provide two default methods is that users could not use a mix of migrated and non-migrated methods, as you can see in the discussion [#21635(comment)](https://github.com/apache/flink/pull/21635#discussion_r1086625857).
   
   cc @Zakelly, since we also discussed this problem offline.
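The precondition above (a snapshot class implements at least one of the two methods) matters because the two defaults call each other. The following hypothetical, simplified model (again not Flink's real interfaces) shows what happens when a class overrides neither: the calls bounce between the two defaults until the stack overflows. Whether Flink itself guards against this case is not shown here.

```java
// Hypothetical minimal model (not Flink's real interfaces) of two default methods that
// delegate to each other: a snapshot class that overrides neither never terminates.
public class OneSidedDelegationSketch {

    interface Serializer {
        Snapshot snapshotConfiguration();
    }

    interface Snapshot {
        Serializer restoreSerializer();

        default String resolveSchemaCompatibility(Serializer newSerializer) {
            return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
        }

        default String resolveSchemaCompatibility(Snapshot oldSnapshot) {
            return oldSnapshot.resolveSchemaCompatibility(restoreSerializer());
        }
    }

    // Overrides neither compatibility method: the defaults bounce between each other.
    static final class UnmigratedSnapshot implements Snapshot {
        @Override
        public Serializer restoreSerializer() {
            return () -> this;
        }
    }

    // Returns true if resolving compatibility recursed until the stack overflowed.
    static boolean overflowsWithoutOverride() {
        Snapshot newSnapshot = new UnmigratedSnapshot();
        try {
            newSnapshot.resolveSchemaCompatibility(new UnmigratedSnapshot());
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(overflowsWithoutOverride());
    }
}
```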






Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1422192659


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java:
##
@@ -1244,8 +1244,8 @@ void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
 testKeyedValueStateUpgrade(
 initialAccessDescriptor, newAccessDescriptorAfterRestore))
 .satisfiesAnyOf(
-e -> assertThat(e).isInstanceOf(IllegalStateException.class),
-e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
+e -> assertThat(e).isInstanceOf(StateMigrationException.class),
+e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
 }

Review Comment:
   Would you mind adding a test that checks that all state backends call the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` instead of the old method?
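One possible shape for such a test, sketched under assumptions: a spy snapshot counts calls to each overload, and a hypothetical `restoreCheck` method stands in for a state backend's restore path. A real test would restore state through each state backend instead; the interfaces here are simplified stand-ins, not Flink's API.

```java
// Hypothetical sketch of the requested check: a spy snapshot records which compatibility
// method the restore path calls. Simplified stand-ins, not Flink's real interfaces.
public class SpySnapshotSketch {

    interface Serializer {
        Snapshot snapshotConfiguration();
    }

    interface Snapshot {
        Serializer restoreSerializer();

        String resolveSchemaCompatibility(Serializer newSerializer); // deprecated overload

        String resolveSchemaCompatibility(Snapshot oldSnapshot); // new overload
    }

    // Counts calls to each overload instead of doing a real compatibility check.
    static final class SpySnapshot implements Snapshot {
        int newMethodCalls;
        int oldMethodCalls;

        @Override
        public Serializer restoreSerializer() {
            return () -> this;
        }

        @Override
        public String resolveSchemaCompatibility(Serializer newSerializer) {
            oldMethodCalls++;
            return "COMPATIBLE_AS_IS";
        }

        @Override
        public String resolveSchemaCompatibility(Snapshot oldSnapshot) {
            newMethodCalls++;
            return "COMPATIBLE_AS_IS";
        }
    }

    // Hypothetical stand-in for a state backend's restore path: it should ask the NEW
    // snapshot to check the OLD one, i.e. use the non-deprecated overload.
    static String restoreCheck(Snapshot newSnapshot, Snapshot oldSnapshot) {
        return newSnapshot.resolveSchemaCompatibility(oldSnapshot);
    }

    public static void main(String[] args) {
        SpySnapshot newSnapshot = new SpySnapshot();
        SpySnapshot oldSnapshot = new SpySnapshot();
        restoreCheck(newSnapshot, oldSnapshot);
        // Expect one call to the new method and no calls to the deprecated one.
        System.out.println(newSnapshot.newMethodCalls + " "
                + (newSnapshot.oldMethodCalls + oldSnapshot.oldMethodCalls));
    }
}
```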



##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java:
##
@@ -289,6 +292,38 @@ protected void readOuterSnapshot(
 int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader)
 throws IOException {}
 
+/**
+ * Checks the schema compatibility of the given old serializer snapshot based on the outer
+ * snapshot.
+ *
+ * The base implementation of this method assumes that the outer serializer only has nested
+ * serializers and no extra information, and therefore the result of the check is {@link
+ * OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains some
+ * extra information that has been persisted as part of the serializer snapshot, this must be
+ * overridden. Note that this method and the corresponding methods {@link
+ * #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView,
+ * ClassLoader)} needs to be implemented.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot, which contains the old outer
+ * information to check against.
+ * @return a {@link OuterSchemaCompatibility} indicating whether or the new serializer's outer

Review Comment:
   ```suggestion
* @return a {@link OuterSchemaCompatibility} indicating whether the new serializer's outer
   ```
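To illustrate the Javadoc's point that the base outer check returns `COMPATIBLE_AS_IS` and leaves the decision to the nested serializers, here is a simplified model. It assumes that the overall result is the most restrictive of the outer result and the nested results; this is an illustrative assumption rather than a copy of Flink's `CompositeTypeSerializerSnapshot` logic, and a single `Compat` enum stands in for both the outer and the overall result types.

```java
import java.util.List;

// Simplified model (an assumption, not Flink's code): the overall result is the most
// restrictive of the outer result and all nested results.
public class OuterNestedCompatibilitySketch {

    // Ordered from least to most restrictive; ordinal() is used for comparison.
    enum Compat {
        COMPATIBLE_AS_IS,
        COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
        COMPATIBLE_AFTER_MIGRATION,
        INCOMPATIBLE
    }

    static Compat combine(Compat outer, List<Compat> nested) {
        Compat result = outer;
        for (Compat c : nested) {
            if (c.ordinal() > result.ordinal()) {
                result = c;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // An outer serializer with no extra information is COMPATIBLE_AS_IS by default,
        // so the nested serializers decide the outcome.
        System.out.println(combine(Compat.COMPATIBLE_AS_IS,
                List.of(Compat.COMPATIBLE_AS_IS, Compat.COMPATIBLE_AFTER_MIGRATION)));
        // An incompatible outer snapshot wins over compatible nested snapshots.
        System.out.println(combine(Compat.INCOMPATIBLE, List.of(Compat.COMPATIBLE_AS_IS)));
    }
}
```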



##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
  * program's serializer re-serializes the data, thus converting the format during the restore
  * operation.
  *
+ * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+ * #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
  * @param newSerializer the new serializer to check.
  * @return the serializer compatibility result.
  */
-TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
-TypeSerializer newSerializer);
+@Deprecated
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializer newSerializer) {
+return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+}
+
+/**
+ * Checks current serializer's compatibility to read data written by the prior serializer.
+ *
+ * When a checkpoint/savepoint is restored, this method checks whether the serialization
+ * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+ * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+ * serialization format is compatible, that the program's serializer needs to reconfigure itself
+ * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+ * that the format is outright incompatible, or that a migration needed. In the latter case, the
+ * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+ * program's serializer re-serializes the data, thus converting the format during the restore
+ * operation.
+ *
+ * This method must be implemented to clarify the compatibility. See FLIP-263 for more
+ * details.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot to check.
+ * @return the serializer compatibility result.
+ */
+default TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
+TypeSerializerSnapshot oldSerializerSnapshot) {
+return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
   I'm not sure whether we should add a default implementation for the new

Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-11-26 Thread via GitHub


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1827061892

   > Thanks @masteryhx for the contribution!
   > 
   > I haven't finished the review yet, but I left some minor comments. Please take a look in your free time, thanks~
   
   Thanks a lot for your time!
   I have updated the PR by adding two commits.
   I will rebase the commits after the PR is approved.
   Please take another look in your free time. Thanks a lot again!





Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-11-26 Thread via GitHub


masteryhx commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1405573786


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java:
##
@@ -34,21 +34,21 @@ public class CompositeTypeSerializerUtil {
  * can be used by legacy snapshot classes, which have a newer implementation implemented as a
  * {@link CompositeTypeSerializerSnapshot}.
  *
- * @param newSerializer the new serializer to check for compatibility.
+ * @param legacySerializerSnapshot the legacy serializer snapshot to check for compatibility.
  * @param newCompositeSnapshot an instance of the new snapshot class to delegate compatibility
  * checks to. This instance should already contain the outer snapshot information.
  * @param legacyNestedSnapshots the nested serializer snapshots of the legacy composite
  * snapshot.
  * @return the result compatibility.
  */
 public static  TypeSerializerSchemaCompatibility delegateCompatibilityCheckToNewSnapshot(
-TypeSerializer newSerializer,
-CompositeTypeSerializerSnapshot newCompositeSnapshot,
+TypeSerializerSnapshot legacySerializerSnapshot,

Review Comment:
   Here `legacy` means the snapshot is deprecated, and the code is only used for the temporary compatibility check, as before.
   This is different from the regular compatibility check from the old (previous) snapshot to the new one.
   
   I have renamed `previous` to `old` in some classes.






Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-11-23 Thread via GitHub


1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1402925601


##
docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##
@@ -442,4 +445,23 @@ migrate from the old abstractions. The steps to do this is as follows:
  `TypeSerializerConfigSnapshot` implementation as will as the
  `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).
 
+## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.18

Review Comment:
   ```suggestion
   ## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.19
   ```
   
   Also, this part contains several other references to 1.18 that need the same change.



##
docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##
@@ -345,12 +345,15 @@ public final class GenericArraySerializerSnapshot extends CompositeTypeSerial
 this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
 }
 
-@Override
-protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
-return (this.componentClass == newSerializer.getComponentClass())
-? OuterSchemaCompatibility.COMPATIBLE_AS_IS
-: OuterSchemaCompatibility.INCOMPATIBLE;
-}
+   @Override
+   protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
+   TypeSerializerSnapshot oldSerializerSnapshot) {
+   GenericArraySerializerSnapshot oldGenericArraySerializerSnapshot =
+   (GenericArraySerializerSnapshot) oldSerializerSnapshot;
+   return (this.componentClass == oldGenericArraySerializerSnapshot.componentClass)
+   ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+   : OuterSchemaCompatibility.INCOMPATIBLE;
+   }

Review Comment:
   Same comment: we should use 4 spaces instead of tabs.
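For reference, here is a self-contained sketch of the same outer check, indented with 4 spaces. `Snapshot` and `ArraySnapshot` are hypothetical stand-ins for `TypeSerializerSnapshot` and `GenericArraySerializerSnapshot`. Because this standalone version has no base class that guarantees the type of the old snapshot, it checks the type before casting; the docs snippet casts directly.

```java
// Hypothetical stand-in for the GenericArraySerializerSnapshot docs example: the outer
// snapshot persists a component class, and the NEW snapshot checks the OLD one against it.
public class ArrayOuterCompatibilitySketch {

    enum OuterSchemaCompatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    interface Snapshot {}

    static final class ArraySnapshot implements Snapshot {
        private final Class<?> componentClass;

        ArraySnapshot(Class<?> componentClass) {
            this.componentClass = componentClass;
        }

        OuterSchemaCompatibility resolveOuterSchemaCompatibility(Snapshot oldSerializerSnapshot) {
            // Nothing in this standalone sketch guarantees the snapshot type, so check it first.
            if (!(oldSerializerSnapshot instanceof ArraySnapshot)) {
                return OuterSchemaCompatibility.INCOMPATIBLE;
            }
            ArraySnapshot oldArraySnapshot = (ArraySnapshot) oldSerializerSnapshot;
            // Same rule as the docs snippet: identical component classes are compatible as-is.
            return (this.componentClass == oldArraySnapshot.componentClass)
                    ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
                    : OuterSchemaCompatibility.INCOMPATIBLE;
        }
    }

    public static void main(String[] args) {
        ArraySnapshot oldSnapshot = new ArraySnapshot(String.class);
        System.out.println(new ArraySnapshot(String.class).resolveOuterSchemaCompatibility(oldSnapshot));
        System.out.println(new ArraySnapshot(Integer.class).resolveOuterSchemaCompatibility(oldSnapshot));
    }
}
```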



##
docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##
@@ -342,12 +342,15 @@ public final class GenericArraySerializerSnapshot extends CompositeTypeSerial
 this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
 }
 
-@Override
-protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
-return (this.componentClass == newSerializer.getComponentClass())
-? OuterSchemaCompatibility.COMPATIBLE_AS_IS
-: OuterSchemaCompatibility.INCOMPATIBLE;
-}
+   @Override
+   protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
+   TypeSerializerSnapshot oldSerializerSnapshot) {
+   GenericArraySerializerSnapshot oldGenericArraySerializerSnapshot =
+   (GenericArraySerializerSnapshot) oldSerializerSnapshot;
+   return (this.componentClass == oldGenericArraySerializerSnapshot.componentClass)
+   ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
+   : OuterSchemaCompatibility.INCOMPATIBLE;
+   }

Review Comment:
   Please keep the code style consistent: we should use 4 spaces instead of tabs.



##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java:
##
@@ -34,21 +34,21 @@ public class CompositeTypeSerializerUtil {
  * can be used by legacy snapshot classes, which have a newer implementation implemented as a
  * {@link CompositeTypeSerializerSnapshot}.
  *
- * @param newSerializer the new serializer to check for compatibility.
+ * @param legacySerializerSnapshot the legacy serializer snapshot to check for compatibility.
  * @param newCompositeSnapshot an instance of the new snapshot class to delegate compatibility
  * checks to. This instance should already contain the outer snapshot information.
  * @param legacyNestedSnapshots the nested serializer snapshots of the legacy composite
  * snapshot.
  * @return the result compatibility.
  */
 public static  TypeSerializerSchemaCompatibility delegateCompatibilityCheckToNewSnapshot(
-TypeSerializer newSerializer,
-CompositeTypeSerializerSnapshot newCompositeSnapshot,
+TypeSerializerSnapshot legacySerializerSnapshot,

Review Comment:
   This uses `legacy` as the name prefix, while the caller uses `old`.
   
   I see most of your changes use `old` or `previous` as the prefix; should we unify them?




Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]

2023-11-08 Thread via GitHub


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1803157270

   Rebased.
   @Zakelly, could you also help take a look? Thanks a lot!

