[GitHub] flink pull request #2962: [FLINK-5051] Backwards compatibility for serialize...
Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/2962 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2962#discussion_r92220463

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state
+ * serialization logic here.
+ */
+public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
+
+	private static final int VERSION = 1;
+
+	private TypeSerializerSerializationProxy keySerializerProxy;
+	private List> namedStateSerializationProxies;
+
+	private ClassLoader userCodeClassLoader;
+
+	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
+		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+	}
+
+	public KeyedBackendSerializationProxy(TypeSerializer keySerializer, List> namedStateSerializationProxies) {
+		this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
+		this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+		Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
+	}
+
+	public List> getNamedStateSerializationProxies() {
+		return namedStateSerializationProxies;
+	}
+
+	public TypeSerializerSerializationProxy getKeySerializerProxy() {
+		return keySerializerProxy;
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+
+		keySerializerProxy.write(out);
+
+		out.writeShort(namedStateSerializationProxies.size());
+		Map kVStateToId = new HashMap<>(namedStateSerializationProxies.size());
--- End diff --

Leftover from the previous code. Will remove it.
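The write method quoted above lays the meta data out as a version header, then the key serializer, then a short count, then one entry per named state; the count being written with writeShort is presumably why the constructor checks that the list holds at most Short.MAX_VALUE entries. Below is a minimal sketch of that layout in plain java.io, assuming serializers can be identified by a name string. MetaInfoSketch and its fields are hypothetical: the sketch does not use Flink's DataOutputView or VersionedIOReadableWritable, it only mirrors the idea of writing the version first and checking it before reading anything else.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical, simplified stand-in for a keyed-backend meta-data proxy (not Flink's API).
 * Layout: int version | UTF key serializer | short count | count x (UTF name, UTF serializer).
 */
class MetaInfoSketch {
    static final int VERSION = 1;

    String keySerializerName = "";
    // State name -> serializer name; LinkedHashMap keeps the write order stable.
    final Map<String, String> stateSerializers = new LinkedHashMap<>();

    byte[] write() throws IOException {
        // The count is written as a short, so it has to fit into one.
        if (stateSerializers.size() > Short.MAX_VALUE) {
            throw new IllegalArgumentException("Too many states: " + stateSerializers.size());
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(VERSION);                    // 1. version header
        out.writeUTF(keySerializerName);          // 2. key serializer
        out.writeShort(stateSerializers.size());  // 3. number of named states
        for (Map.Entry<String, String> entry : stateSerializers.entrySet()) {
            out.writeUTF(entry.getKey());         // 4. state name ...
            out.writeUTF(entry.getValue());       //    ... and its serializer
        }
        out.flush();
        return bytes.toByteArray();
    }

    static MetaInfoSketch read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        // Check the version before interpreting any of the following bytes.
        int version = in.readInt();
        if (version != VERSION) {
            throw new IOException("Unsupported meta data version " + version + ", expected " + VERSION);
        }
        MetaInfoSketch result = new MetaInfoSketch();
        result.keySerializerName = in.readUTF();
        int count = in.readShort();
        for (int i = 0; i < count; i++) {
            String name = in.readUTF();
            String serializer = in.readUTF();
            result.stateSerializers.put(name, serializer);
        }
        return result;
    }
}
```

With this layout, data written under a different version fails fast with an IOException instead of having the bytes after the header misinterpreted.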
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2962#discussion_r92145660

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
@@ -1373,4 +1431,71 @@ private KeyGroupsStateHandle runSnapshot(RunnableFuture sn
 		}
 		return snapshotRunnableFuture.get();
 	}
+
+	private static final TypeSerializer WRONG_VERSION_SERIALIZER = new TypeSerializer() {
--- End diff --

Is it wrong version or wrong type, right now?
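The question turns on the difference between a serializer of the wrong type and a serializer of the right type at an incompatible version. Below is a minimal sketch of a check that reports the two cases separately. SerializerDescriptor, Compatibility and CompatibilityCheck are hypothetical, as is the rule that only an identical version counts as compatible; this is not the isCompatibleWith logic from this PR.

```java
import java.util.Objects;

// Hypothetical description of a serializer as it might be stored in checkpoint meta data.
final class SerializerDescriptor {
    final String className;
    final int version;

    SerializerDescriptor(String className, int version) {
        this.className = Objects.requireNonNull(className);
        this.version = version;
    }
}

enum Compatibility { COMPATIBLE, WRONG_TYPE, WRONG_VERSION }

final class CompatibilityCheck {
    // Assumed rule: a different class is always incompatible; for the same class,
    // only an identical version is accepted.
    static Compatibility check(SerializerDescriptor stored, SerializerDescriptor current) {
        if (!stored.className.equals(current.className)) {
            return Compatibility.WRONG_TYPE;
        }
        if (stored.version != current.version) {
            return Compatibility.WRONG_VERSION;
        }
        return Compatibility.COMPATIBLE;
    }
}
```

Under this rule, a test serializer that changes only its version exercises the WRONG_VERSION branch, and one that changes its class exercises WRONG_TYPE, so a fixture's name can say which case it actually triggers.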
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2962#discussion_r92145625

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
@@ -228,6 +229,21 @@ public void testValueState() throws Exception {
 		assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 		backend.dispose();
+
--- End diff --

I would prefer if we could have these as a separate test. These tests are already too long, for my taste.
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2962#discussion_r92144965

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+
+		keySerializerProxy.write(out);
+
+		out.writeShort(namedStateSerializationProxies.size());
+		Map kVStateToId = new HashMap<>(namedStateSerializationProxies.size());
--- End diff --

What is this Map used for?
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2962

[FLINK-5051] Backwards compatibility for serializers in backends

This PR sits on top of PR #2781 and introduces future backwards compatibility for state serializers and backends. We do so by providing version compatibility checking for TypeSerializer and by making the serializers a mandatory part of a keyed backend's meta data in checkpoints (so that we have everything required to reconstruct states in a self-contained way).

A serialization proxy is introduced for the keyed state backend and for the operator state backend. Currently, this serialization proxy covers the meta data, but not yet the actual data.

For the most part, the PR moves functionality to a different place or makes formats more explicit.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink serializer-backwards-compatibility-operator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2962.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2962

commit a373585c2fe71b467f49f0e295dc647b43ab7a9c
Author: Stefan Richter
Date:   2016-11-01T11:29:01Z

    Backwards compatibility 1.1 -> 1.2

commit 8e4e4bcede50e66a95928ec854e51d45a7df28bf
Author: Stefan Richter
Date:   2016-11-09T13:54:35Z

    Removing some unecessary code from migration classes

commit 78bd66fade7f836eafbab978329caf1ea26f2ffc
Author: Stefan Richter
Date:   2016-11-09T17:21:13Z

    MultiStreamStateHandle

commit a9355679c3476dd890b54312e1696b61c7839873
Author: Stefan Richter
Date:   2016-11-10T13:18:55Z

    Added migration unit test

commit d079bd4bdb762c307a3c5cd084590804b90996b1
Author: Stefan Richter
Date:   2016-11-10T13:45:58Z

    rebase fixes

commit 9f47bac9c25fc33993c3942a57462039cc578dcd
Author: Stefan Richter
Date:   2016-11-11T13:46:39Z

    Minor cleanups: deleting more unnecessary classes

commit 2bbe66386d28c7914c62e2c3829ff3ab6840164c
Author: Stefan Richter
Date:   2016-11-23T13:15:33Z

    Versioned serialization

commit 6460e27717ab208aada988ba2c83d5628b31b310
Author: Stefan Richter
Date:   2016-11-23T17:59:45Z

    Common meta info introduced to keyed backends

commit e7d66377730339523bad8e3e6e75865ea5a29a6b
Author: Stefan Richter
Date:   2016-11-23T21:40:26Z

    Introducing isCompatibleWith to TypeSerializers

commit 89e3779d231fd0dadb01782791c92ec8ebb15a81
Author: Stefan Richter
Date:   2016-11-23T22:33:42Z

    Splitting / Introducing interface for versiond and compatibile

commit 434f9424e5cd0e01d45e51f44b917306606c5fb1
Author: Stefan Richter
Date:   2016-11-24T10:59:01Z

    Cleanup and documentation

commit 5cb40348dac235dfbe6c6fda532f2b87a6aee7f9
Author: Stefan Richter
Date:   2016-11-24T16:19:51Z

    Better abstractions

commit 500361fb07a428034cb96deb46ece3531d277080
Author: Stefan Richter
Date:   2016-11-24T16:29:24Z

    Serialization proxies for operator state backend meta data

commit 614ab7531a644eaf9edbc420383936bb6e39a34b
Author: Stefan Richter
Date:   2016-12-01T14:12:40Z

    handle one forgotten type of state handle

commit 11792a84c6dbf5057a46fc98b5190abf2cfad014
Author: Stefan Richter
Date:   2016-12-01T14:37:29Z

    Serialization Proxies for KeyedBackends, OperatorBackends, TypeSerializers. Still needs integration in OperatorBackend.

commit 89dcc375bd732a370609c10bcaaf5c3b42e93b98
Author: Stefan Richter
Date:   2016-12-02T00:19:44Z

    isCompatibleWith, code dedup and cleanup

commit 3bf993be2aaf17d7f71f0b3709b15aeb78baed6b
Author: Stefan Richter
Date:   2016-12-02T00:20:05Z

    Tests

commit 336fbedf8699792da3586ebe5d30d2644f0abe08
Author: Stefan Richter
Date:   2016-12-02T00:43:22Z

    Some compatibility logic for compount tuple serializer

commit 4ad6fc7884bede64f15d6251e752c97222ac665a
Author: Stefan Richter
Date:   2016-12-02T11:29:43Z

    Partial rollback, going for the simpelest approach.
    Also including some info about state type to serialization

commit d7eed9bc8b13803aa01fc682b22c100ba0e76072
Author: Stefan Richter
Date:   2016-12-02T12:15:59Z

    fixup for base branch, todo cherrypick

commit e962751de45e00d25e063d2f6f19f82dff8f4838
Author: Stefan Richter
Date:   2016-12-02T15:58:24Z

    Fix for 1/0 if UDF present

commit d827f3dd27054844dd154e620e7a1cd75d43fdf5
Author: Stefan Richter
Date:   2016-12-02T16:00:28Z

    Fix for Unknown State type in statemetainfo

commit 5fb822e478acb6bd046b7376834e9e16e80dffd1
Author: Stefan Richter
Date:   2016-12-05T13:23:49Z

    Introduce Eager restore and serialization proxies in DefaultOperatorStateBackend

commit dbb54e765bc163c9b33f014b60ecfed8bc98f65c
Author: Stefan Richter
Date:   2016-12-06T15:00:59Z

    WIP offset stream

commit 2331f62ff3a21d9a1337d336573c3eb4b8305e7e
Author: Stefan Richter
Date:   2016-12-07T1
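The PR description's goal of storing serializers as a mandatory part of the checkpoint meta data ("so that we have everything required to reconstruct states in a self contained way") can be illustrated with plain Java serialization, since Flink's TypeSerializer implements java.io.Serializable. The sketch below is only an illustration under that assumption, not the PR's TypeSerializerSerializationProxy: ValueSerializer, IntValueSerializer and SelfContainedSnapshot are hypothetical names, and only JDK stream classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical minimal serializer abstraction (not Flink's TypeSerializer).
interface ValueSerializer<T> extends Serializable {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

final class IntValueSerializer implements ValueSerializer<Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public void serialize(Integer value, DataOutput out) throws IOException {
        out.writeInt(value);
    }

    @Override
    public Integer deserialize(DataInput in) throws IOException {
        return in.readInt();
    }
}

final class SelfContainedSnapshot {
    // Writes the serializer object first (meta data), then the value it encodes.
    static <T> byte[] snapshot(ValueSerializer<T> serializer, T value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(serializer);
            serializer.serialize(value, out); // ObjectOutputStream is also a DataOutput
        }
        return bytes.toByteArray();
    }

    // Restores without any serializer supplied by the caller: it is read from the snapshot.
    static Object restore(byte[] data) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            ValueSerializer<?> serializer = (ValueSerializer<?>) in.readObject();
            return serializer.deserialize(in); // ObjectInputStream is also a DataInput
        } catch (ClassNotFoundException e) {
            throw new IOException("Serializer class could not be resolved", e);
        }
    }
}
```

The quoted KeyedBackendSerializationProxy takes a userCodeClassLoader for reading, which a real implementation would need to resolve user-defined serializer classes; this sketch relies on ObjectInputStream's default class resolution instead.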