[flink] 04/32: [FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new serialization compatibility APIs for key / namespace serializer checks
This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git commit d451b7399ff8c1d9fa8497f366047dcfcae5391a Author: Tzu-Li (Gordon) Tai AuthorDate: Wed Feb 27 12:10:08 2019 +0800 [FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new serialization compatibility APIs for key / namespace serializer checks This commit lets the InternalTimerServiceImpl properly use TypeSerializerSchemaCompatibility / TypeSerializerSnapshot#resolveSchemaCompatibility when attempting to check the compatibility of new key and namespace serializers. This also fixes the fact that this check was previously broken, in that the key / namespace serializer was not reassigned to be reconfigured ones. --- .../api/operators/InternalTimerServiceImpl.java| 42 -- .../InternalTimersSnapshotReaderWriters.java | 10 -- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java index c2088d0..a7a3490 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java @@ -19,9 +19,8 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; @@ -142,26 +141,31 @@ public class InternalTimerServiceImpl implements InternalTimerService, // the following is the case where we restore if (restoredTimersSnapshot != null) { - CompatibilityResult keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult( - this.keyDeserializer, - null, - restoredTimersSnapshot.getKeySerializerSnapshot(), - keySerializer); - - CompatibilityResult namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult( - this.namespaceDeserializer, - null, - restoredTimersSnapshot.getNamespaceSerializerSnapshot(), - namespaceSerializer); - - if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) { - throw new IllegalStateException("Tried to initialize restored TimerService " + - "with incompatible serializers than those used to snapshot its state."); + TypeSerializerSchemaCompatibility keySerializerCompatibility = + restoredTimersSnapshot.getKeySerializerSnapshot().resolveSchemaCompatibility(keySerializer); + + if (keySerializerCompatibility.isIncompatible() || keySerializerCompatibility.isCompatibleAfterMigration()) { + throw new IllegalStateException( + "Tried to initialize restored TimerService with new key serializer that requires migration or is incompatible."); } + + TypeSerializerSchemaCompatibility namespaceSerializerCompatibility = + restoredTimersSnapshot.getNamespaceSerializerSnapshot().resolveSchemaCompatibility(namespaceSerializer); + + if (namespaceSerializerCompatibility.isIncompatible() || namespaceSerializerCompatibility.isCompatibleAfterMigration()) { + throw new IllegalStateException( + "Tried to initialize restored TimerService with new namespace serializer that requires migration or is incompatible."); + } + + this.keySerializer = keySerializerCompatibility.isCompatibleAsIs() +
[flink] 04/32: [FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new serialization compatibility APIs for key / namespace serializer checks
This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 14aae5937e27a9df806dc6cce9bcb88c9188802b Author: Tzu-Li (Gordon) Tai AuthorDate: Wed Feb 27 12:10:08 2019 +0800 [FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new serialization compatibility APIs for key / namespace serializer checks This commit lets the InternalTimerServiceImpl properly use TypeSerializerSchemaCompatibility / TypeSerializerSnapshot#resolveSchemaCompatibility when attempting to check the compatibility of new key and namespace serializers. This also fixes the fact that this check was previously broken, in that the key / namespace serializer was not reassigned to be reconfigured ones. --- .../api/operators/InternalTimerServiceImpl.java| 42 -- .../InternalTimersSnapshotReaderWriters.java | 10 -- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java index c2088d0..a7a3490 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java @@ -19,9 +19,8 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; @@ -142,26 +141,31 @@ public class InternalTimerServiceImpl implements InternalTimerService, // the following is the case where we restore if (restoredTimersSnapshot != null) { - CompatibilityResult keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult( - this.keyDeserializer, - null, - restoredTimersSnapshot.getKeySerializerSnapshot(), - keySerializer); - - CompatibilityResult namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult( - this.namespaceDeserializer, - null, - restoredTimersSnapshot.getNamespaceSerializerSnapshot(), - namespaceSerializer); - - if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) { - throw new IllegalStateException("Tried to initialize restored TimerService " + - "with incompatible serializers than those used to snapshot its state."); + TypeSerializerSchemaCompatibility keySerializerCompatibility = + restoredTimersSnapshot.getKeySerializerSnapshot().resolveSchemaCompatibility(keySerializer); + + if (keySerializerCompatibility.isIncompatible() || keySerializerCompatibility.isCompatibleAfterMigration()) { + throw new IllegalStateException( + "Tried to initialize restored TimerService with new key serializer that requires migration or is incompatible."); } + + TypeSerializerSchemaCompatibility namespaceSerializerCompatibility = + restoredTimersSnapshot.getNamespaceSerializerSnapshot().resolveSchemaCompatibility(namespaceSerializer); + + if (namespaceSerializerCompatibility.isIncompatible() || namespaceSerializerCompatibility.isCompatibleAfterMigration()) { + throw new IllegalStateException( + "Tried to initialize restored TimerService with new namespace serializer that requires migration or is incompatible."); + } + + this.keySerializer = keySerializerCompatibility.isCompatibleAsIs() +