[flink] 04/32: [FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new serialization compatibility APIs for key / namespace serializer checks

2019-02-28 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d451b7399ff8c1d9fa8497f366047dcfcae5391a
Author: Tzu-Li (Gordon) Tai 
AuthorDate: Wed Feb 27 12:10:08 2019 +0800

[FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new 
serialization compatibility APIs for key / namespace serializer checks

This commit lets the InternalTimerServiceImpl properly use
TypeSerializerSchemaCompatibility /
TypeSerializerSnapshot#resolveSchemaCompatibility when attempting to
check the compatibility of new key and namespace serializers.

This also fixes the fact that this check was previously broken, in that
the key / namespace serializer was not reassigned to be reconfigured
ones.
---
 .../api/operators/InternalTimerServiceImpl.java| 42 --
 .../InternalTimersSnapshotReaderWriters.java   | 10 --
 2 files changed, 31 insertions(+), 21 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index c2088d0..a7a3490 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -19,9 +19,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
@@ -142,26 +141,31 @@ public class InternalTimerServiceImpl implements 
InternalTimerService,
 
// the following is the case where we restore
if (restoredTimersSnapshot != null) {
-   CompatibilityResult 
keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-   this.keyDeserializer,
-   null,
-   
restoredTimersSnapshot.getKeySerializerSnapshot(),
-   keySerializer);
-
-   CompatibilityResult 
namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-   this.namespaceDeserializer,
-   null,
-   
restoredTimersSnapshot.getNamespaceSerializerSnapshot(),
-   namespaceSerializer);
-
-   if 
(keySerializerCompatibility.isRequiresMigration() || 
namespaceSerializerCompatibility.isRequiresMigration()) {
-   throw new IllegalStateException("Tried 
to initialize restored TimerService " +
-   "with incompatible serializers 
than those used to snapshot its state.");
+   TypeSerializerSchemaCompatibility 
keySerializerCompatibility =
+   
restoredTimersSnapshot.getKeySerializerSnapshot().resolveSchemaCompatibility(keySerializer);
+
+   if (keySerializerCompatibility.isIncompatible() 
|| keySerializerCompatibility.isCompatibleAfterMigration()) {
+   throw new IllegalStateException(
+   "Tried to initialize restored 
TimerService with new key serializer that requires migration or is 
incompatible.");
}
+
+   TypeSerializerSchemaCompatibility 
namespaceSerializerCompatibility =
+   
restoredTimersSnapshot.getNamespaceSerializerSnapshot().resolveSchemaCompatibility(namespaceSerializer);
+
+   if 
(namespaceSerializerCompatibility.isIncompatible() || 
namespaceSerializerCompatibility.isCompatibleAfterMigration()) {
+   throw new IllegalStateException(
+   "Tried to initialize restored 
TimerService with new namespace serializer that requires migration or is 
incompatible.");
+   }
+
+   this.keySerializer = 
keySerializerCompatibility.isCompatibleAsIs()
+  

[flink] 04/32: [FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new serialization compatibility APIs for key / namespace serializer checks

2019-02-28 Thread tzulitai
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 14aae5937e27a9df806dc6cce9bcb88c9188802b
Author: Tzu-Li (Gordon) Tai 
AuthorDate: Wed Feb 27 12:10:08 2019 +0800

[FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new 
serialization compatibility APIs for key / namespace serializer checks

This commit lets the InternalTimerServiceImpl properly use
TypeSerializerSchemaCompatibility /
TypeSerializerSnapshot#resolveSchemaCompatibility when attempting to
check the compatibility of new key and namespace serializers.

This also fixes the fact that this check was previously broken, in that
the key / namespace serializer was not reassigned to be reconfigured
ones.
---
 .../api/operators/InternalTimerServiceImpl.java| 42 --
 .../InternalTimersSnapshotReaderWriters.java   | 10 --
 2 files changed, 31 insertions(+), 21 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index c2088d0..a7a3490 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -19,9 +19,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
@@ -142,26 +141,31 @@ public class InternalTimerServiceImpl implements 
InternalTimerService,
 
// the following is the case where we restore
if (restoredTimersSnapshot != null) {
-   CompatibilityResult 
keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-   this.keyDeserializer,
-   null,
-   
restoredTimersSnapshot.getKeySerializerSnapshot(),
-   keySerializer);
-
-   CompatibilityResult 
namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-   this.namespaceDeserializer,
-   null,
-   
restoredTimersSnapshot.getNamespaceSerializerSnapshot(),
-   namespaceSerializer);
-
-   if 
(keySerializerCompatibility.isRequiresMigration() || 
namespaceSerializerCompatibility.isRequiresMigration()) {
-   throw new IllegalStateException("Tried 
to initialize restored TimerService " +
-   "with incompatible serializers 
than those used to snapshot its state.");
+   TypeSerializerSchemaCompatibility 
keySerializerCompatibility =
+   
restoredTimersSnapshot.getKeySerializerSnapshot().resolveSchemaCompatibility(keySerializer);
+
+   if (keySerializerCompatibility.isIncompatible() 
|| keySerializerCompatibility.isCompatibleAfterMigration()) {
+   throw new IllegalStateException(
+   "Tried to initialize restored 
TimerService with new key serializer that requires migration or is 
incompatible.");
}
+
+   TypeSerializerSchemaCompatibility 
namespaceSerializerCompatibility =
+   
restoredTimersSnapshot.getNamespaceSerializerSnapshot().resolveSchemaCompatibility(namespaceSerializer);
+
+   if 
(namespaceSerializerCompatibility.isIncompatible() || 
namespaceSerializerCompatibility.isCompatibleAfterMigration()) {
+   throw new IllegalStateException(
+   "Tried to initialize restored 
TimerService with new namespace serializer that requires migration or is 
incompatible.");
+   }
+
+   this.keySerializer = 
keySerializerCompatibility.isCompatibleAsIs()
+