[GitHub] flink pull request #2962: [FLINK-5051] Backwards compatibility for serialize...

2016-12-14 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/2962


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---



2016-12-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2962#discussion_r92220463
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state
+ * serialization logic here.
+ */
+public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
+
+    private static final int VERSION = 1;
+
+    private TypeSerializerSerializationProxy keySerializerProxy;
+    private List> namedStateSerializationProxies;
+
+    private ClassLoader userCodeClassLoader;
+
+    public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
+        this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+    }
+
+    public KeyedBackendSerializationProxy(TypeSerializer keySerializer, List> namedStateSerializationProxies) {
+        this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
+        this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+        Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
+    }
+
+    public List> getNamedStateSerializationProxies() {
+        return namedStateSerializationProxies;
+    }
+
+    public TypeSerializerSerializationProxy getKeySerializerProxy() {
+        return keySerializerProxy;
+    }
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        super.write(out);
+
+        keySerializerProxy.write(out);
+
+        out.writeShort(namedStateSerializationProxies.size());
+        Map kVStateToId = new HashMap<>(namedStateSerializationProxies.size());
--- End diff --

Leftover from the previous code. Will remove it.
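A minimal sketch of a write path without the leftover map, in plain Java: a `short` count followed by one entry per state, read back in the same order, so no name-to-id map is needed. `StateMetaSketch` and `MetaDataFormatSketch` are hypothetical stand-ins (not Flink classes), and `java.io.DataOutputStream`/`DataInputStream` replace Flink's `DataOutputView`/`DataInputView`. The sketch also shows why the constructor checks `size() <= Short.MAX_VALUE`: `writeShort` keeps only the low 16 bits of the count.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified per-state meta data: a state name and a serializer class name. */
final class StateMetaSketch {
    final String name;
    final String serializerClassName;

    StateMetaSketch(String name, String serializerClassName) {
        this.name = name;
        this.serializerClassName = serializerClassName;
    }
}

/** Hypothetical writer/reader for the meta data section; not Flink's actual format. */
final class MetaDataFormatSketch {

    private MetaDataFormatSketch() {}

    /** Writes a short count followed by each entry. No name-to-id map is built. */
    static void write(DataOutputStream out, List<StateMetaSketch> states) throws IOException {
        // writeShort keeps only the low 16 bits, so a larger count would be silently corrupted.
        // This mirrors the size <= Short.MAX_VALUE precondition in the quoted constructor.
        if (states.size() > Short.MAX_VALUE) {
            throw new IllegalArgumentException("Too many states: " + states.size());
        }
        out.writeShort(states.size());
        for (StateMetaSketch state : states) {
            out.writeUTF(state.name);
            out.writeUTF(state.serializerClassName);
        }
    }

    /** Reads the entries back in the order in which they were written. */
    static List<StateMetaSketch> read(DataInputStream in) throws IOException {
        int count = in.readShort();
        List<StateMetaSketch> states = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            String name = in.readUTF();
            String serializerClassName = in.readUTF();
            states.add(new StateMetaSketch(name, serializerClassName));
        }
        return states;
    }

    /** Writes to an in-memory buffer and reads the result back. */
    static List<StateMetaSketch> roundTrip(List<StateMetaSketch> states) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            write(out, states);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return read(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because entries are identified by their position in the stream, the reader needs nothing beyond the count to restore them in order.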





2016-12-13 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2962#discussion_r92145660
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
@@ -1373,4 +1431,71 @@ private KeyGroupsStateHandle runSnapshot(RunnableFuture sn
}
return snapshotRunnableFuture.get();
}
+
+   private static final TypeSerializer WRONG_VERSION_SERIALIZER = new TypeSerializer() {
--- End diff --

Is it wrong version or wrong type, right now?
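The two cases are different failure modes, and a test serializer named for one should only trigger that one. As a hedged illustration, a check that keeps them apart might look like the sketch below. `SerializerInfoSketch` and `CompatibilitySketch` are hypothetical names and do not reflect the signature of the `isCompatibleWith` method introduced in this PR.

```java
/** Hypothetical result of comparing a checkpointed serializer with the one registered on restore. */
enum CompatibilitySketch {
    COMPATIBLE,
    WRONG_TYPE,
    WRONG_VERSION
}

/** Hypothetical minimal description of a serializer: its class name and format version. */
final class SerializerInfoSketch {
    final String className;
    final int version;

    SerializerInfoSketch(String className, int version) {
        this.className = className;
        this.version = version;
    }

    /**
     * Compares this (checkpointed) description with the current one. The type is checked first,
     * so a different class is reported as WRONG_TYPE even if the versions also differ.
     */
    CompatibilitySketch checkAgainst(SerializerInfoSketch current) {
        if (!className.equals(current.className)) {
            return CompatibilitySketch.WRONG_TYPE;
        }
        if (version != current.version) {
            return CompatibilitySketch.WRONG_VERSION;
        }
        return CompatibilitySketch.COMPATIBLE;
    }
}
```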





2016-12-13 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2962#discussion_r92145625
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
@@ -228,6 +229,21 @@ public void testValueState() throws Exception {
assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
backend.dispose();
+
--- End diff --

I would prefer if we could have these as a separate test. These tests are 
already too long, for my taste. 😅 





2016-12-13 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2962#discussion_r92144965
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---
@@ -0,0 +1,219 @@
[... identical to the hunk quoted in the 2016-12-13 StefanRRichter message, down to the commented line ...]
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        super.write(out);
+
+        keySerializerProxy.write(out);
+
+        out.writeShort(namedStateSerializationProxies.size());
+        Map kVStateToId = new HashMap<>(namedStateSerializationProxies.size());
--- End diff --

What is this Map used for?





2016-12-08 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2962

[FLINK-5051] Backwards compatibility for serializers in backends

This PR sits on top of PR #2781 and introduces future backwards
compatibility for state serializers and backends. We do so by providing version
compatibility checking for TypeSerializer and by making the serializers a mandatory
part of a keyed backend's meta data in checkpoints (so that we have everything
required to reconstruct states in a self-contained way). A serialization proxy
is introduced for the keyed state backend and the operator state backend. Currently,
this serialization proxy covers only the meta data, not yet the actual data. For
the most part, the PR essentially moves functionality to a different place or
makes formats more explicit.
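A minimal sketch of the versioning idea described in the PR text, assuming a simple layout in which an `int` format version precedes the payload. `VersionedSketch`, `KeyMetaSketch` and `VersionedSketchIO` are hypothetical; they borrow the idea behind Flink's `VersionedIOReadableWritable` but are not its code, and plain `java.io` streams stand in for `DataInputView`/`DataOutputView`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical versioned envelope: the format version is written before the payload. */
abstract class VersionedSketch {

    abstract int getVersion();

    abstract void writePayload(DataOutputStream out) throws IOException;

    abstract void readPayload(DataInputStream in) throws IOException;

    /** This sketch accepts only its own version; a real format could also accept older ones. */
    boolean isCompatibleVersion(int foundVersion) {
        return foundVersion == getVersion();
    }

    final void write(DataOutputStream out) throws IOException {
        out.writeInt(getVersion());
        writePayload(out);
    }

    final void read(DataInputStream in) throws IOException {
        int foundVersion = in.readInt();
        if (!isCompatibleVersion(foundVersion)) {
            throw new IOException("Incompatible version " + foundVersion + ", expected " + getVersion());
        }
        readPayload(in);
    }
}

/** Hypothetical meta data holder that only records the key serializer's class name. */
final class KeyMetaSketch extends VersionedSketch {
    String keySerializerClassName;

    KeyMetaSketch(String keySerializerClassName) {
        this.keySerializerClassName = keySerializerClassName;
    }

    @Override
    int getVersion() {
        return 1;
    }

    @Override
    void writePayload(DataOutputStream out) throws IOException {
        out.writeUTF(keySerializerClassName);
    }

    @Override
    void readPayload(DataInputStream in) throws IOException {
        keySerializerClassName = in.readUTF();
    }
}

/** Helpers for an in-memory round trip. */
final class VersionedSketchIO {
    private VersionedSketchIO() {}

    static byte[] toBytes(VersionedSketch value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            value.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns true if reading succeeded, false if it failed (for example on a version mismatch). */
    static boolean tryRead(VersionedSketch target, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            target.read(in);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Because the version is read before anything else, a later format could widen `isCompatibleVersion` to accept older versions and branch on them in `readPayload`, which is the kind of migration path this PR aims to prepare for.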

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink serializer-backwards-compatibility-operator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2962.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2962


commit a373585c2fe71b467f49f0e295dc647b43ab7a9c
Author: Stefan Richter 
Date:   2016-11-01T11:29:01Z

Backwards compatibility 1.1 -> 1.2

commit 8e4e4bcede50e66a95928ec854e51d45a7df28bf
Author: Stefan Richter 
Date:   2016-11-09T13:54:35Z

Removing some unnecessary code from migration classes

commit 78bd66fade7f836eafbab978329caf1ea26f2ffc
Author: Stefan Richter 
Date:   2016-11-09T17:21:13Z

MultiStreamStateHandle

commit a9355679c3476dd890b54312e1696b61c7839873
Author: Stefan Richter 
Date:   2016-11-10T13:18:55Z

Added migration unit test

commit d079bd4bdb762c307a3c5cd084590804b90996b1
Author: Stefan Richter 
Date:   2016-11-10T13:45:58Z

rebase fixes

commit 9f47bac9c25fc33993c3942a57462039cc578dcd
Author: Stefan Richter 
Date:   2016-11-11T13:46:39Z

Minor cleanups: deleting more unnecessary classes

commit 2bbe66386d28c7914c62e2c3829ff3ab6840164c
Author: Stefan Richter 
Date:   2016-11-23T13:15:33Z

Versioned serialization

commit 6460e27717ab208aada988ba2c83d5628b31b310
Author: Stefan Richter 
Date:   2016-11-23T17:59:45Z

Common meta info introduced to keyed backends

commit e7d66377730339523bad8e3e6e75865ea5a29a6b
Author: Stefan Richter 
Date:   2016-11-23T21:40:26Z

Introducing isCompatibleWith to TypeSerializers

commit 89e3779d231fd0dadb01782791c92ec8ebb15a81
Author: Stefan Richter 
Date:   2016-11-23T22:33:42Z

Splitting / Introducing interface for versioned and compatible

commit 434f9424e5cd0e01d45e51f44b917306606c5fb1
Author: Stefan Richter 
Date:   2016-11-24T10:59:01Z

Cleanup and documentation

commit 5cb40348dac235dfbe6c6fda532f2b87a6aee7f9
Author: Stefan Richter 
Date:   2016-11-24T16:19:51Z

Better abstractions

commit 500361fb07a428034cb96deb46ece3531d277080
Author: Stefan Richter 
Date:   2016-11-24T16:29:24Z

Serialization proxies for operator state backend meta data

commit 614ab7531a644eaf9edbc420383936bb6e39a34b
Author: Stefan Richter 
Date:   2016-12-01T14:12:40Z

handle one forgotten type of state handle

commit 11792a84c6dbf5057a46fc98b5190abf2cfad014
Author: Stefan Richter 
Date:   2016-12-01T14:37:29Z

Serialization Proxies for KeyedBackends, OperatorBackends, TypeSerializers. 
Still needs integration in OperatorBackend.

commit 89dcc375bd732a370609c10bcaaf5c3b42e93b98
Author: Stefan Richter 
Date:   2016-12-02T00:19:44Z

isCompatibleWith, code dedup and cleanup

commit 3bf993be2aaf17d7f71f0b3709b15aeb78baed6b
Author: Stefan Richter 
Date:   2016-12-02T00:20:05Z

Tests

commit 336fbedf8699792da3586ebe5d30d2644f0abe08
Author: Stefan Richter 
Date:   2016-12-02T00:43:22Z

Some compatibility logic for compound tuple serializer

commit 4ad6fc7884bede64f15d6251e752c97222ac665a
Author: Stefan Richter 
Date:   2016-12-02T11:29:43Z

Partial rollback, going for the simplest approach. Also including some
info about state type to serialization

commit d7eed9bc8b13803aa01fc682b22c100ba0e76072
Author: Stefan Richter 
Date:   2016-12-02T12:15:59Z

fixup for base branch, todo cherrypick

commit e962751de45e00d25e063d2f6f19f82dff8f4838
Author: Stefan Richter 
Date:   2016-12-02T15:58:24Z

Fix for 1/0 if UDF present

commit d827f3dd27054844dd154e620e7a1cd75d43fdf5
Author: Stefan Richter 
Date:   2016-12-02T16:00:28Z

Fix for Unknown State type in statemetainfo

commit 5fb822e478acb6bd046b7376834e9e16e80dffd1
Author: Stefan Richter 
Date:   2016-12-05T13:23:49Z

Introduce Eager restore and serialization proxies in 
DefaultOperatorStateBackend

commit dbb54e765bc163c9b33f014b60ecfed8bc98f65c
Author: Stefan Richter 
Date:   2016-12-06T15:00:59Z

WIP offset stream

commit 2331f62ff3a21d9a1337d336573c3eb4b8305e7e
Author: Stefan Richter 
Date:   2016-12-07T1