http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 88e619a..0d01733 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -36,10 +37,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; -import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction; -import 
org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; @@ -71,7 +71,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; * If an {@link Evictor} is specified it will be used to evict elements from the window after * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window. * When using an evictor window performance will degrade significantly, since - * pre-aggregation of window results cannot be used. + * incremental aggregation of window results cannot be used. * * <p> * Note that the {@code WindowedStream} is purely and API construct, during runtime @@ -120,7 +120,7 @@ public class WindowedStream<T, K, W extends Window> { * * <p> * Note: When using an evictor window performance will degrade significantly, since - * pre-aggregation of window results cannot be used. + * incremental aggregation of window results cannot be used. */ @PublicEvolving public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) { @@ -137,13 +137,14 @@ public class WindowedStream<T, K, W extends Window> { * Applies a reduce function to the window. The window function is called for each evaluation * of the window for each key individually. The output of the reduce function is interpreted * as a regular non-windowed stream. + * * <p> - * This window will try and pre-aggregate data as much as the window policies permit. For example, - * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per - * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval, + * This window will try and incrementally aggregate data as much as the window policies permit. 
+ * For example, tumbling time windows can aggregate the data, meaning that only one element per + * key is stored. Sliding time windows will aggregate on the granularity of the slide interval, * so a few elements are stored per key (one per slide interval). - * Custom windows may not be able to pre-aggregate, or may need to store extra values in an - * aggregation tree. + * Custom windows may not be able to incrementally aggregate, or may need to store extra values + * in an aggregation tree. * * @param function The reduce function. * @return The data stream that is the result of applying the reduce function to the window. @@ -159,48 +160,14 @@ public class WindowedStream<T, K, W extends Window> { function = input.getExecutionEnvironment().clean(function); String callLocation = Utils.getCallLocationName(); - String udfName = "Reduce at " + callLocation; + String udfName = "WindowedStream." + callLocation; SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName); if (result != null) { return result; } - String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; - KeySelector<T, K> keySel = input.getKeySelector(); - - OneInputStreamOperator<T, T> operator; - - boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; - - if (evictor != null) { - ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); - - operator = new EvictingWindowOperator<>(windowAssigner, - windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - keySel, - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - stateDesc, - new ReduceIterableWindowFunction<K, W, T>(function), - trigger, - evictor).enableSetProcessingTime(setProcessingTime); - - } else { - 
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents", - function, - input.getType().createSerializer(getExecutionEnvironment().getConfig())); - - operator = new WindowOperator<>(windowAssigner, - windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - keySel, - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - stateDesc, - new ReduceWindowFunction<K, W, T>(), - trigger).enableSetProcessingTime(setProcessingTime); - } - - return input.transform(opName, input.getType(), operator); + return apply(function, new PassThroughWindowFunction<K, W, T>()); } /** @@ -212,13 +179,15 @@ public class WindowedStream<T, K, W extends Window> { * @return The data stream that is the result of applying the fold function to the window. */ public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) { - //clean the closure - function = input.getExecutionEnvironment().clean(function); + if (function instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " + + "Please use apply(FoldFunction, WindowFunction) instead."); + } TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(), Utils.getCallLocationName(), true); - return apply(new FoldWindowFunction<K, W, T, R>(initialValue, function), resultType); + return fold(initialValue, function, resultType); } /** @@ -230,9 +199,12 @@ public class WindowedStream<T, K, W extends Window> { * @return The data stream that is the result of applying the fold function to the window. 
*/ public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) { - //clean the closure - function = input.getExecutionEnvironment().clean(function); - return apply(new FoldWindowFunction<K, W, T, R>(initialValue, function), resultType); + if (function instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " + + "Please use apply(FoldFunction, WindowFunction) instead."); + } + + return apply(initialValue, function, new PassThroughWindowFunction<K, W, R>(), resultType); } /** @@ -242,7 +214,7 @@ public class WindowedStream<T, K, W extends Window> { * * <p> * Not that this function requires that all data in the windows is buffered until the window - * is evaluated, as the function provides no means of pre-aggregation. + * is evaluated, as the function provides no means of incremental aggregation. * * @param function The window function. * @return The data stream that is the result of applying the window function to the window. @@ -263,7 +235,7 @@ public class WindowedStream<T, K, W extends Window> { * * <p> * Not that this function requires that all data in the windows is buffered until the window - * is evaluated, as the function provides no means of pre-aggregation. + * is evaluated, as the function provides no means of incremental aggregation. * * @param function The window function. * @param resultType Type information for the result type of the window function @@ -275,7 +247,7 @@ public class WindowedStream<T, K, W extends Window> { function = input.getExecutionEnvironment().clean(function); String callLocation = Utils.getCallLocationName(); - String udfName = "WindowApply at " + callLocation; + String udfName = "WindowedStream." 
+ callLocation; SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName); if (result != null) { @@ -283,18 +255,19 @@ public class WindowedStream<T, K, W extends Window> { } - String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; + String opName; KeySelector<T, K> keySel = input.getKeySelector(); WindowOperator<K, T, Iterable<T>, R, W> operator; boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; - if (evictor != null) { ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + operator = new EvictingWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), keySel, @@ -308,6 +281,8 @@ public class WindowedStream<T, K, W extends Window> { ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents", input.getType().createSerializer(getExecutionEnvironment().getConfig())); + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + operator = new WindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), keySel, @@ -326,9 +301,9 @@ public class WindowedStream<T, K, W extends Window> { * interpreted as a regular non-windowed stream. * * <p> - * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * Arriving data is incrementally aggregated using the given reducer. * - * @param reduceFunction The reduce function that is used for pre-aggregation + * @param reduceFunction The reduce function that is used for incremental aggregation. * @param function The window function. 
* @return The data stream that is the result of applying the window function to the window. */ @@ -347,16 +322,16 @@ public class WindowedStream<T, K, W extends Window> { * interpreted as a regular non-windowed stream. * * <p> - * Arriving data is pre-aggregated using the given pre-aggregation reducer. + * Arriving data is incrementally aggregated using the given reducer. * - * @param reduceFunction The reduce function that is used for pre-aggregation + * @param reduceFunction The reduce function that is used for incremental aggregation. * @param function The window function. * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. */ public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { if (reduceFunction instanceof RichFunction) { - throw new UnsupportedOperationException("Pre-aggregator of apply can not be a RichFunction."); + throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction."); } //clean the closures @@ -364,9 +339,9 @@ public class WindowedStream<T, K, W extends Window> { reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); String callLocation = Utils.getCallLocationName(); - String udfName = "WindowApply at " + callLocation; + String udfName = "WindowedStream." 
+ callLocation; - String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; + String opName; KeySelector<T, K> keySel = input.getKeySelector(); OneInputStreamOperator<T, R> operator; @@ -374,10 +349,11 @@ public class WindowedStream<T, K, W extends Window> { boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; if (evictor != null) { - ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + operator = new EvictingWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), keySel, @@ -392,6 +368,8 @@ public class WindowedStream<T, K, W extends Window> { reduceFunction, input.getType().createSerializer(getExecutionEnvironment().getConfig())); + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + operator = new WindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), keySel, @@ -404,6 +382,96 @@ public class WindowedStream<T, K, W extends Window> { return input.transform(opName, resultType, operator); } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param function The window function. 
+ * @return The data stream that is the result of applying the window function to the window. + */ + public <R> SingleOutputStreamOperator<R, ?> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function) { + + TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), + Utils.getCallLocationName(), true); + + return apply(initialValue, foldFunction, function, resultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param function The window function. + * @param resultType Type information for the result type of the window function + * @return The data stream that is the result of applying the window function to the window. + */ + public <R> SingleOutputStreamOperator<R, ?> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, TypeInformation<R> resultType) { + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction."); + } + + //clean the closures + function = input.getExecutionEnvironment().clean(function); + foldFunction = input.getExecutionEnvironment().clean(foldFunction); + + String callLocation = Utils.getCallLocationName(); + String udfName = "WindowedStream."
+ callLocation; + + String opName; + KeySelector<T, K> keySel = input.getKeySelector(); + + OneInputStreamOperator<T, R> operator; + + boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + + if (evictor != null) { + + ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", + new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = new EvictingWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new FoldApplyWindowFunction<>(initialValue, foldFunction, function), + trigger, + evictor).enableSetProcessingTime(setProcessingTime); + + } else { + FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents", + initialValue, + foldFunction, + resultType); + + opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; + + operator = new WindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + function, + trigger).enableSetProcessingTime(setProcessingTime); + } + + return input.transform(opName, resultType, operator); + } + // ------------------------------------------------------------------------ // Aggregations on the keyed windows // ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java deleted file mode 100644 index 46f9b3c..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.functions.windowing; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.operators.translation.WrappingFunction; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; -import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.util.Collector; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -public class FoldAllWindowFunction<W extends Window, T, R> - extends WrappingFunction<FoldFunction<T, R>> - implements AllWindowFunction<Iterable<T>, R, W>, OutputTypeConfigurable<R> { - private static final long serialVersionUID = 1L; - - private byte[] serializedInitialValue; - private TypeSerializer<R> outSerializer; - private transient R initialValue; - - public FoldAllWindowFunction(R initialValue, FoldFunction<T, R> reduceFunction) { - super(reduceFunction); - this.initialValue = initialValue; - } - - @Override - public void open(Configuration configuration) throws Exception { - super.open(configuration); - - if (serializedInitialValue == null) { - throw new RuntimeException("No initial value was serialized for the fold " + - "window function. 
Probably the setOutputType method was not called."); - } - - ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); - DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); - initialValue = outSerializer.deserialize(in); - } - - @Override - public void apply(W window, Iterable<T> values, Collector<R> out) throws Exception { - R result = outSerializer.copy(initialValue); - - for (T val: values) { - result = wrappedFunction.fold(result, val); - } - - out.collect(result); - } - - @Override - public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) { - outSerializer = outTypeInfo.createSerializer(executionConfig); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); - - try { - outSerializer.serialize(initialValue, out); - } catch (IOException ioe) { - throw new RuntimeException("Unable to serialize initial value of type " + - initialValue.getClass().getSimpleName() + " of fold window function.", ioe); - } - - serializedInitialValue = baos.toByteArray(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java new file mode 100644 index 0000000..7828a23 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class FoldApplyAllWindowFunction<W extends Window, T, ACC> + extends WrappingFunction<AllWindowFunction<ACC, ACC, W>> + implements AllWindowFunction<Iterable<T>, ACC, W>, OutputTypeConfigurable<ACC> { + + private static final long serialVersionUID = 1L; + + private final FoldFunction<T, ACC> foldFunction; + + private byte[] serializedInitialValue; + private TypeSerializer<ACC> accSerializer; + private transient ACC initialValue; + + 
public FoldApplyAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, ACC, W> windowFunction) { + super(windowFunction); + this.foldFunction = foldFunction; + this.initialValue = initialValue; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + + if (serializedInitialValue == null) { + throw new RuntimeException("No initial value was serialized for the fold " + + "window function. Probably the setOutputType method was not called."); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); + DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); + initialValue = accSerializer.deserialize(in); + } + + @Override + public void apply(W window, Iterable<T> values, Collector<ACC> out) throws Exception { + ACC result = accSerializer.copy(initialValue); + + for (T val: values) { + result = foldFunction.fold(result, val); + } + + wrappedFunction.apply(window, result, out); + } + + @Override + public void setOutputType(TypeInformation<ACC> outTypeInfo, ExecutionConfig executionConfig) { + accSerializer = outTypeInfo.createSerializer(executionConfig); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + + try { + accSerializer.serialize(initialValue, out); + } catch (IOException ioe) { + throw new RuntimeException("Unable to serialize initial value of type " + + initialValue.getClass().getSimpleName() + " of fold window function.", ioe); + } + + serializedInitialValue = baos.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java new file mode 100644 index 0000000..94356dc --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class FoldApplyWindowFunction<K, W extends Window, T, ACC> + extends WrappingFunction<WindowFunction<ACC, ACC, K, W>> + implements WindowFunction<Iterable<T>, ACC, K, W>, OutputTypeConfigurable<ACC> { + + private static final long serialVersionUID = 1L; + + private final FoldFunction<T, ACC> foldFunction; + + private byte[] serializedInitialValue; + private TypeSerializer<ACC> accSerializer; + private transient ACC initialValue; + + public FoldApplyWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, ACC, K, W> windowFunction) { + super(windowFunction); + this.foldFunction = foldFunction; + this.initialValue = initialValue; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + + if (serializedInitialValue == null) { + throw new RuntimeException("No initial value was serialized for the fold " + + "window function. 
Probably the setOutputType method was not called."); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); + DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); + initialValue = accSerializer.deserialize(in); + } + + @Override + public void apply(K key, W window, Iterable<T> values, Collector<ACC> out) throws Exception { + ACC result = accSerializer.copy(initialValue); + + for (T val: values) { + result = foldFunction.fold(result, val); + } + + wrappedFunction.apply(key, window, result, out); + } + + @Override + public void setOutputType(TypeInformation<ACC> outTypeInfo, ExecutionConfig executionConfig) { + accSerializer = outTypeInfo.createSerializer(executionConfig); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + + try { + accSerializer.serialize(initialValue, out); + } catch (IOException ioe) { + throw new RuntimeException("Unable to serialize initial value of type " + + initialValue.getClass().getSimpleName() + " of fold window function.", ioe); + } + + serializedInitialValue = baos.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java deleted file mode 100644 index db6d1bb..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.functions.windowing; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.operators.translation.WrappingFunction; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; -import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.util.Collector; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; - -public class FoldWindowFunction<K, W extends Window, T, R> - extends WrappingFunction<FoldFunction<T, R>> - implements WindowFunction<Iterable<T>, R, K, W>, OutputTypeConfigurable<R> { - private static final long serialVersionUID = 1L; - - private byte[] serializedInitialValue; - private TypeSerializer<R> outSerializer; - private transient R initialValue; - - public FoldWindowFunction(R initialValue, FoldFunction<T, R> reduceFunction) { - 
super(reduceFunction); - this.initialValue = initialValue; - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - if (serializedInitialValue == null) { - throw new RuntimeException("No initial value was serialized for the fold " + - "window function. Probably the setOutputType method was not called."); - } - - ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); - DataInputViewStreamWrapper inStream = new DataInputViewStreamWrapper(bais); - initialValue = outSerializer.deserialize(inStream); - } - - @Override - public void apply(K k, W window, Iterable<T> values, Collector<R> out) throws Exception { - R result = outSerializer.copy(initialValue); - - for (T val: values) { - result = wrappedFunction.fold(result, val); - } - - out.collect(result); - } - - @Override - public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) { - outSerializer = outTypeInfo.createSerializer(executionConfig); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); - - try { - outSerializer.serialize(initialValue, out); - } catch (IOException ioe) { - throw new RuntimeException("Unable to serialize initial value of type " + - initialValue.getClass().getSimpleName() + " of fold window function.", ioe); - } - - serializedInitialValue = baos.toByteArray(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java new file mode 100644 
index 0000000..3ac2e2c --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; +/** An {@link AllWindowFunction} that forwards each window's input to the collector unchanged. */ +public class PassThroughAllWindowFunction<W extends Window, T> implements AllWindowFunction<T, T, W> { + private static final long serialVersionUID = 1L; + /** Emits {@code element} as is; the window itself is not inspected. */ + @Override + public void apply(W window, T element, Collector<T> collector) throws Exception { + collector.collect(element); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java new file mode 100644 index 0000000..254c489 --- /dev/null +++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.windowing; + +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; +/** A keyed {@link WindowFunction} that forwards each window's input to the collector unchanged. */ +public class PassThroughWindowFunction<K, W extends Window, T> implements WindowFunction<T, T, K, W> { + private static final long serialVersionUID = 1L; + /** Emits {@code element} as is; neither the key nor the window is inspected. */ + @Override + public void apply(K key, W window, T element, Collector<T> collector) throws Exception { + collector.collect(element); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java deleted file mode 100644 index 76b095b..0000000 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.api.functions.windowing; - -import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.util.Collector; - -public class ReduceAllWindowFunction<W extends Window, T> extends RichAllWindowFunction<T, T, W> { - private static final long serialVersionUID = 1L; - - @Override - public void apply(W window, T input, Collector<T> out) throws Exception { - out.collect(input); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java deleted file mode 100644 index 8be4553b..0000000 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.api.functions.windowing; - -import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.util.Collector; - -public class ReduceWindowFunction<K, W extends Window, T> implements WindowFunction<T, T, K, W> { - private static final long serialVersionUID = 1L; - - @Override - public void apply(K k, W window, T input, Collector<T> out) throws Exception { - out.collect(input); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java deleted file mode 100644 index fe42cd3..0000000 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.api.functions.windowing; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.util.Collector; - -public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichWindowFunction<T, Tuple2<W, T>, K, W> { - private static final long serialVersionUID = 1L; - - @Override - public void apply(K k, W window, T input, Collector<Tuple2<W, T>> out) throws Exception { - out.collect(Tuple2.of(window, input)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java new file 
mode 100644 index 0000000..bb91f2a --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import 
org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; +import org.apache.flink.util.Collector; +import org.junit.Test; +import org.junit.Assert; + +import java.util.ArrayList; +import java.util.List; + +public class FoldApplyWindowFunctionTest { + + /** + * Tests that the FoldApplyWindowFunction gets the output type serializer set by the + * StreamGraphGenerator and checks that the FoldApplyWindowFunction computes the correct result. + */ + @Test + public void testFoldWindowFunctionOutputTypeConfigurable() throws Exception { + StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment(); + + List<StreamTransformation<?>> transformations = new ArrayList<>(); + + int initValue = 1; + + FoldApplyWindowFunction<Integer, TimeWindow, Integer, Integer> foldWindowFunction = new FoldApplyWindowFunction<>( + initValue, + new FoldFunction<Integer, Integer>() { + private static final long serialVersionUID = -4849549768529720587L; + + @Override + public Integer fold(Integer accumulator, Integer value) throws Exception { + return accumulator + value; + } + }, + new WindowFunction<Integer, Integer, Integer, TimeWindow>() { + @Override + public void apply(Integer integer, + TimeWindow window, + Integer input, + Collector<Integer> out) throws Exception { + out.collect(input); + } + } + ); + + AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>( + foldWindowFunction, + new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = -7951310554369722809L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }, + IntSerializer.INSTANCE, + IntSerializer.INSTANCE, + 3000, + 3000 + ); + 
SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>() { + + private static final long serialVersionUID = 8297735565464653028L; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + + } + + @Override + public void cancel() { + + } + }; + + SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<Integer>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1); + + transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); + // Generating the graph is what sets the output type on the fold window function; the graph itself is not inspected further. + StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); + + List<Integer> result = new ArrayList<>(); + List<Integer> input = new ArrayList<>(); + List<Integer> expected = new ArrayList<>(); + + input.add(1); + input.add(2); + input.add(3); + + for (int value : input) { + initValue += value; + } + + expected.add(initValue); + + foldWindowFunction.apply(0, new TimeWindow(0, 1), input, new ListCollector<Integer>(result)); + + Assert.assertEquals(expected, result); + } + + public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldWindowFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldWindowFunctionTest.java deleted file mode 100644 index 98e4d47..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldWindowFunctionTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.functions.util.ListCollector; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.graph.StreamGraphGenerator; -import org.apache.flink.streaming.api.transformations.OneInputTransformation; -import org.apache.flink.streaming.api.transformations.SourceTransformation; -import org.apache.flink.streaming.api.transformations.StreamTransformation; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; -import org.junit.Test; -import org.junit.Assert; - -import java.util.ArrayList; -import 
java.util.List; - -public class FoldWindowFunctionTest { - - /** - * Tests that the FoldWindowFunction gets the output type serializer set by the - * StreamGraphGenerator and checks that the FoldWindowFunction computes the correct result. - */ - @Test - public void testFoldWindowFunctionOutputTypeConfigurable() throws Exception{ - StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment(); - - List<StreamTransformation<?>> transformations = new ArrayList<>(); - - int initValue = 1; - - FoldWindowFunction<Integer, TimeWindow, Integer, Integer> foldWindowFunction = new FoldWindowFunction<>( - initValue, - new FoldFunction<Integer, Integer>() { - private static final long serialVersionUID = -4849549768529720587L; - - @Override - public Integer fold(Integer accumulator, Integer value) throws Exception { - return accumulator + value; - } - } - ); - - AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>( - foldWindowFunction, - new KeySelector<Integer, Integer>() { - private static final long serialVersionUID = -7951310554369722809L; - - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }, - IntSerializer.INSTANCE, - IntSerializer.INSTANCE, - 3000, - 3000 - ); - - SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){ - - private static final long serialVersionUID = 8297735565464653028L; - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - - } - - @Override - public void cancel() { - - } - }; - - SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<Integer>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1); - - transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); - - StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); - - List<Integer> result = new ArrayList<>(); - List<Integer> 
input = new ArrayList<>(); - List<Integer> expected = new ArrayList<>(); - - input.add(1); - input.add(2); - input.add(3); - - for (int value : input) { - initValue += value; - } - - expected.add(initValue); - - foldWindowFunction.apply(0, new TimeWindow(0, 1), input, new ListCollector<Integer>(result)); - - Assert.assertEquals(expected, result); - } - - public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 9d4a41a..c1111a0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; @@ -34,7 +35,6 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; import 
org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; @@ -140,7 +140,7 @@ public class WindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(), + new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(), EventTimeTrigger.create()); operator.setInputType(inputType, new ExecutionConfig()); @@ -271,7 +271,7 @@ public class WindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(), + new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(), EventTimeTrigger.create()); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -344,7 +344,7 @@ public class WindowOperatorTest { new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(), + new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(), ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -434,7 +434,7 @@ public class WindowOperatorTest { new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, - new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(), + new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(), PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE))); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse( http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java index 3a49331..3aa60dc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java @@ -19,6 +19,8 @@ package org.apache.flink.streaming.runtime.state; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -120,6 +122,12 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase { } @Override + protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + return null; + } + + @Override public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { return null; 
http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index 8f0d785..e8d3e05 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -265,6 +265,66 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]) } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue Initial value of the fold + * @param preAggregator The fold function that is used for incremental aggregation + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + def apply[R: TypeInformation: ClassTag]( + initialValue: R, + preAggregator: FoldFunction[T, R], + function: AllWindowFunction[R, R, W]): DataStream[R] = { + javaStream.apply( + initialValue, + clean(preAggregator), + clean(function), + implicitly[TypeInformation[R]]) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window. The output of the window function is + * interpreted as a regular non-windowed stream.
+ * + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue Initial value of the fold + * @param preAggregator The fold function that is used for incremental aggregation + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + def apply[R: TypeInformation: ClassTag]( + initialValue: R, + preAggregator: (R, T) => R, + function: (W, R, Collector[R]) => Unit): DataStream[R] = { + if (preAggregator == null) { + throw new NullPointerException("Fold function must not be null.") + } + if (function == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + + val cleanFolder = clean(preAggregator) + val folder = new FoldFunction[T, R] { + def fold(v1: R, v2: T) = { cleanFolder(v1, v2) } + } + + val cleanApply = clean(function) + val applyFunction = new AllWindowFunction[R, R, W] { + def apply(window: W, input: R, out: Collector[R]): Unit = { + cleanApply(window, input, out) + } + } + javaStream.apply(initialValue, folder, applyFunction, implicitly[TypeInformation[R]]) + } + // ------------------------------------------------------------------------ // Aggregations on the keyed windows // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 6385831..22d24fa 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -272,6
+272,66 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]) } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold + * @param foldFunction The fold function that is used for incremental aggregation + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + def apply[R: TypeInformation: ClassTag]( + initialValue: R, + foldFunction: FoldFunction[T, R], + function: WindowFunction[R, R, K, W]): DataStream[R] = { + javaStream.apply( + initialValue, + clean(foldFunction), + clean(function), + implicitly[TypeInformation[R]]) + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold + * @param foldFunction The fold function that is used for incremental aggregation + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window.
+ */ + def apply[R: TypeInformation: ClassTag]( + initialValue: R, + foldFunction: (R, T) => R, + function: (K, W, R, Collector[R]) => Unit): DataStream[R] = { + if (foldFunction == null) { + throw new NullPointerException("Fold function must not be null.") + } + if (function == null) { + throw new NullPointerException("WindowApply function must not be null.") + } + + val cleanFolder = clean(foldFunction) + val folder = new FoldFunction[T, R] { + def fold(acc: R, v: T) = { cleanFolder(acc, v) } + } + + val cleanApply = clean(function) + val applyFunction = new WindowFunction[R, R, K, W] { + def apply(key: K, window: W, input: R, out: Collector[R]): Unit = { + cleanApply(key, window, input, out) + } + } + javaStream.apply(initialValue, folder, applyFunction, implicitly[TypeInformation[R]]) + } + // ------------------------------------------------------------------------ // Aggregations on the keyed windows // ------------------------------------------------------------------------