[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354388#comment-16354388
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8421:


Merged.

1.5 - a79916b88ef3d97c11980c25d3887d43964c152d
1.4 - fb1e24e25eddaded9668041ba4f24e35384962c3

> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on 
> restored / newly provided serializers for compatibility checks. This should 
> be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, 
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and 
> namespace serializer in the {{HeapInternalTimerService}} also needs to be 
> written to the raw state.
> For Flink 1.4.0 release and current master, this is a critical bug since the 
> {{KryoSerializer}} has different default base registrations than before due 
> to FLINK-7420. i.e if the key of a window is serialized using the 
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the 
> {{HeapInternalTimerService}} restore will make use of serializer 
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from 
> **raw** state. Apparently, the serializer compatibility checks were only 
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test 
> job that uses a key type which required the {{KryoSerializer}}, and uses 
> windows, would have caught this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354363#comment-16354363
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5362


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on 
> restored / newly provided serializers for compatibility checks. This should 
> be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, 
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and 
> namespace serializer in the {{HeapInternalTimerService}} also needs to be 
> written to the raw state.
> For Flink 1.4.0 release and current master, this is a critical bug since the 
> {{KryoSerializer}} has different default base registrations than before due 
> to FLINK-7420. i.e if the key of a window is serialized using the 
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the 
> {{HeapInternalTimerService}} restore will make use of serializer 
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from 
> **raw** state. Apparently, the serializer compatibility checks were only 
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test 
> job that uses a key type which required the {{KryoSerializer}}, and uses 
> windows, would have caught this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354073#comment-16354073
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5362
  
Thanks for the reviews!
Will merge after Travis gives green.


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on 
> restored / newly provided serializers for compatibility checks. This should 
> be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, 
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and 
> namespace serializer in the {{HeapInternalTimerService}} also needs to be 
> written to the raw state.
> For Flink 1.4.0 release and current master, this is a critical bug since the 
> {{KryoSerializer}} has different default base registrations than before due 
> to FLINK-7420. i.e if the key of a window is serialized using the 
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the 
> {{HeapInternalTimerService}} restore will make use of serializer 
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from 
> **raw** state. Apparently, the serializer compatibility checks were only 
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test 
> job that uses a key type which required the {{KryoSerializer}}, and uses 
> windows, would have caught this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354066#comment-16354066
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5362
  
I think this is good to go now.   


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on 
> restored / newly provided serializers for compatibility checks. This should 
> be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, 
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and 
> namespace serializer in the {{HeapInternalTimerService}} also needs to be 
> written to the raw state.
> For Flink 1.4.0 release and current master, this is a critical bug since the 
> {{KryoSerializer}} has different default base registrations than before due 
> to FLINK-7420. i.e if the key of a window is serialized using the 
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the 
> {{HeapInternalTimerService}} restore will make use of serializer 
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from 
> **raw** state. Apparently, the serializer compatibility checks were only 
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test 
> job that uses a key type which required the {{KryoSerializer}}, and uses 
> windows, would have caught this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350342#comment-16350342
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5362#discussion_r165646464
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 ---
@@ -52,7 +52,7 @@
 
private final ProcessingTimeService processingTimeService;
 
-   private final Map> timerServices;
+   public final Map> timerServices;
--- End diff --

no, this is a mistake that should be reverted.


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on 
> restored / newly provided serializers for compatibility checks. This should 
> be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, 
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and 
> namespace serializer in the {{HeapInternalTimerService}} also needs to be 
> written to the raw state.
> For Flink 1.4.0 release and current master, this is a critical bug since the 
> {{KryoSerializer}} has different default base registrations than before due 
> to FLINK-7420. i.e if the key of a window is serialized using the 
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the 
> {{HeapInternalTimerService}} restore will make use of serializer 
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from 
> **raw** state. Apparently, the serializer compatibility checks were only 
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test 
> job that uses a key type which required the {{KryoSerializer}}, and uses 
> windows, would have caught this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350259#comment-16350259
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5362#discussion_r165636384
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 ---
@@ -52,7 +52,7 @@
 
private final ProcessingTimeService processingTimeService;
 
-   private final Map> timerServices;
+   public final Map> timerServices;
--- End diff --

Is this only public for testing?


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on 
> restored / newly provided serializers for compatibility checks. This should 
> be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, 
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and 
> namespace serializer in the {{HeapInternalTimerService}} also needs to be 
> written to the raw state.
> For Flink 1.4.0 release and current master, this is a critical bug since the 
> {{KryoSerializer}} has different default base registrations than before due 
> to FLINK-7420. i.e if the key of a window is serialized using the 
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the 
> {{HeapInternalTimerService}} restore will make use of serializer 
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from 
> **raw** state. Apparently, the serializer compatibility checks were only 
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test 
> job that uses a key type which required the {{KryoSerializer}}, and uses 
> windows, would have caught this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349905#comment-16349905
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5362#discussion_r165573730
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.InputStreamViewWrapper;
+
+import java.io.IOException;
+import java.io.PushbackInputStream;
+import java.util.Arrays;
+
+/**
+ * A {@link VersionedIOReadableWritable} which allows to differentiate 
whether the previous
+ * data was versioned with a {@link VersionedIOReadableWritable}. This can 
be used if previously
+ * written data was not versioned, and is to be migrated to a versioned 
format.
+ */
+@Internal
+public abstract class PostVersionedIOReadableWritable extends 
VersionedIOReadableWritable {
+
+   /** NOTE: CANNOT CHANGE! */
+   private static final byte[] VERSIONED_IDENTIFIER = new byte[] {-15, 
-51, -123, -97};
+
+   /**
+* Read from the provided {@link DataInputView in}. A flag {@code 
wasVersioned} can be
+* used to determine whether or not the data to read was previously 
written
+* by a {@link VersionedIOReadableWritable}.
+*/
+   protected abstract void read(DataInputView in, boolean wasVersioned) 
throws IOException;
+
+   @Override
+   public void write(DataOutputView out) throws IOException {
+   out.write(VERSIONED_IDENTIFIER);
+   super.write(out);
+   }
+
+   /**
+* This read attempts to first identify if the input view contains the 
special
+* {@link #VERSIONED_IDENTIFIER} by reading and buffering the first few 
bytes.
+* If identified to be versioned, the usual version resolution read path
+* in {@link VersionedIOReadableWritable#read(DataInputView)} is 
invoked.
+* Otherwise, we "reset" the input view by pushing back the read 
buffered bytes
+* into the stream.
+*/
+   @Override
+   public final void read(DataInputView in) throws IOException {
+   PushbackInputStream stream = new PushbackInputStream(new 
InputStreamViewWrapper(in), VERSIONED_IDENTIFIER.length);
+
+   byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
+   stream.read(tmp);
+
+   if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) {
+   super.read(in);
+   read(in, true);
+   } else {
+   stream.unread(tmp);
+   read(new DataInputViewStreamWrapper(stream), false);
+   }
--- End diff --

Not-so-nice things about this current implementation is:
1) it requires several layers of transforming back and forth between a 
`DataInputView` and `InputStream`, and 
2) it uses a separate `read(DataInputView, boolean)` method in order to 
wrap a "reset" `DataInputView` for the remaining reads.

