http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index efbebf4..329ce18 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -24,23 +24,15 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Migration;
-
-import java.io.Serializable;
 
 import static java.util.Objects.requireNonNull;
 
@@ -57,8 +49,7 @@ import static java.util.Objects.requireNonNull;
 @PublicEvolving
 public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
                extends AbstractStreamOperator<OUT>
-               implements OutputTypeConfigurable<OUT>,
-               StreamCheckpointedOperator {
+               implements OutputTypeConfigurable<OUT> {
 
        private static final long serialVersionUID = 1L;
 
@@ -132,59 +123,6 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        // ------------------------------------------------------------------------
 
        @Override
-       public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
-               if (userFunction instanceof Checkpointed) {
-                       @SuppressWarnings("unchecked")
-                       Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;
-
-                       Serializable udfState;
-                       try {
-                               udfState = chkFunction.snapshotState(checkpointId, timestamp);
-                               if (udfState != null) {
-                                       out.write(1);
-                                       InstantiationUtil.serializeObject(out, udfState);
-                               } else {
-                                       out.write(0);
-                               }
-                       } catch (Exception e) {
-                               throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
-                       }
-               }
-       }
-
-       @Override
-       public void restoreState(FSDataInputStream in) throws Exception {
-               boolean haveReadUdfStateFlag = false;
-               if (userFunction instanceof Checkpointed ||
-                               (userFunction instanceof CheckpointedRestoring)) {
-                       @SuppressWarnings("unchecked")
-                       CheckpointedRestoring<Serializable> chkFunction = (CheckpointedRestoring<Serializable>) userFunction;
-
-                       int hasUdfState = in.read();
-                       haveReadUdfStateFlag = true;
-
-                       if (hasUdfState == 1) {
-                               Serializable functionState = InstantiationUtil.deserializeObject(in, getUserCodeClassloader());
-                               if (functionState != null) {
-                                       try {
-                                               chkFunction.restoreState(functionState);
-                                       } catch (Exception e) {
-                                               throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
-                                       }
-                               }
-                       }
-               }
-
-               if (in instanceof Migration && !haveReadUdfStateFlag) {
-                       // absorb the introduced byte from the migration stream without too much further consequences
-                       int hasUdfState = in.read();
-                       if (hasUdfState == 1) {
-                               throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring");
-                       }
-               }
-       }
-
-       @Override
        public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
                super.notifyOfCompletedCheckpoint(checkpointId);
 
@@ -219,23 +157,11 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 
        private void checkUdfCheckpointingPreconditions() {
 
-               boolean newCheckpointInferface = false;
-
-               if (userFunction instanceof CheckpointedFunction) {
-                       newCheckpointInferface = true;
-               }
-
-               if (userFunction instanceof ListCheckpointed) {
-                       if (newCheckpointInferface) {
-                               throw new IllegalStateException("User functions are not allowed to implement " +
-                                               "CheckpointedFunction AND ListCheckpointed.");
-                       }
-                       newCheckpointInferface = true;
-               }
+               if (userFunction instanceof CheckpointedFunction
+                       && userFunction instanceof ListCheckpointed) {
 
-               if (newCheckpointInferface && userFunction instanceof Checkpointed) {
-                       throw new IllegalStateException("User functions are not allowed to implement Checkpointed AND " +
-                                       "CheckpointedFunction/ListCheckpointed.");
+                       throw new IllegalStateException("User functions are not allowed to implement " +
+                               "CheckpointedFunction AND ListCheckpointed.");
                }
        }
 }
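
For context, a user function that previously implemented the removed Checkpointed interface would typically be ported to ListCheckpointed (or CheckpointedFunction), which this operator keeps supporting. The following is a minimal illustrative sketch, not part of this commit; it assumes the standard Flink 1.x ListCheckpointed contract, and CountingMapper is a hypothetical class name.

// Illustrative sketch (not part of this commit): a user function migrated from the removed
// Checkpointed interface to ListCheckpointed. Assumes the Flink 1.x ListCheckpointed
// contract: snapshotState(long, long) returns a List, restoreState(List) re-installs it.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

import java.util.Collections;
import java.util.List;

public class CountingMapper implements MapFunction<String, String>, ListCheckpointed<Long> {

        private long count;

        @Override
        public String map(String value) {
                count++;
                return value;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
                // the runtime may split/merge these lists when the parallelism changes
                return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Long> state) throws Exception {
                count = 0L;
                for (Long c : state) {
                        count += c;
                }
        }
}

Returning the state as a list is what lets the runtime redistribute it across parallel instances on rescaling, which the removed byte-stream based Checkpointed path could not do.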

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java
deleted file mode 100644
index 33304e4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
-/**
- * Interface for {@link StreamOperator StreamOperators} that can restore from a Flink 1.1
- * legacy snapshot that was done using the {@link StreamCheckpointedOperator} interface.
- *
- * @deprecated {@link Checkpointed} has been deprecated as well. This class can be
- * removed when we remove that interface.
- */
-@Deprecated
-public interface CheckpointedRestoringOperator {
-
-       /**
-        * Restores the operator state, if this operator's execution is recovering from a checkpoint.
-        * This method restores the operator state (if the operator is stateful) and the key/value state
-        * (if it had been used and was initialized when the snapshot occurred).
-        *
-        * <p>This method is called after {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
-        * and before {@link StreamOperator#open()}.
-        *
-        * @param in The stream from which we have to restore our state.
-        *
-        * @throws Exception Exceptions during state restore should be forwarded, so that the system can
-        *                   properly react to failed state restore and fail the execution attempt.
-        */
-       void restoreState(FSDataInputStream in) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
deleted file mode 100644
index 986e2b7..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.core.fs.FSDataOutputStream;
-
-/**
- * @deprecated This interface is deprecated without replacement.
- * All operators are now checkpointed.
- */
-@Deprecated
-public interface StreamCheckpointedOperator extends CheckpointedRestoringOperator {
-
-       /**
-        * Called to draw a state snapshot from the operator. This method snapshots the operator state
-        * (if the operator is stateful).
-        *
-        * @param out The stream to which we have to write our state.
-        * @param checkpointId The ID of the checkpoint.
-        * @param timestamp The timestamp of the checkpoint.
-        *
-        * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator
-        *                   and the key/value state.
-        */
-       void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 9d5e02b..38b4aee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -105,21 +104,6 @@ public interface StreamOperator<OUT> extends Serializable {
                CheckpointOptions checkpointOptions) throws Exception;
 
        /**
-        * Takes a snapshot of the legacy operator state defined via {@link StreamCheckpointedOperator}.
-        *
-        * @return The handle to the legacy operator state, or null, if no state was snapshotted.
-        * @throws Exception This method should forward any type of exception that happens during snapshotting.
-        *
-        * @deprecated This method will be removed as soon as no more operators use the legacy state code paths
-        */
-       @SuppressWarnings("deprecation")
-       @Deprecated
-       StreamStateHandle snapshotLegacyOperatorState(
-               long checkpointId,
-               long timestamp,
-               CheckpointOptions checkpointOptions) throws Exception;
-
-       /**
         * Provides state handles to restore the operator state.
         *
         * @param stateHandles state handles to the operator state.

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
deleted file mode 100644
index 252f997..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.windowing.time.Time;
-
-/**
- * This is a special window assigner used to tell the system to use the
- * <i>"Fast Aligned Processing Time Window Operator"</i> for windowing.
- *
- * <p>Prior Flink versions used that operator automatically for simple processing time
- * windows (tumbling and sliding) when no custom trigger and no evictor was specified.
- * In the current Flink version, that operator is only used when programs explicitly
- * specify this window assigner. This is only intended for special cases where programs relied on
- * the better performance of the fast aligned window operator, and are willing to accept the lack
- * of support for various features as indicated below:
- *
- * <ul>
- *     <li>No custom state backend can be selected, the operator always stores data on the Java heap.</li>
- *     <li>The operator does not support key groups, meaning it cannot change the parallelism.</li>
- *     <li>Future versions of Flink may not be able to resume from checkpoints/savepoints taken by this
- *         operator.</li>
- * </ul>
- *
- * <p>Future implementation plans: We plan to add some of the optimizations used by this operator to
- * the general window operator, so that future versions of Flink will not have the performance/functionality
- * trade-off any more.
- *
- * <p>Note on implementation: The concrete operator instantiated by this assigner is either the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator}
- * or {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator}.
- */
-@PublicEvolving
-public final class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
-
-       private static final long serialVersionUID = -6217477609512299842L;
-
-       public TumblingAlignedProcessingTimeWindows(long size) {
-               super(size);
-       }
-
-       /**
-        * Creates a new {@code TumblingAlignedProcessingTimeWindows} {@link WindowAssigner} that assigns
-        * elements to time windows based on the element timestamp.
-        *
-        * @param size The size of the generated windows.
-        */
-       public static TumblingAlignedProcessingTimeWindows of(Time size) {
-               return new TumblingAlignedProcessingTimeWindows(size.toMilliseconds());
-       }
-}
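
With this assigner gone, jobs that used the fast aligned operator would move to the generic window path. The following is a minimal sketch of that path, not part of this commit; it assumes the standard Flink 1.x DataStream windowing API, and the input elements and window size are hypothetical.

// Illustrative sketch (not part of this commit): the generic replacement for the removed
// aligned tumbling assigner. Assumes the Flink 1.x DataStream windowing API; the example
// elements and the 5 second window size are hypothetical.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowExample {

        public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStream<Tuple2<String, Long>> readings =
                        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L), Tuple2.of("a", 3L));

                // generic WindowOperator path: tumbling processing-time windows of 5 seconds,
                // summing the second tuple field per key
                readings
                        .keyBy(0)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .sum(1)
                        .print();

                env.execute("Tumbling processing-time window example");
        }
}

Unlike the removed aligned operator, this path goes through the configured state backend and supports key groups, so the parallelism can be changed on restore.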

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
deleted file mode 100644
index 83a7528..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-import org.apache.flink.util.MathUtils;
-
-import org.apache.commons.math3.util.ArithmeticUtils;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Base class for special window operator implementation for windows that fire 
at the same time for
- * all keys.
- *
- * @deprecated Deprecated in favour of the generic {@link WindowOperator}. 
This was an
- * optimized implementation used for aligned windows.
- */
-@Internal
-@Deprecated
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT, STATE, F extends Function>
-               extends AbstractUdfStreamOperator<OUT, F>
-               implements OneInputStreamOperator<IN, OUT>, 
ProcessingTimeCallback {
-
-       private static final long serialVersionUID = 3245500864882459867L;
-
-       private static final long MIN_SLIDE_TIME = 50;
-
-       // ----- fields for operator parametrization -----
-
-       private final Function function;
-       private final KeySelector<IN, KEY> keySelector;
-
-       private final TypeSerializer<KEY> keySerializer;
-       private final TypeSerializer<STATE> stateTypeSerializer;
-
-       private final long windowSize;
-       private final long windowSlide;
-       private final long paneSize;
-       private final int numPanesPerWindow;
-
-       // ----- fields for operator functionality -----
-
-       private transient AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
-
-       private transient TimestampedCollector<OUT> out;
-
-       private transient RestoredState<IN, KEY, STATE, OUT> restoredState;
-
-       private transient long nextEvaluationTime;
-       private transient long nextSlideTime;
-
-       protected AbstractAlignedProcessingTimeWindowOperator(
-                       F function,
-                       KeySelector<IN, KEY> keySelector,
-                       TypeSerializer<KEY> keySerializer,
-                       TypeSerializer<STATE> stateTypeSerializer,
-                       long windowLength,
-                       long windowSlide) {
-               super(function);
-
-               if (windowLength < MIN_SLIDE_TIME) {
-                       throw new IllegalArgumentException("Window length must 
be at least " + MIN_SLIDE_TIME + " msecs");
-               }
-               if (windowSlide < MIN_SLIDE_TIME) {
-                       throw new IllegalArgumentException("Window slide must 
be at least " + MIN_SLIDE_TIME + " msecs");
-               }
-               if (windowLength < windowSlide) {
-                       throw new IllegalArgumentException("The window size 
must be larger than the window slide");
-               }
-
-               final long paneSlide = ArithmeticUtils.gcd(windowLength, 
windowSlide);
-               if (paneSlide < MIN_SLIDE_TIME) {
-                       throw new IllegalArgumentException(String.format(
-                                       "Cannot compute window of size %d msecs 
sliding by %d msecs. " +
-                                                       "The unit of grouping 
is too small: %d msecs", windowLength, windowSlide, paneSlide));
-               }
-
-               this.function = requireNonNull(function);
-               this.keySelector = requireNonNull(keySelector);
-               this.keySerializer = requireNonNull(keySerializer);
-               this.stateTypeSerializer = requireNonNull(stateTypeSerializer);
-               this.windowSize = windowLength;
-               this.windowSlide = windowSlide;
-               this.paneSize = paneSlide;
-               this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength 
/ paneSlide);
-       }
-
-       protected abstract AbstractKeyedTimePanes<IN, KEY, STATE, OUT> 
createPanes(
-                       KeySelector<IN, KEY> keySelector, Function function);
-
-       // 
------------------------------------------------------------------------
-       //  startup and shutdown
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void open() throws Exception {
-               super.open();
-
-               out = new TimestampedCollector<>(output);
-
-               // decide when to first compute the window and when to slide it
-               // the values should align with the start of time (that is, the 
UNIX epoch, not the big bang)
-               final long now = 
getProcessingTimeService().getCurrentProcessingTime();
-               nextEvaluationTime = now + windowSlide - (now % windowSlide);
-               nextSlideTime = now + paneSize - (now % paneSize);
-
-               final long firstTriggerTime = Math.min(nextEvaluationTime, 
nextSlideTime);
-
-               // check if we restored state and if we need to fire some 
windows based on that restored state
-               if (restoredState == null) {
-                       // initial empty state: create empty panes that gather 
the elements per slide
-                       panes = createPanes(keySelector, function);
-               }
-               else {
-                       // restored state
-                       panes = restoredState.panes;
-
-                       long nextPastEvaluationTime = 
restoredState.nextEvaluationTime;
-                       long nextPastSlideTime = restoredState.nextSlideTime;
-                       long nextPastTriggerTime = 
Math.min(nextPastEvaluationTime, nextPastSlideTime);
-                       int numPanesRestored = panes.getNumPanes();
-
-                       // fire windows from the past as long as there are more 
panes with data and as long
-                       // as the missed trigger times have not caught up with 
the presence
-                       while (numPanesRestored > 0 && nextPastTriggerTime < 
firstTriggerTime) {
-                               // evaluate the window from the past
-                               if (nextPastTriggerTime == 
nextPastEvaluationTime) {
-                                       computeWindow(nextPastTriggerTime);
-                                       nextPastEvaluationTime += windowSlide;
-                               }
-
-                               // evaluate slide from the past
-                               if (nextPastTriggerTime == nextPastSlideTime) {
-                                       panes.slidePanes(numPanesPerWindow);
-                                       numPanesRestored--;
-                                       nextPastSlideTime += paneSize;
-                               }
-
-                               nextPastTriggerTime = 
Math.min(nextPastEvaluationTime, nextPastSlideTime);
-                       }
-               }
-
-               // make sure the first window happens
-               getProcessingTimeService().registerTimer(firstTriggerTime, 
this);
-       }
-
-       @Override
-       public void close() throws Exception {
-               super.close();
-
-               // early stop the triggering thread, so it does not attempt to 
return any more data
-               stopTriggers();
-       }
-
-       @Override
-       public void dispose() throws Exception {
-               super.dispose();
-
-               // acquire the lock during shutdown, to prevent trigger calls 
at the same time
-               // fail-safe stop of the triggering thread (in case of an error)
-               stopTriggers();
-
-               // release the panes. panes may still be null if dispose is 
called
-               // after open() failed
-               if (panes != null) {
-                       panes.dispose();
-               }
-       }
-
-       private void stopTriggers() {
-               // reset the action timestamps. this makes sure any pending 
triggers will not evaluate
-               nextEvaluationTime = -1L;
-               nextSlideTime = -1L;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Receiving elements and triggers
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void processElement(StreamRecord<IN> element) throws Exception {
-               panes.addElementToLatestPane(element.getValue());
-       }
-
-       @Override
-       public void onProcessingTime(long timestamp) throws Exception {
-               // first we check if we actually trigger the window function
-               if (timestamp == nextEvaluationTime) {
-                       // compute and output the results
-                       computeWindow(timestamp);
-
-                       nextEvaluationTime += windowSlide;
-               }
-
-               // check if we slide the panes by one. this may happen in 
addition to the
-               // window computation, or just by itself
-               if (timestamp == nextSlideTime) {
-                       panes.slidePanes(numPanesPerWindow);
-                       nextSlideTime += paneSize;
-               }
-
-               long nextTriggerTime = Math.min(nextEvaluationTime, 
nextSlideTime);
-               getProcessingTimeService().registerTimer(nextTriggerTime, this);
-       }
-
-       private void computeWindow(long timestamp) throws Exception {
-               out.setAbsoluteTimestamp(timestamp);
-               panes.truncatePanes(numPanesPerWindow);
-               panes.evaluateWindow(out, new TimeWindow(timestamp - 
windowSize, timestamp), this);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Checkpointing
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
-               super.snapshotState(out, checkpointId, timestamp);
-
-               // we write the panes with the key/value maps into the stream, 
as well as when this state
-               // should have triggered and slided
-
-               DataOutputViewStreamWrapper outView = new 
DataOutputViewStreamWrapper(out);
-
-               outView.writeLong(nextEvaluationTime);
-               outView.writeLong(nextSlideTime);
-
-               panes.writeToOutput(outView, keySerializer, 
stateTypeSerializer);
-
-               outView.flush();
-       }
-
-       @Override
-       public void restoreState(FSDataInputStream in) throws Exception {
-               super.restoreState(in);
-
-               DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(in);
-
-               final long nextEvaluationTime = inView.readLong();
-               final long nextSlideTime = inView.readLong();
-
-               AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes = 
createPanes(keySelector, function);
-
-               panes.readFromInput(inView, keySerializer, stateTypeSerializer);
-
-               restoredState = new RestoredState<>(panes, nextEvaluationTime, 
nextSlideTime);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Property access (for testing)
-       // 
------------------------------------------------------------------------
-
-       public long getWindowSize() {
-               return windowSize;
-       }
-
-       public long getWindowSlide() {
-               return windowSlide;
-       }
-
-       public long getPaneSize() {
-               return paneSize;
-       }
-
-       public int getNumPanesPerWindow() {
-               return numPanesPerWindow;
-       }
-
-       public long getNextEvaluationTime() {
-               return nextEvaluationTime;
-       }
-
-       public long getNextSlideTime() {
-               return nextSlideTime;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "Window (processing time) (length=" + windowSize + ", 
slide=" + windowSlide + ')';
-       }
-
-       // 
------------------------------------------------------------------------
-       // 
------------------------------------------------------------------------
-
-       private static final class RestoredState<IN, KEY, STATE, OUT> {
-
-               final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
-               final long nextEvaluationTime;
-               final long nextSlideTime;
-
-               RestoredState(AbstractKeyedTimePanes<IN, KEY, STATE, OUT> 
panes, long nextEvaluationTime, long nextSlideTime) {
-                       this.panes = panes;
-                       this.nextEvaluationTime = nextEvaluationTime;
-                       this.nextSlideTime = nextSlideTime;
-               }
-       }
-}
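
The constructor of the removed operator derives the pane size as the gcd of window length and slide, and the number of panes per window as windowLength / paneSize. A small standalone sketch of that arithmetic follows; it is not part of this commit, the 5 s / 2 s values are hypothetical, and it uses the same commons-math3 gcd call the operator used.

// Illustrative sketch (not part of this commit): the pane arithmetic of the removed aligned
// window operator, with hypothetical values.
import org.apache.commons.math3.util.ArithmeticUtils;

public class PaneArithmeticExample {

        public static void main(String[] args) {
                long windowLength = 5000L; // 5 s windows (hypothetical)
                long windowSlide = 2000L;  // sliding every 2 s (hypothetical)

                // one pane per gcd of length and slide: gcd(5000, 2000) = 1000 ms
                long paneSize = ArithmeticUtils.gcd(windowLength, windowSlide);

                // a window therefore spans 5000 / 1000 = 5 panes
                long numPanesPerWindow = windowLength / paneSize;

                System.out.println("paneSize=" + paneSize + "ms, panesPerWindow=" + numPanesPerWindow);
        }
}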

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
deleted file mode 100644
index d67121a..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.state.ArrayListSerializer;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
-
-import java.util.ArrayList;
-
-/**
- * Special window operator implementation for windows that fire at the same 
time for all keys with
- * accumulating windows.
- *
- * @deprecated Deprecated in favour of the generic {@link WindowOperator}. 
This was an
- * optimized implementation used for aligned windows.
- */
-@Internal
-@Deprecated
-public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
-               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT, ArrayList<IN>, InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> 
{
-
-       private static final long serialVersionUID = 7305948082830843475L;
-
-       public AccumulatingProcessingTimeWindowOperator(
-                       InternalWindowFunction<Iterable<IN>, OUT, KEY, 
TimeWindow> function,
-                       KeySelector<IN, KEY> keySelector,
-                       TypeSerializer<KEY> keySerializer,
-                       TypeSerializer<IN> valueSerializer,
-                       long windowLength,
-                       long windowSlide) {
-               super(function, keySelector, keySerializer,
-                               new ArrayListSerializer<>(valueSerializer), 
windowLength, windowSlide);
-       }
-
-       @Override
-       protected AccumulatingKeyedTimePanes<IN, KEY, OUT> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
-               @SuppressWarnings("unchecked")
-               InternalWindowFunction<Iterable<IN>, OUT, KEY, Window> 
windowFunction = (InternalWindowFunction<Iterable<IN>, OUT, KEY, Window>) 
function;
-
-               return new AccumulatingKeyedTimePanes<>(keySelector, 
windowFunction);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
deleted file mode 100644
index 6747383..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-
-/**
- * Special window operator implementation for windows that fire at the same 
time for all keys with
- * aggregating windows.
- *
- * @deprecated Deprecated in favour of the generic {@link WindowOperator}. 
This was an
- * optimized implementation used for aligned windows.
- */
-@Internal
-@Deprecated
-public class AggregatingProcessingTimeWindowOperator<KEY, IN>
-               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
IN, IN, ReduceFunction<IN>> {
-
-       private static final long serialVersionUID = 7305948082830843475L;
-
-       public AggregatingProcessingTimeWindowOperator(
-                       ReduceFunction<IN> function,
-                       KeySelector<IN, KEY> keySelector,
-                       TypeSerializer<KEY> keySerializer,
-                       TypeSerializer<IN> aggregateSerializer,
-                       long windowLength,
-                       long windowSlide) {
-               super(function, keySelector, keySerializer, 
aggregateSerializer, windowLength, windowSlide);
-       }
-
-       @Override
-       protected AggregatingKeyedTimePanes<IN, KEY> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
-               @SuppressWarnings("unchecked")
-               ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) 
function;
-
-               return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, 
windowFunction);
-       }
-}
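
The two deleted subclasses differed only in how panes were evaluated: the accumulating variant buffered elements and handed an Iterable to a window function, while the aggregating variant eagerly pre-reduced with a ReduceFunction. On the generic window path those roughly correspond to apply() and reduce() on a WindowedStream. The sketch below is not part of this commit; it assumes the standard Flink 1.x WindowedStream API, and the input stream and functions are hypothetical.

// Illustrative sketch (not part of this commit): generic-path counterparts of the two removed
// operators. Assumes the Flink 1.x WindowedStream API; the "events" stream is hypothetical.
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class AccumulatingVsAggregatingExample {

        public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStream<Tuple2<String, Long>> events =
                        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

                // "aggregating" style: the window result is pre-reduced element by element
                events.keyBy(0)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                        .print();

                // "accumulating" style: all elements of a window are buffered and handed to the
                // window function as an Iterable when the window fires
                events.keyBy(0)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .apply(new WindowFunction<Tuple2<String, Long>, Long, Tuple, TimeWindow>() {
                                @Override
                                public void apply(Tuple key, TimeWindow window,
                                                Iterable<Tuple2<String, Long>> input, Collector<Long> out) {
                                        long count = 0;
                                        for (Tuple2<String, Long> ignored : input) {
                                                count++;
                                        }
                                        out.collect(count);
                                }
                        })
                        .print();

                env.execute("Accumulating vs. aggregating windows");
        }
}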

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 880907d..b14739f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -41,17 +41,12 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalAppendingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMergingState;
-import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.InternalTimer;
@@ -61,8 +56,6 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
@@ -70,16 +63,9 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;
-import org.apache.flink.util.Preconditions;
 
-import org.apache.commons.math3.util.ArithmeticUtils;
-
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -180,34 +166,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
        protected transient InternalTimerService<W> internalTimerService;
 
-       // ------------------------------------------------------------------------
-       // State restored in case of migration from an older version (backwards compatibility)
-       // ------------------------------------------------------------------------
-
-       /**
-        * A flag indicating if we are migrating from a regular {@link WindowOperator}
-        * or one of the deprecated {@link AccumulatingProcessingTimeWindowOperator} and
-        * {@link AggregatingProcessingTimeWindowOperator}.
-        */
-       private final LegacyWindowOperatorType legacyWindowOperatorType;
-
-       /**
-        * The elements restored when migrating from an older, deprecated
-        * {@link AccumulatingProcessingTimeWindowOperator} or
-        * {@link AggregatingProcessingTimeWindowOperator}. */
-       private transient PriorityQueue<StreamRecord<IN>> restoredFromLegacyAlignedOpRecords;
-
-       /**
-        * The restored processing time timers when migrating from an
-        * older version of the {@link WindowOperator}.
-        */
-       private transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
-
-       /** The restored event time timer when migrating from an
-        * older version of the {@link WindowOperator}.
-        */
-       private transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
-
        /**
         * Creates a new {@code WindowOperator} based on the given policies and user functions.
         */
@@ -222,25 +180,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
                        long allowedLateness,
                        OutputTag<IN> lateDataOutputTag) {
 
-               this(windowAssigner, windowSerializer, keySelector, keySerializer,
-                       windowStateDescriptor, windowFunction, trigger, allowedLateness, lateDataOutputTag, LegacyWindowOperatorType.NONE);
-       }
-
-       /**
-        * Creates a new {@code WindowOperator} based on the given policies and user functions.
-        */
-       public WindowOperator(
-                       WindowAssigner<? super IN, W> windowAssigner,
-                       TypeSerializer<W> windowSerializer,
-                       KeySelector<IN, K> keySelector,
-                       TypeSerializer<K> keySerializer,
-                       StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
-                       InternalWindowFunction<ACC, OUT, K, W> windowFunction,
-                       Trigger<? super IN, ? super W> trigger,
-                       long allowedLateness,
-                       OutputTag<IN> lateDataOutputTag,
-                       LegacyWindowOperatorType legacyWindowOperatorType) {
-
                super(windowFunction);
 
                checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner),
@@ -261,7 +200,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
                this.trigger = checkNotNull(trigger);
                this.allowedLateness = allowedLateness;
                this.lateDataOutputTag = lateDataOutputTag;
-               this.legacyWindowOperatorType = legacyWindowOperatorType;
 
                setChainingStrategy(ChainingStrategy.ALWAYS);
        }
@@ -321,8 +259,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
                                        getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
                        mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
                }
-
-               registerRestoredLegacyStateState();
        }
 
        @Override
@@ -1037,256 +973,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        }
 
        // 
------------------------------------------------------------------------
-       //  Restoring / Migrating from an older Flink version.
-       // 
------------------------------------------------------------------------
-
-       private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
-
-       private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
-
-       @Override
-       public void restoreState(FSDataInputStream in) throws Exception {
-               super.restoreState(in);
-
-               LOG.info("{} (taskIdx={}) restoring {} state from an older 
Flink version.",
-                       getClass().getSimpleName(), legacyWindowOperatorType, 
getRuntimeContext().getIndexOfThisSubtask());
-
-               DataInputViewStreamWrapper streamWrapper = new 
DataInputViewStreamWrapper(in);
-
-               switch (legacyWindowOperatorType) {
-                       case NONE:
-                               restoreFromLegacyWindowOperator(streamWrapper);
-                               break;
-                       case FAST_ACCUMULATING:
-                       case FAST_AGGREGATING:
-                               
restoreFromLegacyAlignedWindowOperator(streamWrapper);
-                               break;
-               }
-       }
-
-       public void registerRestoredLegacyStateState() throws Exception {
-
-               switch (legacyWindowOperatorType) {
-                       case NONE:
-                               reregisterStateFromLegacyWindowOperator();
-                               break;
-                       case FAST_ACCUMULATING:
-                       case FAST_AGGREGATING:
-                               
reregisterStateFromLegacyAlignedWindowOperator();
-                               break;
-               }
-       }
-
-       private void 
restoreFromLegacyAlignedWindowOperator(DataInputViewStreamWrapper in) throws 
IOException {
-               Preconditions.checkArgument(legacyWindowOperatorType != 
LegacyWindowOperatorType.NONE);
-
-               final long nextEvaluationTime = in.readLong();
-               final long nextSlideTime = in.readLong();
-
-               validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, in.readInt());
-
-               restoredFromLegacyAlignedOpRecords = new PriorityQueue<>(42,
-                       new Comparator<StreamRecord<IN>>() {
-                               @Override
-                               public int compare(StreamRecord<IN> o1, 
StreamRecord<IN> o2) {
-                                       return Long.compare(o1.getTimestamp(), 
o2.getTimestamp());
-                               }
-                       }
-               );
-
-               switch (legacyWindowOperatorType) {
-                       case FAST_ACCUMULATING:
-                               
restoreElementsFromLegacyAccumulatingAlignedWindowOperator(in, nextSlideTime);
-                               break;
-                       case FAST_AGGREGATING:
-                               
restoreElementsFromLegacyAggregatingAlignedWindowOperator(in, nextSlideTime);
-                               break;
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("{} (taskIdx={}) restored {} events from 
legacy {}.",
-                               getClass().getSimpleName(),
-                               getRuntimeContext().getIndexOfThisSubtask(),
-                               restoredFromLegacyAlignedOpRecords.size(),
-                               legacyWindowOperatorType);
-               }
-       }
-
-       private void 
restoreElementsFromLegacyAccumulatingAlignedWindowOperator(DataInputView in, 
long nextSlideTime) throws IOException {
-               int numPanes = in.readInt();
-               final long paneSize = getPaneSize();
-               long nextElementTimestamp = nextSlideTime - (numPanes * 
paneSize);
-
-               @SuppressWarnings("unchecked")
-               ArrayListSerializer<IN> ser = new 
ArrayListSerializer<>((TypeSerializer<IN>) 
getStateDescriptor().getSerializer());
-
-               while (numPanes > 0) {
-                       validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, 
in.readInt());
-
-                       nextElementTimestamp += paneSize - 1; // the -1 is so 
that the elements fall into the correct time-frame
-
-                       final int numElementsInPane = in.readInt();
-                       for (int i = numElementsInPane - 1; i >= 0; i--) {
-                               K key = keySerializer.deserialize(in);
-
-                               @SuppressWarnings("unchecked")
-                               List<IN> valueList = ser.deserialize(in);
-                               for (IN record: valueList) {
-                                       
restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(record, 
nextElementTimestamp));
-                               }
-                       }
-                       numPanes--;
-               }
-       }
-
-       private void 
restoreElementsFromLegacyAggregatingAlignedWindowOperator(DataInputView in, 
long nextSlideTime) throws IOException {
-               int numPanes = in.readInt();
-               final long paneSize = getPaneSize();
-               long nextElementTimestamp = nextSlideTime - (numPanes * 
paneSize);
-
-               while (numPanes > 0) {
-                       validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, 
in.readInt());
-
-                       nextElementTimestamp += paneSize - 1; // the -1 is so 
that the elements fall into the correct time-frame
-
-                       final int numElementsInPane = in.readInt();
-                       for (int i = numElementsInPane - 1; i >= 0; i--) {
-                               K key = keySerializer.deserialize(in);
-
-                               @SuppressWarnings("unchecked")
-                               IN value = (IN) 
getStateDescriptor().getSerializer().deserialize(in);
-                               restoredFromLegacyAlignedOpRecords.add(new 
StreamRecord<>(value, nextElementTimestamp));
-                       }
-                       numPanes--;
-               }
-       }
-
-       private long getPaneSize() {
-               Preconditions.checkArgument(
-                       legacyWindowOperatorType == 
LegacyWindowOperatorType.FAST_ACCUMULATING ||
-                               legacyWindowOperatorType == 
LegacyWindowOperatorType.FAST_AGGREGATING);
-
-               final long paneSlide;
-               if (windowAssigner instanceof SlidingProcessingTimeWindows) {
-                       SlidingProcessingTimeWindows timeWindows = 
(SlidingProcessingTimeWindows) windowAssigner;
-                       paneSlide = ArithmeticUtils.gcd(timeWindows.getSize(), 
timeWindows.getSlide());
-               } else {
-                       TumblingProcessingTimeWindows timeWindows = 
(TumblingProcessingTimeWindows) windowAssigner;
-                       paneSlide = timeWindows.getSize(); // this is valid as 
windowLength == windowSlide == timeWindows.getSize
-               }
-               return paneSlide;
-       }
-
-       private static void validateMagicNumber(int expected, int found) throws 
IOException {
-               if (expected != found) {
-                       throw new IOException("Corrupt state stream - wrong 
magic number. " +
-                               "Expected '" + Integer.toHexString(expected) +
-                               "', found '" + Integer.toHexString(found) + 
'\'');
-               }
-       }
-
-       private void restoreFromLegacyWindowOperator(DataInputViewStreamWrapper in) throws IOException {
-               Preconditions.checkArgument(legacyWindowOperatorType == LegacyWindowOperatorType.NONE);
-
-               int numWatermarkTimers = in.readInt();
-               this.restoredFromLegacyEventTimeTimers = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
-
-               for (int i = 0; i < numWatermarkTimers; i++) {
-                       K key = keySerializer.deserialize(in);
-                       W window = windowSerializer.deserialize(in);
-                       long timestamp = in.readLong();
-
-                       Timer<K, W> timer = new Timer<>(timestamp, key, window);
-                       restoredFromLegacyEventTimeTimers.add(timer);
-               }
-
-               int numProcessingTimeTimers = in.readInt();
-               this.restoredFromLegacyProcessingTimeTimers = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
-
-               for (int i = 0; i < numProcessingTimeTimers; i++) {
-                       K key = keySerializer.deserialize(in);
-                       W window = windowSerializer.deserialize(in);
-                       long timestamp = in.readLong();
-
-                       Timer<K, W> timer = new Timer<>(timestamp, key, window);
-                       restoredFromLegacyProcessingTimeTimers.add(timer);
-               }
-
-               // just to read all the rest, although we do not really use this information.
-               int numProcessingTimeTimerTimestamp = in.readInt();
-               for (int i = 0; i < numProcessingTimeTimerTimestamp; i++) {
-                       in.readLong();
-                       in.readInt();
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
-
-                       if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) {
-                               LOG.debug("{} (taskIdx={}) restored {} event time timers from an older Flink version: {}",
-                                       getClass().getSimpleName(), subtaskIdx,
-                                       restoredFromLegacyEventTimeTimers.size(),
-                                       restoredFromLegacyEventTimeTimers);
-                       }
-
-                       if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) {
-                               LOG.debug("{} (taskIdx={}) restored {} processing time timers from an older Flink version: {}",
-                                       getClass().getSimpleName(), subtaskIdx,
-                                       restoredFromLegacyProcessingTimeTimers.size(),
-                                       restoredFromLegacyProcessingTimeTimers);
-                       }
-               }
-       }
-
-       public void reregisterStateFromLegacyWindowOperator() {
-               // if we restore from an older version,
-               // we have to re-register the recovered state.
-
-               if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) {
-
-                       LOG.info("{} (taskIdx={}) re-registering event-time timers from an older Flink version.",
-                               getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
-
-                       for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
-                               setCurrentKey(timer.key);
-                               internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
-                       }
-               }
-
-               if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) {
-
-                       LOG.info("{} (taskIdx={}) re-registering processing-time timers from an older Flink version.",
-                               getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
-
-                       for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
-                               setCurrentKey(timer.key);
-                               internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
-                       }
-               }
-
-               // gc friendliness
-               restoredFromLegacyEventTimeTimers = null;
-               restoredFromLegacyProcessingTimeTimers = null;
-       }
-
-       public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception {
-               if (restoredFromLegacyAlignedOpRecords != null && !restoredFromLegacyAlignedOpRecords.isEmpty()) {
-
-                       LOG.info("{} (taskIdx={}) re-registering timers from legacy {} from an older Flink version.",
-                               getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), legacyWindowOperatorType);
-
-                       while (!restoredFromLegacyAlignedOpRecords.isEmpty()) {
-                               StreamRecord<IN> record = restoredFromLegacyAlignedOpRecords.poll();
-                               setCurrentKey(keySelector.getKey(record.getValue()));
-                               processElement(record);
-                       }
-               }
-
-               // gc friendliness
-               restoredFromLegacyAlignedOpRecords = null;
-       }
-
-       // ------------------------------------------------------------------------
        // Getters for testing
        // ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 1dc0ee2..d0ab60a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 
@@ -292,9 +291,6 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
                if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) {
                        previousTypeSerializerAndConfig =
                                ((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-               } else if (configSnapshot instanceof MultiplexingStreamRecordSerializer.MultiplexingStreamRecordSerializerConfigSnapshot) {
-                       previousTypeSerializerAndConfig =
-                               ((MultiplexingStreamRecordSerializer.MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
                } else {
                        return CompatibilityResult.requiresMigration();
                }
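The net effect of the hunk above is that ensureCompatibility only recognizes the serializer's own config snapshot type; a condensed view, reconstructed from the visible lines (the surrounding method body is assumed, this is not additional code from the commit):

        if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) {
                previousTypeSerializerAndConfig =
                        ((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
        } else {
                // legacy MultiplexingStreamRecordSerializer snapshots are no longer special-cased
                return CompatibilityResult.requiresMigration();
        }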

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
index 4914075..0b03b79 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.Collection;
@@ -37,8 +36,6 @@ public class OperatorStateHandles {
 
        private final int operatorChainIndex;
 
-       private final StreamStateHandle legacyOperatorState;
-
        private final Collection<KeyedStateHandle> managedKeyedState;
        private final Collection<KeyedStateHandle> rawKeyedState;
        private final Collection<OperatorStateHandle> managedOperatorState;
@@ -46,24 +43,18 @@ public class OperatorStateHandles {
 
        public OperatorStateHandles(
                        int operatorChainIndex,
-                       StreamStateHandle legacyOperatorState,
                        Collection<KeyedStateHandle> managedKeyedState,
                        Collection<KeyedStateHandle> rawKeyedState,
                        Collection<OperatorStateHandle> managedOperatorState,
                        Collection<OperatorStateHandle> rawOperatorState) {
 
                this.operatorChainIndex = operatorChainIndex;
-               this.legacyOperatorState = legacyOperatorState;
                this.managedKeyedState = managedKeyedState;
                this.rawKeyedState = rawKeyedState;
                this.managedOperatorState = managedOperatorState;
                this.rawOperatorState = rawOperatorState;
        }
 
-       public StreamStateHandle getLegacyOperatorState() {
-               return legacyOperatorState;
-       }
-
        public Collection<KeyedStateHandle> getManagedKeyedState() {
                return managedKeyedState;
        }
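For call sites (for example test harnesses) the trimmed constructor now takes only the four partitioned-state collections. A minimal sketch of an updated call site, not taken from this commit, with empty collections used purely for illustration:

        import java.util.Collections;

        import org.apache.flink.runtime.state.KeyedStateHandle;
        import org.apache.flink.runtime.state.OperatorStateHandle;
        import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;

        class OperatorStateHandlesExample {
                // Sketch: constructing OperatorStateHandles without the removed legacy StreamStateHandle argument.
                static OperatorStateHandles emptyHandles() {
                        return new OperatorStateHandles(
                                0,                                              // operatorChainIndex
                                Collections.<KeyedStateHandle>emptyList(),      // managed keyed state
                                Collections.<KeyedStateHandle>emptyList(),      // raw keyed state
                                Collections.<OperatorStateHandle>emptyList(),   // managed operator state
                                Collections.<OperatorStateHandle>emptyList());  // raw operator state
                }
        }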

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1ba5fb1..310df4d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -44,8 +44,6 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -836,8 +834,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
                private final Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress;
 
-               private Map<OperatorID, StreamStateHandle> nonPartitionedStateHandles;
-
                private final CheckpointMetaData checkpointMetaData;
                private final CheckpointMetrics checkpointMetrics;
 
@@ -848,7 +844,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
                AsyncCheckpointRunnable(
                                StreamTask<?, ?> owner,
-                               Map<OperatorID, StreamStateHandle> nonPartitionedStateHandles,
                                Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress,
                                CheckpointMetaData checkpointMetaData,
                                CheckpointMetrics checkpointMetrics,
@@ -858,7 +853,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        this.operatorSnapshotsInProgress = Preconditions.checkNotNull(operatorSnapshotsInProgress);
                        this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
                        this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
-                       this.nonPartitionedStateHandles = nonPartitionedStateHandles;
                        this.asyncStartNanos = asyncStartNanos;
                }
 
@@ -876,7 +870,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                        OperatorSnapshotResult snapshotInProgress = entry.getValue();
 
                                        OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
-                                               nonPartitionedStateHandles.get(operatorID),
                                                FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()),
                                                FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()),
                                                FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture()),
@@ -968,13 +961,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                        }
                                }
 
-                               // discard non partitioned state handles
-                               try {
-                                       StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles.values());
-                               } catch (Exception discardException) {
-                                       exception = ExceptionUtils.firstOrSuppressed(discardException, exception);
-                               }
-
                                if (null != exception) {
                                        throw exception;
                                }
@@ -1008,7 +994,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
                // ------------------------
 
-               private final Map<OperatorID, StreamStateHandle> nonPartitionedStates;
                private final Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress;
 
                public CheckpointingOperation(
@@ -1022,7 +1007,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
                        this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
                        this.allOperators = owner.operatorChain.getAllOperators();
-                       this.nonPartitionedStates = new HashMap<>(allOperators.length);
                        this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length);
                }
 
@@ -1068,18 +1052,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                                }
                                        }
 
-                                       // Cleanup non partitioned state handles
-                                       for (StreamStateHandle nonPartitionedState : nonPartitionedStates.values()) {
-                                               if (nonPartitionedState != null) {
-                                                       try {
-                                                               nonPartitionedState.discardState();
-                                                       } catch (Exception e) {
-                                                               LOG.warn("Could not properly discard a non partitioned " +
-                                                                       "state. This might leave some orphaned files behind.", e);
-                                                       }
-                                               }
-                                       }
-
                                        if (LOG.isDebugEnabled()) {
                                                LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
                                                                "Alignment duration: {} ms, snapshot duration {} ms",
@@ -1094,20 +1066,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                @SuppressWarnings("deprecation")
                private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
                        if (null != op) {
-                               // first call the legacy checkpoint code paths
-                               StreamStateHandle legacyOperatorState = op.snapshotLegacyOperatorState(
-                                       checkpointMetaData.getCheckpointId(),
-                                       checkpointMetaData.getTimestamp(),
-                                       checkpointOptions);
-
-                               OperatorID operatorID = op.getOperatorID();
-                               nonPartitionedStates.put(operatorID, legacyOperatorState);
 
                                OperatorSnapshotResult snapshotInProgress = op.snapshotState(
                                                checkpointMetaData.getCheckpointId(),
                                                checkpointMetaData.getTimestamp(),
                                                checkpointOptions);
-                               operatorSnapshotsInProgress.put(operatorID, snapshotInProgress);
+                               operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
                        }
                }
 
@@ -1115,7 +1079,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
                        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                                        owner,
-                                       nonPartitionedStates,
                                        operatorSnapshotsInProgress,
                                        checkpointMetaData,
                                        checkpointMetrics,

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index ff5f589..4ed689d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -65,7 +64,6 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                        "UDF::open",
                        "OPERATOR::run",
                        "UDF::run",
-                       "OPERATOR::snapshotLegacyOperatorState",
                        "OPERATOR::snapshotState",
                        "OPERATOR::close",
                        "UDF::close",
@@ -93,7 +91,6 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                        "setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
                        "org.apache.flink.streaming.api.graph.StreamConfig, interface " +
                        "org.apache.flink.streaming.api.operators.Output], " +
-                       "snapshotLegacyOperatorState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions], " +
                        "snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions]]";
 
        private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
@@ -207,7 +204,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
        }
 
        private static class LifecycleTrackingStreamSource<OUT, SRC extends SourceFunction<OUT>>
-                       extends StreamSource<OUT, SRC> implements Serializable, StreamCheckpointedOperator {
+                       extends StreamSource<OUT, SRC> implements Serializable {
 
                private static final long serialVersionUID = 2431488948886850562L;
                private transient Thread testCheckpointer;
@@ -266,12 +263,6 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                }
 
                @Override
-               public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
-                       ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState");
-                       return super.snapshotLegacyOperatorState(checkpointId, timestamp, checkpointOptions);
-               }
-
-               @Override
                public void initializeState(StateInitializationContext context) throws Exception {
                        ACTUAL_ORDER_TRACKING.add("OPERATOR::initializeState");
                        super.initializeState(context);
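For user code, the snapshotLegacyOperatorState/Checkpointed path exercised by this test no longer exists; functions that still need non-keyed state would typically implement CheckpointedFunction (or ListCheckpointed) instead. A minimal sketch, not taken from this commit and assuming the standard CheckpointedFunction API:

        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.api.common.state.ListState;
        import org.apache.flink.api.common.state.ListStateDescriptor;
        import org.apache.flink.runtime.state.FunctionInitializationContext;
        import org.apache.flink.runtime.state.FunctionSnapshotContext;
        import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

        // Hypothetical user function: counts records and keeps the count in managed operator state.
        public class CountingMapper implements MapFunction<String, String>, CheckpointedFunction {

                private transient ListState<Long> checkpointedCount;
                private long count;

                @Override
                public String map(String value) {
                        count++;
                        return value;
                }

                @Override
                public void snapshotState(FunctionSnapshotContext context) throws Exception {
                        // replace the previous snapshot with the current count
                        checkpointedCount.clear();
                        checkpointedCount.add(count);
                }

                @Override
                public void initializeState(FunctionInitializationContext context) throws Exception {
                        checkpointedCount = context.getOperatorStateStore().getListState(
                                new ListStateDescriptor<>("count", Long.class));
                        if (context.isRestored()) {
                                for (Long c : checkpointedCount.get()) {
                                        count += c;
                                }
                        }
                }
        }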

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
deleted file mode 100644
index 7dba4af..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.util.ListCollector;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyedStateStore;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.ByteSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
-import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Tests for {@link FoldApplyProcessWindowFunction}.
- */
-public class FoldApplyProcessWindowFunctionTest {
-
-       /**
-        * Tests that the FoldWindowFunction gets the output type serializer 
set by the
-        * StreamGraphGenerator and checks that the FoldWindowFunction computes 
the correct result.
-        */
-       @Test
-       public void testFoldWindowFunctionOutputTypeConfigurable() throws 
Exception{
-               StreamExecutionEnvironment env = new 
DummyStreamExecutionEnvironment();
-
-               List<StreamTransformation<?>> transformations = new 
ArrayList<>();
-
-               int initValue = 1;
-
-               FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, 
Integer, Integer> foldWindowFunction = new FoldApplyProcessWindowFunction<>(
-                       initValue,
-                       new FoldFunction<Integer, Integer>() {
-                               @Override
-                               public Integer fold(Integer accumulator, 
Integer value) throws Exception {
-                                       return accumulator + value;
-                               }
-
-                       },
-                       new ProcessWindowFunction<Integer, Integer, Integer, 
TimeWindow>() {
-                               @Override
-                               public void process(Integer integer,
-                                                                       Context 
context,
-                                                                       
Iterable<Integer> input,
-                                                                       
Collector<Integer> out) throws Exception {
-                                       for (Integer in: input) {
-                                               out.collect(in);
-                                       }
-                               }
-                       },
-                       BasicTypeInfo.INT_TYPE_INFO
-               );
-
-               AccumulatingProcessingTimeWindowOperator<Integer, Integer, 
Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
-                       new 
InternalIterableProcessWindowFunction<>(foldWindowFunction),
-                       new KeySelector<Integer, Integer>() {
-                               private static final long serialVersionUID = 
-7951310554369722809L;
-
-                               @Override
-                               public Integer getKey(Integer value) throws 
Exception {
-                                       return value;
-                               }
-                       },
-                       IntSerializer.INSTANCE,
-                       IntSerializer.INSTANCE,
-                       3000,
-                       3000
-               );
-
-               SourceFunction<Integer> sourceFunction = new 
SourceFunction<Integer>(){
-
-                       private static final long serialVersionUID = 
8297735565464653028L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-
-                       }
-
-                       @Override
-                       public void cancel() {
-
-                       }
-               };
-
-               SourceTransformation<Integer> source = new 
SourceTransformation<>("", new StreamSource<>(sourceFunction), 
BasicTypeInfo.INT_TYPE_INFO, 1);
-
-               transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
-
-               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
-
-               List<Integer> result = new ArrayList<>();
-               List<Integer> input = new ArrayList<>();
-               List<Integer> expected = new ArrayList<>();
-
-               input.add(1);
-               input.add(2);
-               input.add(3);
-
-               for (int value : input) {
-                       initValue += value;
-               }
-
-               expected.add(initValue);
-
-               FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, 
Integer, Integer>.Context ctx = foldWindowFunction.new Context() {
-                       @Override
-                       public TimeWindow window() {
-                               return new TimeWindow(0, 1);
-                       }
-
-                       @Override
-                       public long currentProcessingTime() {
-                               return 0;
-                       }
-
-                       @Override
-                       public long currentWatermark() {
-                               return 0;
-                       }
-
-                       @Override
-                       public KeyedStateStore windowState() {
-                               return new DummyKeyedStateStore();
-                       }
-
-                       @Override
-                       public KeyedStateStore globalState() {
-                               return new DummyKeyedStateStore();
-                       }
-               };
-
-               foldWindowFunction.open(new Configuration());
-
-               foldWindowFunction.process(0, ctx, input, new 
ListCollector<>(result));
-
-               Assert.assertEquals(expected, result);
-       }
-
-               /**
-        * Tests that the FoldWindowFunction gets the output type serializer 
set by the
-        * StreamGraphGenerator and checks that the FoldWindowFunction computes 
the correct result.
-        */
-       @Test
-       public void testFoldAllWindowFunctionOutputTypeConfigurable() throws 
Exception{
-               StreamExecutionEnvironment env = new 
DummyStreamExecutionEnvironment();
-
-               List<StreamTransformation<?>> transformations = new 
ArrayList<>();
-
-               int initValue = 1;
-
-               FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, 
Integer> foldWindowFunction = new FoldApplyProcessAllWindowFunction<>(
-                       initValue,
-                       new FoldFunction<Integer, Integer>() {
-                               @Override
-                               public Integer fold(Integer accumulator, 
Integer value) throws Exception {
-                                       return accumulator + value;
-                               }
-
-                       },
-                       new ProcessAllWindowFunction<Integer, Integer, 
TimeWindow>() {
-                               @Override
-                               public void process(Context context,
-                                                                       
Iterable<Integer> input,
-                                                                       
Collector<Integer> out) throws Exception {
-                                       for (Integer in: input) {
-                                               out.collect(in);
-                                       }
-                               }
-                       },
-                       BasicTypeInfo.INT_TYPE_INFO
-               );
-
-               AccumulatingProcessingTimeWindowOperator<Byte, Integer, 
Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
-                       new 
InternalIterableProcessAllWindowFunction<>(foldWindowFunction),
-                       new KeySelector<Integer, Byte>() {
-                               private static final long serialVersionUID = 
-7951310554369722809L;
-
-                               @Override
-                               public Byte getKey(Integer value) throws 
Exception {
-                                       return 0;
-                               }
-                       },
-                       ByteSerializer.INSTANCE,
-                       IntSerializer.INSTANCE,
-                       3000,
-                       3000
-               );
-
-               SourceFunction<Integer> sourceFunction = new 
SourceFunction<Integer>(){
-
-                       private static final long serialVersionUID = 
8297735565464653028L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-
-                       }
-
-                       @Override
-                       public void cancel() {
-
-                       }
-               };
-
-               SourceTransformation<Integer> source = new 
SourceTransformation<>("", new StreamSource<>(sourceFunction), 
BasicTypeInfo.INT_TYPE_INFO, 1);
-
-               transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
-
-               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
-
-               List<Integer> result = new ArrayList<>();
-               List<Integer> input = new ArrayList<>();
-               List<Integer> expected = new ArrayList<>();
-
-               input.add(1);
-               input.add(2);
-               input.add(3);
-
-               for (int value : input) {
-                       initValue += value;
-               }
-
-               expected.add(initValue);
-
-               FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, 
Integer>.Context ctx = foldWindowFunction.new Context() {
-                       @Override
-                       public TimeWindow window() {
-                               return new TimeWindow(0, 1);
-                       }
-
-                       @Override
-                       public KeyedStateStore windowState() {
-                               return new DummyKeyedStateStore();
-                       }
-
-                       @Override
-                       public KeyedStateStore globalState() {
-                               return new DummyKeyedStateStore();
-                       }
-               };
-
-               foldWindowFunction.open(new Configuration());
-
-               foldWindowFunction.process(ctx, input, new 
ListCollector<>(result));
-
-               Assert.assertEquals(expected, result);
-       }
-
-       private static class DummyKeyedStateStore implements KeyedStateStore {
-
-               @Override
-               public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
-                       return null;
-               }
-
-               @Override
-               public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
-                       return null;
-               }
-
-               @Override
-               public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) {
-                       return null;
-               }
-
-               @Override
-               public <T, ACC> FoldingState<T, ACC> 
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
-                       return null;
-               }
-
-               @Override
-               public <UK, UV> MapState<UK, UV> 
getMapState(MapStateDescriptor<UK, UV> stateProperties) {
-                       return null;
-               }
-       }
-
-       private static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
-
-               @Override
-               public JobExecutionResult execute(String jobName) throws 
Exception {
-                       return null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
deleted file mode 100644
index 7cf18dd..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.util.ListCollector;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Tests for {@link FoldApplyWindowFunction}.
- */
-public class FoldApplyWindowFunctionTest {
-
-       /**
-        * Tests that the FoldWindowFunction gets the output type serializer 
set by the
-        * StreamGraphGenerator and checks that the FoldWindowFunction computes 
the correct result.
-        */
-       @Test
-       public void testFoldWindowFunctionOutputTypeConfigurable() throws 
Exception{
-               StreamExecutionEnvironment env = new 
DummyStreamExecutionEnvironment();
-
-               List<StreamTransformation<?>> transformations = new 
ArrayList<>();
-
-               int initValue = 1;
-
-               FoldApplyWindowFunction<Integer, TimeWindow, Integer, Integer, 
Integer> foldWindowFunction = new FoldApplyWindowFunction<>(
-                       initValue,
-                       new FoldFunction<Integer, Integer>() {
-                               private static final long serialVersionUID = 
-4849549768529720587L;
-
-                               @Override
-                               public Integer fold(Integer accumulator, 
Integer value) throws Exception {
-                                       return accumulator + value;
-                               }
-                       },
-                       new WindowFunction<Integer, Integer, Integer, 
TimeWindow>() {
-                               @Override
-                               public void apply(Integer integer,
-                                       TimeWindow window,
-                                       Iterable<Integer> input,
-                                       Collector<Integer> out) throws 
Exception {
-                                       for (Integer in: input) {
-                                               out.collect(in);
-                                       }
-                               }
-                       },
-                       BasicTypeInfo.INT_TYPE_INFO
-               );
-
-               AccumulatingProcessingTimeWindowOperator<Integer, Integer, 
Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
-                       new InternalIterableWindowFunction<>(
-                                       foldWindowFunction),
-                               new KeySelector<Integer, Integer>() {
-                                       private static final long 
serialVersionUID = -7951310554369722809L;
-
-                                       @Override
-                                       public Integer getKey(Integer value) 
throws Exception {
-                                               return value;
-                                       }
-                               },
-                               IntSerializer.INSTANCE,
-                               IntSerializer.INSTANCE,
-                               3000,
-                               3000
-               );
-
-               SourceFunction<Integer> sourceFunction = new 
SourceFunction<Integer>(){
-
-                       private static final long serialVersionUID = 
8297735565464653028L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-
-                       }
-
-                       @Override
-                       public void cancel() {
-
-                       }
-               };
-
-               SourceTransformation<Integer> source = new 
SourceTransformation<>("", new StreamSource<>(sourceFunction), 
BasicTypeInfo.INT_TYPE_INFO, 1);
-
-               transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
-
-               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
-
-               List<Integer> result = new ArrayList<>();
-               List<Integer> input = new ArrayList<>();
-               List<Integer> expected = new ArrayList<>();
-
-               input.add(1);
-               input.add(2);
-               input.add(3);
-
-               for (int value : input) {
-                       initValue += value;
-               }
-
-               expected.add(initValue);
-
-               foldWindowFunction.apply(0, new TimeWindow(0, 1), input, new 
ListCollector<Integer>(result));
-
-               Assert.assertEquals(expected, result);
-       }
-
-       private static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
-
-               @Override
-               public JobExecutionResult execute(String jobName) throws 
Exception {
-                       return null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 58898d8..6dd08f6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -437,7 +437,8 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
                StreamGraph streamGraph = env.getStreamGraph();
                int idx = 1;
                for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
-                       Assert.assertEquals(jobVertex.getIdAlternatives().get(1).toString(), userHashes.get(idx));
+                       List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
+                       Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx));
                        --idx;
                }
        }
