http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index efbebf4..329ce18 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -24,23 +24,15 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Migration; - -import java.io.Serializable; import static java.util.Objects.requireNonNull; @@ -57,8 +49,7 @@ import static java.util.Objects.requireNonNull; @PublicEvolving public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> - implements OutputTypeConfigurable<OUT>, - StreamCheckpointedOperator { + implements OutputTypeConfigurable<OUT> { private static final long serialVersionUID = 1L; @@ -132,59 +123,6 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> // ------------------------------------------------------------------------ @Override - public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { - if (userFunction instanceof Checkpointed) { - @SuppressWarnings("unchecked") - Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction; - - Serializable udfState; - try { - udfState = chkFunction.snapshotState(checkpointId, timestamp); - if (udfState != null) { - out.write(1); - InstantiationUtil.serializeObject(out, udfState); - } else { - out.write(0); - } - } catch (Exception e) { - throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e); - } - } - } - - @Override - public void restoreState(FSDataInputStream in) throws Exception { - boolean haveReadUdfStateFlag = false; - if (userFunction instanceof Checkpointed || - (userFunction instanceof CheckpointedRestoring)) { - @SuppressWarnings("unchecked") - CheckpointedRestoring<Serializable> chkFunction = (CheckpointedRestoring<Serializable>) userFunction; - - int hasUdfState = in.read(); - haveReadUdfStateFlag = true; - - if (hasUdfState == 1) { - Serializable functionState = InstantiationUtil.deserializeObject(in, getUserCodeClassloader()); - if (functionState != null) { - try { - chkFunction.restoreState(functionState); - } catch (Exception e) { - throw new Exception("Failed to restore state to function: " + e.getMessage(), e); - } - } - } - } - - if (in instanceof Migration && !haveReadUdfStateFlag) { - // absorb the introduced byte from the migration stream without too much further consequences - int hasUdfState = in.read(); - if (hasUdfState == 1) { - throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring"); - } - } - } - - @Override public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { super.notifyOfCompletedCheckpoint(checkpointId); @@ -219,23 +157,11 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> private void checkUdfCheckpointingPreconditions() { - boolean newCheckpointInferface = false; - - if (userFunction instanceof CheckpointedFunction) { - newCheckpointInferface = true; - } - - if (userFunction instanceof ListCheckpointed) { - if (newCheckpointInferface) { - throw new IllegalStateException("User functions are not allowed to implement " + - "CheckpointedFunction AND ListCheckpointed."); - } - newCheckpointInferface = true; - } + if (userFunction instanceof CheckpointedFunction + && userFunction instanceof ListCheckpointed) { - if (newCheckpointInferface && userFunction instanceof Checkpointed) { - throw new IllegalStateException("User functions are not allowed to implement Checkpointed AND " + - "CheckpointedFunction/ListCheckpointed."); + throw new IllegalStateException("User functions are not allowed to implement " + + "CheckpointedFunction AND ListCheckpointed."); } } }
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java deleted file mode 100644 index 33304e4..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/CheckpointedRestoringOperator.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.runtime.tasks.StreamTask; - -/** - * Interface for {@link StreamOperator StreamOperators} that can restore from a Flink 1.1 - * legacy snapshot that was done using the {@link StreamCheckpointedOperator} interface. - * - * @deprecated {@link Checkpointed} has been deprecated as well. This class can be - * removed when we remove that interface. - */ -@Deprecated -public interface CheckpointedRestoringOperator { - - /** - * Restores the operator state, if this operator's execution is recovering from a checkpoint. - * This method restores the operator state (if the operator is stateful) and the key/value state - * (if it had been used and was initialized when the snapshot occurred). - * - * <p>This method is called after {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} - * and before {@link StreamOperator#open()}. - * - * @param in The stream from which we have to restore our state. - * - * @throws Exception Exceptions during state restore should be forwarded, so that the system can - * properly react to failed state restore and fail the execution attempt. - */ - void restoreState(FSDataInputStream in) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java deleted file mode 100644 index 986e2b7..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.core.fs.FSDataOutputStream; - -/** - * @deprecated This interface is deprecated without replacement. - * All operators are now checkpointed. - */ -@Deprecated -public interface StreamCheckpointedOperator extends CheckpointedRestoringOperator { - - /** - * Called to draw a state snapshot from the operator. This method snapshots the operator state - * (if the operator is stateful). - * - * @param out The stream to which we have to write our state. - * @param checkpointId The ID of the checkpoint. - * @param timestamp The timestamp of the checkpoint. - * - * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator - * and the key/value state. - */ - void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 9d5e02b..38b4aee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -22,7 +22,6 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -105,21 +104,6 @@ public interface StreamOperator<OUT> extends Serializable { CheckpointOptions checkpointOptions) throws Exception; /** - * Takes a snapshot of the legacy operator state defined via {@link StreamCheckpointedOperator}. - * - * @return The handle to the legacy operator state, or null, if no state was snapshotted. - * @throws Exception This method should forward any type of exception that happens during snapshotting. - * - * @deprecated This method will be removed as soon as no more operators use the legacy state code paths - */ - @SuppressWarnings("deprecation") - @Deprecated - StreamStateHandle snapshotLegacyOperatorState( - long checkpointId, - long timestamp, - CheckpointOptions checkpointOptions) throws Exception; - - /** * Provides state handles to restore the operator state. * * @param stateHandles state handles to the operator state. http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java deleted file mode 100644 index 252f997..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.assigners; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.windowing.time.Time; - -/** - * This is a special window assigner used to tell the system to use the - * <i>"Fast Aligned Processing Time Window Operator"</i> for windowing. - * - * <p>Prior Flink versions used that operator automatically for simple processing time - * windows (tumbling and sliding) when no custom trigger and no evictor was specified. - * In the current Flink version, that operator is only used when programs explicitly - * specify this window assigner. This is only intended for special cases where programs relied on - * the better performance of the fast aligned window operator, and are willing to accept the lack - * of support for various features as indicated below: - * - * <ul> - * <li>No custom state backend can be selected, the operator always stores data on the Java heap.</li> - * <li>The operator does not support key groups, meaning it cannot change the parallelism.</li> - * <li>Future versions of Flink may not be able to resume from checkpoints/savepoints taken by this - * operator.</li> - * </ul> - * - * <p>Future implementation plans: We plan to add some of the optimizations used by this operator to - * the general window operator, so that future versions of Flink will not have the performance/functionality - * trade-off any more. - * - * <p>Note on implementation: The concrete operator instantiated by this assigner is either the - * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator} - * or {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator}. - */ -@PublicEvolving -public final class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner { - - private static final long serialVersionUID = -6217477609512299842L; - - public TumblingAlignedProcessingTimeWindows(long size) { - super(size); - } - - /** - * Creates a new {@code TumblingAlignedProcessingTimeWindows} {@link WindowAssigner} that assigns - * elements to time windows based on the element timestamp. - * - * @param size The size of the generated windows. - */ - public static TumblingAlignedProcessingTimeWindows of(Time size) { - return new TumblingAlignedProcessingTimeWindows(size.toMilliseconds()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java deleted file mode 100644 index 83a7528..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windowing; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; -import org.apache.flink.util.MathUtils; - -import org.apache.commons.math3.util.ArithmeticUtils; - -import static java.util.Objects.requireNonNull; - -/** - * Base class for special window operator implementation for windows that fire at the same time for - * all keys. - * - * @deprecated Deprecated in favour of the generic {@link WindowOperator}. This was an - * optimized implementation used for aligned windows. - */ -@Internal -@Deprecated -public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function> - extends AbstractUdfStreamOperator<OUT, F> - implements OneInputStreamOperator<IN, OUT>, ProcessingTimeCallback { - - private static final long serialVersionUID = 3245500864882459867L; - - private static final long MIN_SLIDE_TIME = 50; - - // ----- fields for operator parametrization ----- - - private final Function function; - private final KeySelector<IN, KEY> keySelector; - - private final TypeSerializer<KEY> keySerializer; - private final TypeSerializer<STATE> stateTypeSerializer; - - private final long windowSize; - private final long windowSlide; - private final long paneSize; - private final int numPanesPerWindow; - - // ----- fields for operator functionality ----- - - private transient AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes; - - private transient TimestampedCollector<OUT> out; - - private transient RestoredState<IN, KEY, STATE, OUT> restoredState; - - private transient long nextEvaluationTime; - private transient long nextSlideTime; - - protected AbstractAlignedProcessingTimeWindowOperator( - F function, - KeySelector<IN, KEY> keySelector, - TypeSerializer<KEY> keySerializer, - TypeSerializer<STATE> stateTypeSerializer, - long windowLength, - long windowSlide) { - super(function); - - if (windowLength < MIN_SLIDE_TIME) { - throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs"); - } - if (windowSlide < MIN_SLIDE_TIME) { - throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs"); - } - if (windowLength < windowSlide) { - throw new IllegalArgumentException("The window size must be larger than the window slide"); - } - - final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide); - if (paneSlide < MIN_SLIDE_TIME) { - throw new IllegalArgumentException(String.format( - "Cannot compute window of size %d msecs sliding by %d msecs. " + - "The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide)); - } - - this.function = requireNonNull(function); - this.keySelector = requireNonNull(keySelector); - this.keySerializer = requireNonNull(keySerializer); - this.stateTypeSerializer = requireNonNull(stateTypeSerializer); - this.windowSize = windowLength; - this.windowSlide = windowSlide; - this.paneSize = paneSlide; - this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide); - } - - protected abstract AbstractKeyedTimePanes<IN, KEY, STATE, OUT> createPanes( - KeySelector<IN, KEY> keySelector, Function function); - - // ------------------------------------------------------------------------ - // startup and shutdown - // ------------------------------------------------------------------------ - - @Override - public void open() throws Exception { - super.open(); - - out = new TimestampedCollector<>(output); - - // decide when to first compute the window and when to slide it - // the values should align with the start of time (that is, the UNIX epoch, not the big bang) - final long now = getProcessingTimeService().getCurrentProcessingTime(); - nextEvaluationTime = now + windowSlide - (now % windowSlide); - nextSlideTime = now + paneSize - (now % paneSize); - - final long firstTriggerTime = Math.min(nextEvaluationTime, nextSlideTime); - - // check if we restored state and if we need to fire some windows based on that restored state - if (restoredState == null) { - // initial empty state: create empty panes that gather the elements per slide - panes = createPanes(keySelector, function); - } - else { - // restored state - panes = restoredState.panes; - - long nextPastEvaluationTime = restoredState.nextEvaluationTime; - long nextPastSlideTime = restoredState.nextSlideTime; - long nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime); - int numPanesRestored = panes.getNumPanes(); - - // fire windows from the past as long as there are more panes with data and as long - // as the missed trigger times have not caught up with the presence - while (numPanesRestored > 0 && nextPastTriggerTime < firstTriggerTime) { - // evaluate the window from the past - if (nextPastTriggerTime == nextPastEvaluationTime) { - computeWindow(nextPastTriggerTime); - nextPastEvaluationTime += windowSlide; - } - - // evaluate slide from the past - if (nextPastTriggerTime == nextPastSlideTime) { - panes.slidePanes(numPanesPerWindow); - numPanesRestored--; - nextPastSlideTime += paneSize; - } - - nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime); - } - } - - // make sure the first window happens - getProcessingTimeService().registerTimer(firstTriggerTime, this); - } - - @Override - public void close() throws Exception { - super.close(); - - // early stop the triggering thread, so it does not attempt to return any more data - stopTriggers(); - } - - @Override - public void dispose() throws Exception { - super.dispose(); - - // acquire the lock during shutdown, to prevent trigger calls at the same time - // fail-safe stop of the triggering thread (in case of an error) - stopTriggers(); - - // release the panes. panes may still be null if dispose is called - // after open() failed - if (panes != null) { - panes.dispose(); - } - } - - private void stopTriggers() { - // reset the action timestamps. this makes sure any pending triggers will not evaluate - nextEvaluationTime = -1L; - nextSlideTime = -1L; - } - - // ------------------------------------------------------------------------ - // Receiving elements and triggers - // ------------------------------------------------------------------------ - - @Override - public void processElement(StreamRecord<IN> element) throws Exception { - panes.addElementToLatestPane(element.getValue()); - } - - @Override - public void onProcessingTime(long timestamp) throws Exception { - // first we check if we actually trigger the window function - if (timestamp == nextEvaluationTime) { - // compute and output the results - computeWindow(timestamp); - - nextEvaluationTime += windowSlide; - } - - // check if we slide the panes by one. this may happen in addition to the - // window computation, or just by itself - if (timestamp == nextSlideTime) { - panes.slidePanes(numPanesPerWindow); - nextSlideTime += paneSize; - } - - long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime); - getProcessingTimeService().registerTimer(nextTriggerTime, this); - } - - private void computeWindow(long timestamp) throws Exception { - out.setAbsoluteTimestamp(timestamp); - panes.truncatePanes(numPanesPerWindow); - panes.evaluateWindow(out, new TimeWindow(timestamp - windowSize, timestamp), this); - } - - // ------------------------------------------------------------------------ - // Checkpointing - // ------------------------------------------------------------------------ - - @Override - public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { - super.snapshotState(out, checkpointId, timestamp); - - // we write the panes with the key/value maps into the stream, as well as when this state - // should have triggered and slided - - DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out); - - outView.writeLong(nextEvaluationTime); - outView.writeLong(nextSlideTime); - - panes.writeToOutput(outView, keySerializer, stateTypeSerializer); - - outView.flush(); - } - - @Override - public void restoreState(FSDataInputStream in) throws Exception { - super.restoreState(in); - - DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in); - - final long nextEvaluationTime = inView.readLong(); - final long nextSlideTime = inView.readLong(); - - AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes = createPanes(keySelector, function); - - panes.readFromInput(inView, keySerializer, stateTypeSerializer); - - restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime); - } - - // ------------------------------------------------------------------------ - // Property access (for testing) - // ------------------------------------------------------------------------ - - public long getWindowSize() { - return windowSize; - } - - public long getWindowSlide() { - return windowSlide; - } - - public long getPaneSize() { - return paneSize; - } - - public int getNumPanesPerWindow() { - return numPanesPerWindow; - } - - public long getNextEvaluationTime() { - return nextEvaluationTime; - } - - public long getNextSlideTime() { - return nextSlideTime; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')'; - } - - // ------------------------------------------------------------------------ - // ------------------------------------------------------------------------ - - private static final class RestoredState<IN, KEY, STATE, OUT> { - - final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes; - final long nextEvaluationTime; - final long nextSlideTime; - - RestoredState(AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes, long nextEvaluationTime, long nextSlideTime) { - this.panes = panes; - this.nextEvaluationTime = nextEvaluationTime; - this.nextSlideTime = nextSlideTime; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java deleted file mode 100644 index d67121a..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windowing; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.runtime.state.ArrayListSerializer; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; - -import java.util.ArrayList; - -/** - * Special window operator implementation for windows that fire at the same time for all keys with - * accumulating windows. - * - * @deprecated Deprecated in favour of the generic {@link WindowOperator}. This was an - * optimized implementation used for aligned windows. - */ -@Internal -@Deprecated -public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> - extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> { - - private static final long serialVersionUID = 7305948082830843475L; - - public AccumulatingProcessingTimeWindowOperator( - InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> function, - KeySelector<IN, KEY> keySelector, - TypeSerializer<KEY> keySerializer, - TypeSerializer<IN> valueSerializer, - long windowLength, - long windowSlide) { - super(function, keySelector, keySerializer, - new ArrayListSerializer<>(valueSerializer), windowLength, windowSlide); - } - - @Override - protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) { - @SuppressWarnings("unchecked") - InternalWindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = (InternalWindowFunction<Iterable<IN>, OUT, KEY, Window>) function; - - return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java deleted file mode 100644 index 6747383..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windowing; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.functions.KeySelector; - -/** - * Special window operator implementation for windows that fire at the same time for all keys with - * aggregating windows. - * - * @deprecated Deprecated in favour of the generic {@link WindowOperator}. This was an - * optimized implementation used for aligned windows. - */ -@Internal -@Deprecated -public class AggregatingProcessingTimeWindowOperator<KEY, IN> - extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, IN, ReduceFunction<IN>> { - - private static final long serialVersionUID = 7305948082830843475L; - - public AggregatingProcessingTimeWindowOperator( - ReduceFunction<IN> function, - KeySelector<IN, KEY> keySelector, - TypeSerializer<KEY> keySerializer, - TypeSerializer<IN> aggregateSerializer, - long windowLength, - long windowSlide) { - super(function, keySelector, keySerializer, aggregateSerializer, windowLength, windowSlide); - } - - @Override - protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) { - @SuppressWarnings("unchecked") - ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function; - - return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 880907d..b14739f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -41,17 +41,12 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.state.ArrayListSerializer; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.internal.InternalAppendingState; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMergingState; -import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -61,8 +56,6 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; -import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; @@ -70,16 +63,9 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.OutputTag; -import org.apache.flink.util.Preconditions; -import org.apache.commons.math3.util.ArithmeticUtils; - -import java.io.IOException; import java.io.Serializable; import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -180,34 +166,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> protected transient InternalTimerService<W> internalTimerService; - // ------------------------------------------------------------------------ - // State restored in case of migration from an older version (backwards compatibility) - // ------------------------------------------------------------------------ - - /** - * A flag indicating if we are migrating from a regular {@link WindowOperator} - * or one of the deprecated {@link AccumulatingProcessingTimeWindowOperator} and - * {@link AggregatingProcessingTimeWindowOperator}. - */ - private final LegacyWindowOperatorType legacyWindowOperatorType; - - /** - * The elements restored when migrating from an older, deprecated - * {@link AccumulatingProcessingTimeWindowOperator} or - * {@link AggregatingProcessingTimeWindowOperator}. */ - private transient PriorityQueue<StreamRecord<IN>> restoredFromLegacyAlignedOpRecords; - - /** - * The restored processing time timers when migrating from an - * older version of the {@link WindowOperator}. - */ - private transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers; - - /** The restored event time timer when migrating from an - * older version of the {@link WindowOperator}. - */ - private transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers; - /** * Creates a new {@code WindowOperator} based on the given policies and user functions. */ @@ -222,25 +180,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> long allowedLateness, OutputTag<IN> lateDataOutputTag) { - this(windowAssigner, windowSerializer, keySelector, keySerializer, - windowStateDescriptor, windowFunction, trigger, allowedLateness, lateDataOutputTag, LegacyWindowOperatorType.NONE); - } - - /** - * Creates a new {@code WindowOperator} based on the given policies and user functions. - */ - public WindowOperator( - WindowAssigner<? super IN, W> windowAssigner, - TypeSerializer<W> windowSerializer, - KeySelector<IN, K> keySelector, - TypeSerializer<K> keySerializer, - StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor, - InternalWindowFunction<ACC, OUT, K, W> windowFunction, - Trigger<? super IN, ? super W> trigger, - long allowedLateness, - OutputTag<IN> lateDataOutputTag, - LegacyWindowOperatorType legacyWindowOperatorType) { - super(windowFunction); checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner), @@ -261,7 +200,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> this.trigger = checkNotNull(trigger); this.allowedLateness = allowedLateness; this.lateDataOutputTag = lateDataOutputTag; - this.legacyWindowOperatorType = legacyWindowOperatorType; setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -321,8 +259,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor); mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE); } - - registerRestoredLegacyStateState(); } @Override @@ -1037,256 +973,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } // ------------------------------------------------------------------------ - // Restoring / Migrating from an older Flink version. - // ------------------------------------------------------------------------ - - private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42; - - private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5; - - @Override - public void restoreState(FSDataInputStream in) throws Exception { - super.restoreState(in); - - LOG.info("{} (taskIdx={}) restoring {} state from an older Flink version.", - getClass().getSimpleName(), legacyWindowOperatorType, getRuntimeContext().getIndexOfThisSubtask()); - - DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in); - - switch (legacyWindowOperatorType) { - case NONE: - restoreFromLegacyWindowOperator(streamWrapper); - break; - case FAST_ACCUMULATING: - case FAST_AGGREGATING: - restoreFromLegacyAlignedWindowOperator(streamWrapper); - break; - } - } - - public void registerRestoredLegacyStateState() throws Exception { - - switch (legacyWindowOperatorType) { - case NONE: - reregisterStateFromLegacyWindowOperator(); - break; - case FAST_ACCUMULATING: - case FAST_AGGREGATING: - reregisterStateFromLegacyAlignedWindowOperator(); - break; - } - } - - private void restoreFromLegacyAlignedWindowOperator(DataInputViewStreamWrapper in) throws IOException { - Preconditions.checkArgument(legacyWindowOperatorType != LegacyWindowOperatorType.NONE); - - final long nextEvaluationTime = in.readLong(); - final long nextSlideTime = in.readLong(); - - validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, in.readInt()); - - restoredFromLegacyAlignedOpRecords = new PriorityQueue<>(42, - new Comparator<StreamRecord<IN>>() { - @Override - public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) { - return Long.compare(o1.getTimestamp(), o2.getTimestamp()); - } - } - ); - - switch (legacyWindowOperatorType) { - case FAST_ACCUMULATING: - restoreElementsFromLegacyAccumulatingAlignedWindowOperator(in, nextSlideTime); - break; - case FAST_AGGREGATING: - restoreElementsFromLegacyAggregatingAlignedWindowOperator(in, nextSlideTime); - break; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("{} (taskIdx={}) restored {} events from legacy {}.", - getClass().getSimpleName(), - getRuntimeContext().getIndexOfThisSubtask(), - restoredFromLegacyAlignedOpRecords.size(), - legacyWindowOperatorType); - } - } - - private void restoreElementsFromLegacyAccumulatingAlignedWindowOperator(DataInputView in, long nextSlideTime) throws IOException { - int numPanes = in.readInt(); - final long paneSize = getPaneSize(); - long nextElementTimestamp = nextSlideTime - (numPanes * paneSize); - - @SuppressWarnings("unchecked") - ArrayListSerializer<IN> ser = new ArrayListSerializer<>((TypeSerializer<IN>) getStateDescriptor().getSerializer()); - - while (numPanes > 0) { - validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt()); - - nextElementTimestamp += paneSize - 1; // the -1 is so that the elements fall into the correct time-frame - - final int numElementsInPane = in.readInt(); - for (int i = numElementsInPane - 1; i >= 0; i--) { - K key = keySerializer.deserialize(in); - - @SuppressWarnings("unchecked") - List<IN> valueList = ser.deserialize(in); - for (IN record: valueList) { - restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(record, nextElementTimestamp)); - } - } - numPanes--; - } - } - - private void restoreElementsFromLegacyAggregatingAlignedWindowOperator(DataInputView in, long nextSlideTime) throws IOException { - int numPanes = in.readInt(); - final long paneSize = getPaneSize(); - long nextElementTimestamp = nextSlideTime - (numPanes * paneSize); - - while (numPanes > 0) { - validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt()); - - nextElementTimestamp += paneSize - 1; // the -1 is so that the elements fall into the correct time-frame - - final int numElementsInPane = in.readInt(); - for (int i = numElementsInPane - 1; i >= 0; i--) { - K key = keySerializer.deserialize(in); - - @SuppressWarnings("unchecked") - IN value = (IN) getStateDescriptor().getSerializer().deserialize(in); - restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(value, nextElementTimestamp)); - } - numPanes--; - } - } - - private long getPaneSize() { - Preconditions.checkArgument( - legacyWindowOperatorType == LegacyWindowOperatorType.FAST_ACCUMULATING || - legacyWindowOperatorType == LegacyWindowOperatorType.FAST_AGGREGATING); - - final long paneSlide; - if (windowAssigner instanceof SlidingProcessingTimeWindows) { - SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner; - paneSlide = ArithmeticUtils.gcd(timeWindows.getSize(), timeWindows.getSlide()); - } else { - TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner; - paneSlide = timeWindows.getSize(); // this is valid as windowLength == windowSlide == timeWindows.getSize - } - return paneSlide; - } - - private static void validateMagicNumber(int expected, int found) throws IOException { - if (expected != found) { - throw new IOException("Corrupt state stream - wrong magic number. " + - "Expected '" + Integer.toHexString(expected) + - "', found '" + Integer.toHexString(found) + '\''); - } - } - - private void restoreFromLegacyWindowOperator(DataInputViewStreamWrapper in) throws IOException { - Preconditions.checkArgument(legacyWindowOperatorType == LegacyWindowOperatorType.NONE); - - int numWatermarkTimers = in.readInt(); - this.restoredFromLegacyEventTimeTimers = new PriorityQueue<>(Math.max(numWatermarkTimers, 1)); - - for (int i = 0; i < numWatermarkTimers; i++) { - K key = keySerializer.deserialize(in); - W window = windowSerializer.deserialize(in); - long timestamp = in.readLong(); - - Timer<K, W> timer = new Timer<>(timestamp, key, window); - restoredFromLegacyEventTimeTimers.add(timer); - } - - int numProcessingTimeTimers = in.readInt(); - this.restoredFromLegacyProcessingTimeTimers = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1)); - - for (int i = 0; i < numProcessingTimeTimers; i++) { - K key = keySerializer.deserialize(in); - W window = windowSerializer.deserialize(in); - long timestamp = in.readLong(); - - Timer<K, W> timer = new Timer<>(timestamp, key, window); - restoredFromLegacyProcessingTimeTimers.add(timer); - } - - // just to read all the rest, although we do not really use this information. - int numProcessingTimeTimerTimestamp = in.readInt(); - for (int i = 0; i < numProcessingTimeTimerTimestamp; i++) { - in.readLong(); - in.readInt(); - } - - if (LOG.isDebugEnabled()) { - int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); - - if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) { - LOG.debug("{} (taskIdx={}) restored {} event time timers from an older Flink version: {}", - getClass().getSimpleName(), subtaskIdx, - restoredFromLegacyEventTimeTimers.size(), - restoredFromLegacyEventTimeTimers); - } - - if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) { - LOG.debug("{} (taskIdx={}) restored {} processing time timers from an older Flink version: {}", - getClass().getSimpleName(), subtaskIdx, - restoredFromLegacyProcessingTimeTimers.size(), - restoredFromLegacyProcessingTimeTimers); - } - } - } - - public void reregisterStateFromLegacyWindowOperator() { - // if we restore from an older version, - // we have to re-register the recovered state. - - if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) { - - LOG.info("{} (taskIdx={}) re-registering event-time timers from an older Flink version.", - getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask()); - - for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) { - setCurrentKey(timer.key); - internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp); - } - } - - if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) { - - LOG.info("{} (taskIdx={}) re-registering processing-time timers from an older Flink version.", - getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask()); - - for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) { - setCurrentKey(timer.key); - internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp); - } - } - - // gc friendliness - restoredFromLegacyEventTimeTimers = null; - restoredFromLegacyProcessingTimeTimers = null; - } - - public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception { - if (restoredFromLegacyAlignedOpRecords != null && !restoredFromLegacyAlignedOpRecords.isEmpty()) { - - LOG.info("{} (taskIdx={}) re-registering timers from legacy {} from an older Flink version.", - getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), legacyWindowOperatorType); - - while (!restoredFromLegacyAlignedOpRecords.isEmpty()) { - StreamRecord<IN> record = restoredFromLegacyAlignedOpRecords.poll(); - setCurrentKey(keySelector.getKey(record.getValue())); - processElement(record); - } - } - - // gc friendliness - restoredFromLegacyAlignedOpRecords = null; - } - - // ------------------------------------------------------------------------ // Getters for testing // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index 1dc0ee2..d0ab60a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; @@ -292,9 +291,6 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) { previousTypeSerializerAndConfig = ((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); - } else if (configSnapshot instanceof MultiplexingStreamRecordSerializer.MultiplexingStreamRecordSerializerConfigSnapshot) { - previousTypeSerializerAndConfig = - ((MultiplexingStreamRecordSerializer.MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); } else { return CompatibilityResult.requiresMigration(); } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java index 4914075..0b03b79 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.CollectionUtil; import java.util.Collection; @@ -37,8 +36,6 @@ public class OperatorStateHandles { private final int operatorChainIndex; - private final StreamStateHandle legacyOperatorState; - private final Collection<KeyedStateHandle> managedKeyedState; private final Collection<KeyedStateHandle> rawKeyedState; private final Collection<OperatorStateHandle> managedOperatorState; @@ -46,24 +43,18 @@ public class OperatorStateHandles { public OperatorStateHandles( int operatorChainIndex, - StreamStateHandle legacyOperatorState, Collection<KeyedStateHandle> managedKeyedState, Collection<KeyedStateHandle> rawKeyedState, Collection<OperatorStateHandle> managedOperatorState, Collection<OperatorStateHandle> rawOperatorState) { this.operatorChainIndex = operatorChainIndex; - this.legacyOperatorState = legacyOperatorState; this.managedKeyedState = managedKeyedState; this.rawKeyedState = rawKeyedState; this.managedOperatorState = managedOperatorState; this.rawOperatorState = rawOperatorState; } - public StreamStateHandle getLegacyOperatorState() { - return legacyOperatorState; - } - public Collection<KeyedStateHandle> getManagedKeyedState() { return managedKeyedState; } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 1ba5fb1..310df4d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -44,8 +44,6 @@ import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.StateUtil; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -836,8 +834,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress; - private Map<OperatorID, StreamStateHandle> nonPartitionedStateHandles; - private final CheckpointMetaData checkpointMetaData; private final CheckpointMetrics checkpointMetrics; @@ -848,7 +844,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> AsyncCheckpointRunnable( StreamTask<?, ?> owner, - Map<OperatorID, StreamStateHandle> nonPartitionedStateHandles, Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, @@ -858,7 +853,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> this.operatorSnapshotsInProgress = Preconditions.checkNotNull(operatorSnapshotsInProgress); this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData); this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics); - this.nonPartitionedStateHandles = nonPartitionedStateHandles; this.asyncStartNanos = asyncStartNanos; } @@ -876,7 +870,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> OperatorSnapshotResult snapshotInProgress = entry.getValue(); OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - nonPartitionedStateHandles.get(operatorID), FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()), FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()), FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture()), @@ -968,13 +961,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - // discard non partitioned state handles - try { - StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles.values()); - } catch (Exception discardException) { - exception = ExceptionUtils.firstOrSuppressed(discardException, exception); - } - if (null != exception) { throw exception; } @@ -1008,7 +994,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // ------------------------ - private final Map<OperatorID, StreamStateHandle> nonPartitionedStates; private final Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress; public CheckpointingOperation( @@ -1022,7 +1007,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions); this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics); this.allOperators = owner.operatorChain.getAllOperators(); - this.nonPartitionedStates = new HashMap<>(allOperators.length); this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length); } @@ -1068,18 +1052,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - // Cleanup non partitioned state handles - for (StreamStateHandle nonPartitionedState : nonPartitionedStates.values()) { - if (nonPartitionedState != null) { - try { - nonPartitionedState.discardState(); - } catch (Exception e) { - LOG.warn("Could not properly discard a non partitioned " + - "state. This might leave some orphaned files behind.", e); - } - } - } - if (LOG.isDebugEnabled()) { LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." + "Alignment duration: {} ms, snapshot duration {} ms", @@ -1094,20 +1066,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> @SuppressWarnings("deprecation") private void checkpointStreamOperator(StreamOperator<?> op) throws Exception { if (null != op) { - // first call the legacy checkpoint code paths - StreamStateHandle legacyOperatorState = op.snapshotLegacyOperatorState( - checkpointMetaData.getCheckpointId(), - checkpointMetaData.getTimestamp(), - checkpointOptions); - - OperatorID operatorID = op.getOperatorID(); - nonPartitionedStates.put(operatorID, legacyOperatorState); OperatorSnapshotResult snapshotInProgress = op.snapshotState( checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions); - operatorSnapshotsInProgress.put(operatorID, snapshotInProgress); + operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress); } } @@ -1115,7 +1079,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable( owner, - nonPartitionedStates, operatorSnapshotsInProgress, checkpointMetaData, checkpointMetrics, http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java index ff5f589..4ed689d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; @@ -65,7 +64,6 @@ public class AbstractUdfStreamOperatorLifecycleTest { "UDF::open", "OPERATOR::run", "UDF::run", - "OPERATOR::snapshotLegacyOperatorState", "OPERATOR::snapshotState", "OPERATOR::close", "UDF::close", @@ -93,7 +91,6 @@ public class AbstractUdfStreamOperatorLifecycleTest { "setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " + "org.apache.flink.streaming.api.graph.StreamConfig, interface " + "org.apache.flink.streaming.api.operators.Output], " + - "snapshotLegacyOperatorState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions], " + "snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions]]"; private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" + @@ -207,7 +204,7 @@ public class AbstractUdfStreamOperatorLifecycleTest { } private static class LifecycleTrackingStreamSource<OUT, SRC extends SourceFunction<OUT>> - extends StreamSource<OUT, SRC> implements Serializable, StreamCheckpointedOperator { + extends StreamSource<OUT, SRC> implements Serializable { private static final long serialVersionUID = 2431488948886850562L; private transient Thread testCheckpointer; @@ -266,12 +263,6 @@ public class AbstractUdfStreamOperatorLifecycleTest { } @Override - public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception { - ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState"); - return super.snapshotLegacyOperatorState(checkpointId, timestamp, checkpointOptions); - } - - @Override public void initializeState(StateInitializationContext context) throws Exception { ACTUAL_ORDER_TRACKING.add("OPERATOR::initializeState"); super.initializeState(context); http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java deleted file mode 100644 index 7dba4af..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.functions.util.ListCollector; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.KeyedStateStore; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.ByteSerializer; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.graph.StreamGraphGenerator; -import org.apache.flink.streaming.api.transformations.OneInputTransformation; -import org.apache.flink.streaming.api.transformations.SourceTransformation; -import org.apache.flink.streaming.api.transformations.StreamTransformation; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; -import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction; -import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction; -import org.apache.flink.util.Collector; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -/** - * Tests for {@link FoldApplyProcessWindowFunction}. - */ -public class FoldApplyProcessWindowFunctionTest { - - /** - * Tests that the FoldWindowFunction gets the output type serializer set by the - * StreamGraphGenerator and checks that the FoldWindowFunction computes the correct result. - */ - @Test - public void testFoldWindowFunctionOutputTypeConfigurable() throws Exception{ - StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment(); - - List<StreamTransformation<?>> transformations = new ArrayList<>(); - - int initValue = 1; - - FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyProcessWindowFunction<>( - initValue, - new FoldFunction<Integer, Integer>() { - @Override - public Integer fold(Integer accumulator, Integer value) throws Exception { - return accumulator + value; - } - - }, - new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() { - @Override - public void process(Integer integer, - Context context, - Iterable<Integer> input, - Collector<Integer> out) throws Exception { - for (Integer in: input) { - out.collect(in); - } - } - }, - BasicTypeInfo.INT_TYPE_INFO - ); - - AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>( - new InternalIterableProcessWindowFunction<>(foldWindowFunction), - new KeySelector<Integer, Integer>() { - private static final long serialVersionUID = -7951310554369722809L; - - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }, - IntSerializer.INSTANCE, - IntSerializer.INSTANCE, - 3000, - 3000 - ); - - SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){ - - private static final long serialVersionUID = 8297735565464653028L; - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - - } - - @Override - public void cancel() { - - } - }; - - SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1); - - transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); - - StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); - - List<Integer> result = new ArrayList<>(); - List<Integer> input = new ArrayList<>(); - List<Integer> expected = new ArrayList<>(); - - input.add(1); - input.add(2); - input.add(3); - - for (int value : input) { - initValue += value; - } - - expected.add(initValue); - - FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() { - @Override - public TimeWindow window() { - return new TimeWindow(0, 1); - } - - @Override - public long currentProcessingTime() { - return 0; - } - - @Override - public long currentWatermark() { - return 0; - } - - @Override - public KeyedStateStore windowState() { - return new DummyKeyedStateStore(); - } - - @Override - public KeyedStateStore globalState() { - return new DummyKeyedStateStore(); - } - }; - - foldWindowFunction.open(new Configuration()); - - foldWindowFunction.process(0, ctx, input, new ListCollector<>(result)); - - Assert.assertEquals(expected, result); - } - - /** - * Tests that the FoldWindowFunction gets the output type serializer set by the - * StreamGraphGenerator and checks that the FoldWindowFunction computes the correct result. - */ - @Test - public void testFoldAllWindowFunctionOutputTypeConfigurable() throws Exception{ - StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment(); - - List<StreamTransformation<?>> transformations = new ArrayList<>(); - - int initValue = 1; - - FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyProcessAllWindowFunction<>( - initValue, - new FoldFunction<Integer, Integer>() { - @Override - public Integer fold(Integer accumulator, Integer value) throws Exception { - return accumulator + value; - } - - }, - new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() { - @Override - public void process(Context context, - Iterable<Integer> input, - Collector<Integer> out) throws Exception { - for (Integer in: input) { - out.collect(in); - } - } - }, - BasicTypeInfo.INT_TYPE_INFO - ); - - AccumulatingProcessingTimeWindowOperator<Byte, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>( - new InternalIterableProcessAllWindowFunction<>(foldWindowFunction), - new KeySelector<Integer, Byte>() { - private static final long serialVersionUID = -7951310554369722809L; - - @Override - public Byte getKey(Integer value) throws Exception { - return 0; - } - }, - ByteSerializer.INSTANCE, - IntSerializer.INSTANCE, - 3000, - 3000 - ); - - SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){ - - private static final long serialVersionUID = 8297735565464653028L; - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - - } - - @Override - public void cancel() { - - } - }; - - SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1); - - transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); - - StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); - - List<Integer> result = new ArrayList<>(); - List<Integer> input = new ArrayList<>(); - List<Integer> expected = new ArrayList<>(); - - input.add(1); - input.add(2); - input.add(3); - - for (int value : input) { - initValue += value; - } - - expected.add(initValue); - - FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() { - @Override - public TimeWindow window() { - return new TimeWindow(0, 1); - } - - @Override - public KeyedStateStore windowState() { - return new DummyKeyedStateStore(); - } - - @Override - public KeyedStateStore globalState() { - return new DummyKeyedStateStore(); - } - }; - - foldWindowFunction.open(new Configuration()); - - foldWindowFunction.process(ctx, input, new ListCollector<>(result)); - - Assert.assertEquals(expected, result); - } - - private static class DummyKeyedStateStore implements KeyedStateStore { - - @Override - public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { - return null; - } - - @Override - public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { - return null; - } - - @Override - public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { - return null; - } - - @Override - public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) { - return null; - } - - @Override - public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { - return null; - } - } - - private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java deleted file mode 100644 index 7cf18dd..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.functions.util.ListCollector; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.WindowFunction; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.graph.StreamGraphGenerator; -import org.apache.flink.streaming.api.transformations.OneInputTransformation; -import org.apache.flink.streaming.api.transformations.SourceTransformation; -import org.apache.flink.streaming.api.transformations.StreamTransformation; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; -import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; -import org.apache.flink.util.Collector; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -/** - * Tests for {@link FoldApplyWindowFunction}. - */ -public class FoldApplyWindowFunctionTest { - - /** - * Tests that the FoldWindowFunction gets the output type serializer set by the - * StreamGraphGenerator and checks that the FoldWindowFunction computes the correct result. - */ - @Test - public void testFoldWindowFunctionOutputTypeConfigurable() throws Exception{ - StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment(); - - List<StreamTransformation<?>> transformations = new ArrayList<>(); - - int initValue = 1; - - FoldApplyWindowFunction<Integer, TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyWindowFunction<>( - initValue, - new FoldFunction<Integer, Integer>() { - private static final long serialVersionUID = -4849549768529720587L; - - @Override - public Integer fold(Integer accumulator, Integer value) throws Exception { - return accumulator + value; - } - }, - new WindowFunction<Integer, Integer, Integer, TimeWindow>() { - @Override - public void apply(Integer integer, - TimeWindow window, - Iterable<Integer> input, - Collector<Integer> out) throws Exception { - for (Integer in: input) { - out.collect(in); - } - } - }, - BasicTypeInfo.INT_TYPE_INFO - ); - - AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>( - new InternalIterableWindowFunction<>( - foldWindowFunction), - new KeySelector<Integer, Integer>() { - private static final long serialVersionUID = -7951310554369722809L; - - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }, - IntSerializer.INSTANCE, - IntSerializer.INSTANCE, - 3000, - 3000 - ); - - SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){ - - private static final long serialVersionUID = 8297735565464653028L; - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - - } - - @Override - public void cancel() { - - } - }; - - SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1); - - transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); - - StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); - - List<Integer> result = new ArrayList<>(); - List<Integer> input = new ArrayList<>(); - List<Integer> expected = new ArrayList<>(); - - input.add(1); - input.add(2); - input.add(3); - - for (int value : input) { - initValue += value; - } - - expected.add(initValue); - - foldWindowFunction.apply(0, new TimeWindow(0, 1), input, new ListCollector<Integer>(result)); - - Assert.assertEquals(expected, result); - } - - private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java index 58898d8..6dd08f6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java @@ -437,7 +437,8 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger { StreamGraph streamGraph = env.getStreamGraph(); int idx = 1; for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) { - Assert.assertEquals(jobVertex.getIdAlternatives().get(1).toString(), userHashes.get(idx)); + List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives(); + Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx)); --idx; } }