I think the implementation would have been much more elegant if 
`DataInputView` has an `unread(byte[])` method, though I'm not sure how 
non-trivial it is to support this across all subclasses.
Maybe a food for thought for the future ..


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: 

[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348936#comment-16348936
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5362
  
@aljoscha As discussed offline, I've:
- replaced `ByteArrayPrependedInputStream` with Java's `PushbackInputStream`
- use negative values in `VERSIONED_IDENTIFIER` to be extra safe


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on 
> restored / newly provided serializers for compatibility checks. This should 
> be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, 
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and 
> namespace serializer in the {{HeapInternalTimerService}} also needs to be 
> written to the raw state.
> For Flink 1.4.0 release and current master, this is a critical bug since the 
> {{KryoSerializer}} has different default base registrations than before due 
> to FLINK-7420. i.e if the key of a window is serialized using the 
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the 
> {{HeapInternalTimerService}} restore will make use of serializer 
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from 
> **raw** state. Apparently, the serializer compatibility checks were only 
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test 
> job that uses a key type which required the {{KryoSerializer}}, and uses 
> windows, would have caught this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339236#comment-16339236
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5362

[FLINK-8421] [DataStream] Make timer serializers reconfigurable on restore

## What is the purpose of the change

Previously, key and namespace serializers of the `HeapInternalTimerService` 
were not reconfigured on restore.

In Flink 1.4.0, we removed Avro dependency, and on restore if the Avro 
dependency is not present, a `DummyAvroKryoSerializerClass` was registered to 
Kryo as a placeholder, which altered the base Kryo registrations in the 
`KryoSerializer`. This change required a serializer reconfiguration in order 
for restores to work. Effectively, this allowed the issue in the 
`HeapInternalTimerService` to surface.

This PR fixes this by writing also the `TypeSerializerConfigSnapshot`s of 
the key and namespace serializer of the `HeapInternalTimerService` into 
savepoints, and use them to reconfigure new serializers on restore.
Since this would change the binary format of the written timer services, 
this PR also uses this opportunity to properly make the format versioned.

More details of the change is explained below.

## Brief change log

- 1bc3cd0: A preliminary migration test that took a savepoint of a 
`WindowOperator` with keys that required serialization using the 
`KryoSerializer`. Savepoint were taken for Flink versions 1.2 and 1.3. 
Restoring from this savepoint in Flink 1.4 fails, and requires the following 
commits to pass.

- b9a1695: Always use the `FailureTolerantObjectInputStream` to read 
objects in the `InstantiationUtil.deserializeObject(...)` methods. That special 
stream avoids restore failures with `ClassNotFoundException` if Avro is not 
present, but there were leaks where during the restore process, that special 
input stream was not used.

- ff2e6b7 and 8bd955d: Introduced `ByteArrayPrependedInputStream` and 
`PostVersionedIOReadableWritable`. These are utility classes that were required 
to migrate the serialization format of the timer services from non-versioned to 
versioned.

- bcdc1f1: The main change, which adds key / namespace serializer config 
snapshots and use them for serializer reconfiguration on restore. This commit 
also makes the format versioned.

## Verifying this change

- The migration test added in 1bc3cd0 will not pass without all fixes.
- Unit tests are added for the new `ByteArrayPrependedInputStream` and 
`PostVersionedIOReadableWritable` classes.
- The `testSnapshotAndRestore` and `testSnapshotAndRebalancedRestore` tests 
in `HeapInternalTimerServiceTest` are adapted to test both versioned and 
previous non-versioned formats.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**YES** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8421

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5362


commit 1bc3cd0214d2d17f19d76a9aa094429730b5ba13
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-24T16:08:13Z

[FLINK-8421] [DataStream, tests] Add WindowOperator migration test for 
Kryo-serialized window keys

commit b9a169535a91d5678ae916d8e54b7e60724a7486
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-24T16:15:08Z

[FLINK-8421] [core] Let InstantiationUtil.deserializeObject() always use 
FailureTolerantObjectInputStream

commit ff2e6b75f39f0d474ecca451ac1a47c0183e9a6f
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-24T17:07:14Z

[FLINK-8421] [core] Introduce ByteArrayPrependedInputStream

commit 8bd955d701f9f9278a5e52befea4308f42a60b45
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-24T17:08:52Z