http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java new file mode 100644 index 0000000..15d1fd5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.state.checkpoint; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.streaming.state.MapState; +import org.apache.flink.streaming.state.OperatorState; + +public class MapCheckpoint<K, V> extends StateCheckpoint<Map<K, V>> { + + private static final long serialVersionUID = 1L; + + protected Set<K> removedItems; + protected Map<K, V> updatedItems; + protected boolean clear; + + @SuppressWarnings("unchecked") + public MapCheckpoint(OperatorState<Map<K, V>> operatorState) { + if (operatorState instanceof MapState) { + MapState<K, V> mapState = (MapState<K, V>) operatorState; + + this.removedItems = mapState.getRemovedItems(); + this.clear = mapState.isCleared(); + + this.updatedItems = new HashMap<K, V>(); + for (K key : mapState.getUpdatedItems()) { + this.updatedItems.put(key, mapState.get(key)); + } + this.checkpointedState = this.updatedItems; + + } else { + throw new RuntimeException("MapCheckpoint can only be used with MapState"); + } + + } + + @SuppressWarnings("unchecked") + @Override + public StateCheckpoint<Map<K, V>> update(StateCheckpoint<Map<K, V>> nextCheckpoint) { + MapCheckpoint<K, V> mapCheckpoint = (MapCheckpoint<K, V>) nextCheckpoint; + if (this.checkpointedState == null) { + this.checkpointedState = mapCheckpoint.updatedItems; + } else { + if (mapCheckpoint.clear) { + this.checkpointedState.clear(); + } + for (Object key : mapCheckpoint.removedItems) { + this.checkpointedState.remove(key); + } + this.checkpointedState.putAll(mapCheckpoint.updatedItems); + } + return this; + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java new file mode 100644 index 0000000..8b76245 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.state.checkpoint; + +import java.io.Serializable; + +import org.apache.flink.streaming.state.OperatorState; + +/** + * Base class for creating checkpoints for {@link OperatorState}. This + * checkpoints will be used to backup states in stateful Flink operators and + * also to restore them in case of node failure. 
To allow incremental + * checkpoints override the {@link #update(StateCheckpoint)} method. + * + * @param <T> + * The type of the state. + */ +public class StateCheckpoint<T> implements Serializable { + + private static final long serialVersionUID = 1L; + + T checkpointedState; + + /** + * Creates a state checkpoint from the given {@link OperatorState} + * + * @param operatorState + * The {@link OperatorState} to checkpoint. + */ + public StateCheckpoint(OperatorState<T> operatorState) { + this.checkpointedState = operatorState.getState(); + } + + public StateCheckpoint() { + this.checkpointedState = null; + } + + /** + * Returns the state object for the checkpoint. + * + * @return The checkpointed state object. + */ + public T getCheckpointedState() { + return checkpointedState; + } + + /** + * Updates the checkpoint from next one. Override this method to allow + * incremental updates. + * + * @param nextCheckpoint + * The {@link StateCheckpoint} will be used to update from. + */ + public StateCheckpoint<T> update(StateCheckpoint<T> nextCheckpoint) { + this.checkpointedState = nextCheckpoint.getCheckpointedState(); + return this; + } + + @Override + public String toString() { + return checkpointedState.toString(); + } + + public boolean stateEquals(StateCheckpoint<T> other) { + return checkpointedState.equals(other.checkpointedState); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java new file mode 100755 index 0000000..691b111 --- /dev/null +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobClient; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import akka.actor.ActorRef; + +public class ClusterUtil { + + private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class); + public static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job"; + + /** + * Executes the given JobGraph locally, on a FlinkMiniCluster + * + * @param jobGraph + * jobGraph + * @param degreeOfParallelism + * numberOfTaskTrackers + * @param memorySize + * memorySize + */ + public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism, long memorySize) + throws Exception { + + Configuration configuration = jobGraph.getJobConfiguration(); + + LocalFlinkMiniCluster exec = null; + + 
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize); + configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, degreeOfParallelism); + if (LOG.isInfoEnabled()) { + LOG.info("Running on mini cluster"); + } + + try { + exec = new LocalFlinkMiniCluster(configuration, true); + ActorRef jobClient = exec.getJobClient(); + + JobClient.submitJobAndWait(jobGraph, true, jobClient, exec.timeout()); + + } catch (Exception e) { + throw e; + } finally { + if (exec != null) { + exec.stop(); + } + } + } + + public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception { + runOnMiniCluster(jobGraph, numOfSlots, -1); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java new file mode 100644 index 0000000..73e6360 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.keys; + +import java.lang.reflect.Array; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple10; +import org.apache.flink.api.java.tuple.Tuple11; +import org.apache.flink.api.java.tuple.Tuple12; +import org.apache.flink.api.java.tuple.Tuple13; +import org.apache.flink.api.java.tuple.Tuple14; +import org.apache.flink.api.java.tuple.Tuple15; +import org.apache.flink.api.java.tuple.Tuple16; +import org.apache.flink.api.java.tuple.Tuple17; +import org.apache.flink.api.java.tuple.Tuple18; +import org.apache.flink.api.java.tuple.Tuple19; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple20; +import org.apache.flink.api.java.tuple.Tuple21; +import org.apache.flink.api.java.tuple.Tuple22; +import org.apache.flink.api.java.tuple.Tuple23; +import org.apache.flink.api.java.tuple.Tuple24; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; +import 
org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.tuple.Tuple9; + +public class KeySelectorUtil { + + public static Class<?>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class, Tuple3.class, + Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, + Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, + Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, + Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, + Tuple25.class }; + + public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo) { + int[] logicalKeyPositions = keys.computeLogicalKeyPositions(); + int keyLength = logicalKeyPositions.length; + boolean[] orders = new boolean[keyLength]; + TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator( + logicalKeyPositions, orders, 0); + return new ComparableKeySelector<X>(comparator, keyLength); + } + + public static class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> { + + private static final long serialVersionUID = 1L; + + private TypeComparator<IN> comparator; + private int keyLength; + private Object[] keyArray; + private Tuple key; + + public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength) { + this.comparator = comparator; + this.keyLength = keyLength; + keyArray = new Object[keyLength]; + try { + key = (Tuple) tupleClasses[keyLength - 1].newInstance(); + } catch (Exception e) { + } + } + + @Override + public Tuple getKey(IN value) throws Exception { + comparator.extractKeys(value, keyArray, 0); + for (int i = 0; i < keyLength; i++) { + key.setField(keyArray[i], i); + } + return key; + } + + } + + public static class ArrayKeySelector<IN> implements KeySelector<IN, Tuple> { + + private static final long serialVersionUID = 1L; + + Tuple key; + int[] fields; + + public ArrayKeySelector(int... 
fields) { + this.fields = fields; + try { + key = (Tuple) tupleClasses[fields.length - 1].newInstance(); + } catch (Exception e) { + } + } + + @Override + public Tuple getKey(IN value) throws Exception { + for (int i = 0; i < fields.length; i++) { + int pos = fields[i]; + key.setField(Array.get(value, fields[pos]), i); + } + return key; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java new file mode 100644 index 0000000..49cd497 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType; +import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator; +import org.apache.flink.streaming.api.function.aggregation.SumAggregator; +import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable; +import org.apache.flink.streaming.util.MockContext; +import org.apache.flink.streaming.util.keys.KeySelectorUtil; +import org.junit.Test; + +public class AggregationFunctionTest { + + @Test + public void groupSumIntegerTest() { + + List<Tuple2<Integer, Integer>> expectedSumList = new ArrayList<Tuple2<Integer, Integer>>(); + List<Tuple2<Integer, Integer>> expectedMinList = new ArrayList<Tuple2<Integer, Integer>>(); + List<Tuple2<Integer, Integer>> expectedMaxList = new ArrayList<Tuple2<Integer, Integer>>(); + List<Integer> expectedSumList0 = new ArrayList<Integer>(); + List<Integer> expectedMinList0 = new ArrayList<Integer>(); + List<Integer> expectedMaxList0 = new ArrayList<Integer>(); + List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<Tuple2<Integer, Integer>>(); + List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<Tuple2<Integer, Integer>>(); + List<Tuple2<Integer, Integer>> expectedGroupMaxList = new 
ArrayList<Tuple2<Integer, Integer>>(); + + List<Integer> simpleInput = new ArrayList<Integer>(); + + int groupedSum0 = 0; + int groupedSum1 = 0; + int groupedSum2 = 0; + + for (int i = 0; i < 9; i++) { + simpleInput.add(i); + expectedSumList.add(new Tuple2<Integer, Integer>(i % 3, (i + 1) * i / 2)); + expectedMinList.add(new Tuple2<Integer, Integer>(i % 3, 0)); + expectedMaxList.add(new Tuple2<Integer, Integer>(i % 3, i)); + + expectedSumList0.add((i + 1) * i / 2); + expectedMaxList0.add(i); + expectedMinList0.add(0); + + int groupedSum; + switch (i % 3) { + case 0: + groupedSum = groupedSum0 += i; + break; + case 1: + groupedSum = groupedSum1 += i; + break; + default: + groupedSum = groupedSum2 += i; + break; + } + + expectedGroupSumList.add(new Tuple2<Integer, Integer>(i % 3, groupedSum)); + expectedGroupMinList.add(new Tuple2<Integer, Integer>(i % 3, i % 3)); + expectedGroupMaxList.add(new Tuple2<Integer, Integer>(i % 3, i)); + } + + TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor + .getForObject(new Tuple2<Integer, Integer>(0, 0)); + TypeInformation<Integer> type2 = TypeExtractor.getForObject(2); + + ReduceFunction<Tuple2<Integer, Integer>> sumFunction = SumAggregator.getSumFunction(1, + Integer.class, type1); + ReduceFunction<Integer> sumFunction0 = SumAggregator + .getSumFunction(0, Integer.class, type2); + ReduceFunction<Tuple2<Integer, Integer>> minFunction = ComparableAggregator.getAggregator( + 1, type1, AggregationType.MIN); + ReduceFunction<Integer> minFunction0 = ComparableAggregator.getAggregator(0, type2, + AggregationType.MIN); + ReduceFunction<Tuple2<Integer, Integer>> maxFunction = ComparableAggregator.getAggregator( + 1, type1, AggregationType.MAX); + ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2, + AggregationType.MAX); + List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList()); + + 
List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), getInputList()); + + List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList()); + + TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor + .getForObject(new Tuple2<Integer, Integer>(1, 1)); + + KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, typeInfo), + typeInfo); + + List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute( + new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, keySelector), + getInputList()); + + List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute( + new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, keySelector), + getInputList()); + + List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute( + new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, keySelector), + getInputList()); + + assertEquals(expectedSumList, sumList); + assertEquals(expectedMinList, minList); + assertEquals(expectedMaxList, maxList); + assertEquals(expectedGroupSumList, groupedSumList); + assertEquals(expectedGroupMinList, groupedMinList); + assertEquals(expectedGroupMaxList, groupedMaxList); + assertEquals(expectedSumList0, MockContext.createAndExecute( + new StreamReduceInvokable<Integer>(sumFunction0), simpleInput)); + assertEquals(expectedMinList0, MockContext.createAndExecute( + new StreamReduceInvokable<Integer>(minFunction0), simpleInput)); + assertEquals(expectedMaxList0, MockContext.createAndExecute( + new StreamReduceInvokable<Integer>(maxFunction0), simpleInput)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + try { + env.generateSequence(1, 100).min(1); + 
fail(); + } catch (Exception e) { + // Nothing to do here + } + try { + env.generateSequence(1, 100).min(2); + fail(); + } catch (Exception e) { + // Nothing to do here + } + try { + env.generateSequence(1, 100).min(3); + fail(); + } catch (Exception e) { + // Nothing to do here + } + + ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator + .getAggregator(0, type1, AggregationType.MAXBY, true); + ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator + .getAggregator(0, type1, AggregationType.MAXBY, false); + + ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator + .getAggregator(0, type1, AggregationType.MINBY, true); + ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator + .getAggregator(0, type1, AggregationType.MINBY, false); + + List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>(); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + + List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>(); + maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5)); + 
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8)); + + List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>(); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + + List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>(); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); + + assertEquals(maxByFirstExpected, MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst), + getInputList())); + assertEquals(maxByLastExpected, MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast), + getInputList())); + assertEquals(minByLastExpected, MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast), + getInputList())); + assertEquals(minByFirstExpected, 
MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst), + getInputList())); + + } + + @Test + public void minMaxByTest() { + TypeInformation<Tuple2<Integer, Integer>> type1 = TypeExtractor + .getForObject(new Tuple2<Integer, Integer>(0, 0)); + + ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = ComparableAggregator + .getAggregator(0, type1, AggregationType.MAXBY, true); + ReduceFunction<Tuple2<Integer, Integer>> maxByFunctionLast = ComparableAggregator + .getAggregator(0, type1, AggregationType.MAXBY, false); + + ReduceFunction<Tuple2<Integer, Integer>> minByFunctionFirst = ComparableAggregator + .getAggregator(0, type1, AggregationType.MINBY, true); + ReduceFunction<Tuple2<Integer, Integer>> minByFunctionLast = ComparableAggregator + .getAggregator(0, type1, AggregationType.MINBY, false); + + List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>(); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2)); + + List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>(); + maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5)); + 
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5)); + maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8)); + + List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>(); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0)); + + List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>(); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); + minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6)); + + assertEquals(maxByFirstExpected, MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst), + getInputList())); + assertEquals(maxByLastExpected, MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast), + getInputList())); + assertEquals(minByLastExpected, MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast), + getInputList())); + assertEquals(minByFirstExpected, 
MockContext.createAndExecute( + new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst), + getInputList())); + } + + private List<Tuple2<Integer, Integer>> getInputList() { + ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<Tuple2<Integer, Integer>>(); + for (int i = 0; i < 9; i++) { + inputList.add(new Tuple2<Integer, Integer>(i % 3, i)); + } + return inputList; + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java new file mode 100644 index 0000000..6ad827a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.api;

import static org.junit.Assert.assertTrue;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;

/**
 * Runs a small iterative job and checks that a record emitted by the
 * iteration tail is seen again by the iteration head.
 */
public class IterateTest {

    private static final long MEMORYSIZE = 32;

    // Flipped by IterationHead when it receives a 'true' record. The source only
    // emits 'false', so a 'true' can only have come from IterationTail.
    private static boolean iterated = false;

    /**
     * Forwards {@code false} records unchanged; a {@code true} record is
     * swallowed and recorded in {@link #iterated}.
     */
    public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
            if (!value) {
                out.collect(value);
                return;
            }
            iterated = true;
        }
    }

    /** Emits {@code true} for every record it receives, whatever the input was. */
    public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
            out.collect(true);
        }
    }

    /** Sink that discards everything; it only terminates the topology. */
    public static final class MySink implements SinkFunction<Boolean> {

        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Boolean tuple) {
            // intentionally empty
        }
    }

    /**
     * Builds source -> iterate -> head -> tail -> closeWith -> sink, executes it
     * and asserts that the head observed a fed-back record.
     */
    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment environment = new TestStreamEnvironment(1, MEMORYSIZE);
        environment.setBufferTimeout(10);

        DataStream<Boolean> falseOnlyInput = environment.fromElements(false, false, false);
        IterativeDataStream<Boolean> loop = falseOnlyInput.iterate(3000);

        DataStream<Boolean> feedback = loop
                .flatMap(new IterationHead())
                .flatMap(new IterationTail());

        loop.closeWith(feedback).addSink(new MySink());

        environment.execute();

        assertTrue(iterated);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java new file mode 100644 index 0000000..2486715 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.api;

import static org.junit.Assert.assertEquals;

import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Applies two independent splits to the same stream and checks that each
 * selected output receives exactly the matching records.
 */
public class OutputSplitterTest {

    private static final long MEMORYSIZE = 32;

    // Filled by the sinks of the first / second split respectively.
    private static ArrayList<Integer> splitterResult1 = new ArrayList<Integer>();
    private static ArrayList<Integer> splitterResult2 = new ArrayList<Integer>();

    // Scratch list reused for every expectation.
    private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();

    /** Splits the merge of two element streams by "> 4" and by "divisible by 3". */
    @SuppressWarnings("unchecked")
    @Test
    public void testOnMergedDataStream() throws Exception {
        splitterResult1.clear();
        splitterResult2.clear();

        StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
        env.setBufferTimeout(1);

        DataStream<Integer> evens = env.fromElements(0, 2, 4, 6, 8);
        DataStream<Integer> odds = env.fromElements(1, 3, 5, 7, 9);
        DataStream<Integer> merged = evens.merge(odds);

        merged.split(new OutputSelector<Integer>() {
            private static final long serialVersionUID = 8354166915727490130L;

            @Override
            public Iterable<String> select(Integer value) {
                List<String> names = new ArrayList<String>();
                names.add(value > 4 ? ">" : "<");
                return names;
            }
        }).select(">").addSink(new SinkFunction<Integer>() {

            private static final long serialVersionUID = 5827187510526388104L;

            @Override
            public void invoke(Integer value) {
                splitterResult1.add(value);
            }
        });

        merged.split(new OutputSelector<Integer>() {
            private static final long serialVersionUID = -6822487543355994807L;

            @Override
            public Iterable<String> select(Integer value) {
                List<String> names = new ArrayList<String>();
                names.add(value % 3 == 0 ? "yes" : "no");
                return names;
            }
        }).select("yes").addSink(new SinkFunction<Integer>() {
            private static final long serialVersionUID = -2674335071267854599L;

            @Override
            public void invoke(Integer value) {
                splitterResult2.add(value);
            }
        });

        env.execute();

        // Arrival order is not asserted, so compare sorted contents.
        Collections.sort(splitterResult1);
        Collections.sort(splitterResult2);

        expectResult(splitterResult1, 5, 6, 7, 8, 9);
        expectResult(splitterResult2, 0, 3, 6, 9);
    }

    /** Splits a single element stream by parity and by "divisible by 4". */
    @Test
    public void testOnSingleDataStream() throws Exception {
        splitterResult1.clear();
        splitterResult2.clear();

        StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
        env.setBufferTimeout(1);

        DataStream<Integer> digits = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

        digits.split(new OutputSelector<Integer>() {
            private static final long serialVersionUID = 2524335410904414121L;

            @Override
            public Iterable<String> select(Integer value) {
                List<String> names = new ArrayList<String>();
                names.add(value % 2 == 0 ? "even" : "odd");
                return names;
            }
        }).select("even").addSink(new SinkFunction<Integer>() {

            private static final long serialVersionUID = -2995092337537209535L;

            @Override
            public void invoke(Integer value) {
                splitterResult1.add(value);
            }
        });

        digits.split(new OutputSelector<Integer>() {

            private static final long serialVersionUID = -511693919586034092L;

            @Override
            public Iterable<String> select(Integer value) {
                List<String> names = new ArrayList<String>();
                names.add(value % 4 == 0 ? "yes" : "no");
                return names;
            }
        }).select("yes").addSink(new SinkFunction<Integer>() {

            private static final long serialVersionUID = -1749077049727705424L;

            @Override
            public void invoke(Integer value) {
                splitterResult2.add(value);
            }
        });

        env.execute();

        // Arrival order is not asserted, so compare sorted contents.
        Collections.sort(splitterResult1);
        Collections.sort(splitterResult2);

        expectResult(splitterResult1, 0, 2, 4, 6, 8);
        expectResult(splitterResult2, 0, 4, 8);
    }

    /**
     * Loads {@code expected} into the shared scratch list and asserts that it
     * equals {@code actual}.
     */
    private static void expectResult(ArrayList<Integer> actual, Integer... expected) {
        expectedSplitterResult.clear();
        Collections.addAll(expectedSplitterResult, expected);
        assertEquals(expectedSplitterResult, actual);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java new file mode 100755 index 0000000..757f6f6 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.flink.streaming.api;

import java.io.Serializable;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;

/**
 * Smoke test: a sequence -> map -> filter -> print pipeline must execute
 * without throwing. Nothing about the printed output is asserted.
 */
public class PrintTest implements Serializable {

    private static final long serialVersionUID = 1L;
    private static final long MEMORYSIZE = 32;

    /** Filter that accepts every record (always returns {@code true}). */
    private static final class FilterAll implements FilterFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(Long value) throws Exception {
            return true;
        }
    }

    /** Map that returns its input unchanged. */
    private static final class IdentityMap implements MapFunction<Long, Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long map(Long value) throws Exception {
            return value;
        }
    }

    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment environment = new TestStreamEnvironment(1, MEMORYSIZE);

        environment
                .generateSequence(1, 10)
                .map(new IdentityMap())
                .filter(new FilterAll())
                .print();

        environment.execute();
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java new file mode 100644 index 0000000..9be7de6 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.streaming.api;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.function.source.FromElementsFunction;
import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
import org.apache.flink.streaming.util.MockCollector;
import org.apache.flink.streaming.util.MockSource;
import org.junit.Test;

/**
 * Unit tests for the element-based and socket-based source functions, run
 * through mock collectors instead of a full streaming job.
 */
public class SourceTest {

    /** The varargs constructor must emit its arguments in order. */
    @Test
    public void fromElementsTest() {
        List<Integer> emitted = MockSource.createAndExecute(
                new FromElementsFunction<Integer>(1, 2, 3));

        assertEquals(Arrays.asList(1, 2, 3), emitted);
    }

    /** The collection constructor must emit the collection's elements in order. */
    @Test
    public void fromCollectionTest() {
        List<Integer> emitted = MockSource.createAndExecute(
                new FromElementsFunction<Integer>(Arrays.asList(1, 2, 3)));

        assertEquals(Arrays.asList(1, 2, 3), emitted);
    }

    /**
     * Feeds "a\nb\nc\n" through a mocked socket and expects one record per
     * newline-delimited token.
     */
    @Test
    public void socketTextStreamTest() throws Exception {
        byte[] payload = { 'a', '\n', 'b', '\n', 'c', '\n' };

        // Mocked socket that looks open and connected and serves the payload.
        Socket fakeSocket = mock(Socket.class);
        when(fakeSocket.getInputStream()).thenReturn(new ByteArrayInputStream(payload));
        when(fakeSocket.isClosed()).thenReturn(false);
        when(fakeSocket.isConnected()).thenReturn(true);

        List<String> received = new ArrayList<String>();
        SocketTextStreamFunction function = new SocketTextStreamFunction("", 0, '\n');
        function.streamFromSocket(new MockCollector<String>(received), fakeSocket);

        assertEquals(Arrays.asList("a", "b", "c"), received);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java new file mode 100644 index 0000000..3da6b5f --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.flink.streaming.api;

import static org.junit.Assert.assertEquals;

import java.io.Serializable;
import java.util.ArrayList;

import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;

/**
 * Runs a windowed join and a windowed cross over two small collections and
 * compares what the sinks collected against hand-written expectations.
 *
 * <p>Both comparisons are order-sensitive list comparisons.
 */
public class WindowCrossJoinTest implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final long MEMORYSIZE = 32;

    // Static so the sink classes below can append to them; reset at the start
    // of test().
    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();

    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
    private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();

    @Test
    public void test() throws Exception {
        // The lists are static and only ever appended to. Without this reset a
        // second run in the same JVM would compare stale, duplicated data.
        joinResults.clear();
        joinExpectedResults.clear();
        crossResults.clear();
        crossExpectedResults.clear();

        StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
        env.setBufferTimeout(1);

        ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
        ArrayList<Tuple1<Integer>> in2 = new ArrayList<Tuple1<Integer>>();

        in1.add(new Tuple2<Integer, String>(10, "a"));
        in1.add(new Tuple2<Integer, String>(20, "b"));
        in1.add(new Tuple2<Integer, String>(20, "x"));
        in1.add(new Tuple2<Integer, String>(0, "y"));

        in2.add(new Tuple1<Integer>(0));
        in2.add(new Tuple1<Integer>(5));
        in2.add(new Tuple1<Integer>(20));

        // Join on field 0 of both sides: only keys 20 and 0 have a partner.
        joinExpectedResults.add(expected(20, "b", 20));
        joinExpectedResults.add(expected(20, "x", 20));
        joinExpectedResults.add(expected(0, "y", 0));

        // Cross: every element of in1 paired with every element of in2.
        crossExpectedResults.add(expected(10, "a", 0));
        crossExpectedResults.add(expected(10, "a", 5));
        crossExpectedResults.add(expected(10, "a", 20));
        crossExpectedResults.add(expected(20, "b", 0));
        crossExpectedResults.add(expected(20, "b", 5));
        crossExpectedResults.add(expected(20, "b", 20));
        crossExpectedResults.add(expected(20, "x", 0));
        crossExpectedResults.add(expected(20, "x", 5));
        crossExpectedResults.add(expected(20, "x", 20));
        crossExpectedResults.add(expected(0, "y", 0));
        crossExpectedResults.add(expected(0, "y", 5));
        crossExpectedResults.add(expected(0, "y", 20));

        DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
        DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2);

        inStream1
                .join(inStream2)
                .onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
                        new MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0)
                .addSink(new JoinResultSink());

        inStream1
                .cross(inStream2)
                .onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
                        new MyTimestamp<Tuple1<Integer>>(), 100)
                .with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> cross(
                            Tuple2<Integer, String> val1, Tuple1<Integer> val2) throws Exception {
                        return new Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>(val1, val2);
                    }
                }).addSink(new CrossResultSink());

        env.execute();

        assertEquals(joinExpectedResults, joinResults);
        assertEquals(crossExpectedResults, crossResults);
    }

    /** Builds one expected row {@code ((key, tag), other)} from fresh tuple instances. */
    private static Tuple2<Tuple2<Integer, String>, Integer> expected(int key, String tag, int other) {
        return new Tuple2<Tuple2<Integer, String>, Integer>(
                new Tuple2<Integer, String>(key, tag), other);
    }

    /** Assigns the same constant timestamp (101) to every record. */
    private static class MyTimestamp<T> implements Timestamp<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public long getTimestamp(T value) {
            return 101L;
        }
    }

    /** Unwraps the right-hand {@code Tuple1} and appends the row to {@code joinResults}. */
    private static class JoinResultSink implements
            SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
            joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
        }
    }

    /** Unwraps the right-hand {@code Tuple1} and appends the row to {@code crossResults}. */
    private static class CrossResultSink implements
            SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
            crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java new file mode 100644 index 0000000..78cbbe5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.api.collector;

import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;

/**
 * Splits the sequence 1..11 into named outputs and checks which records each
 * selection (single name, several names, or no selection at all) receives.
 */
public class DirectedOutputTest {

    private static final String TEN = "ten";
    private static final String ODD = "odd";
    private static final String ALL = "all";
    private static final String EVEN_AND_ODD = "evenAndOdd";
    private static final String ODD_AND_TEN = "oddAndTen";
    private static final String EVEN = "even";
    private static final String NON_SELECTED = "nonSelected";

    // Sink name -> records received. Entries are created in ListSink.readObject.
    private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();

    /**
     * Tags each value as EVEN or ODD; 10 additionally gets TEN and 11
     * additionally gets NON_SELECTED. The returned list is one instance that is
     * cleared and refilled on every call.
     */
    static final class MyOutputSelector implements OutputSelector<Long> {
        private static final long serialVersionUID = 1L;

        List<String> outputs = new ArrayList<String>();

        @Override
        public Iterable<String> select(Long value) {
            outputs.clear();
            outputs.add(value % 2 == 0 ? EVEN : ODD);

            if (value == 10L) {
                outputs.add(TEN);
            } else if (value == 11L) {
                outputs.add(NON_SELECTED);
            }
            return outputs;
        }
    }

    /**
     * Sink that appends to a list registered under its name in the static
     * {@code outputs} map. The list is transient and only set up in
     * {@code readObject}, i.e. once the sink has been deserialized.
     */
    static final class ListSink implements SinkFunction<Long> {
        private static final long serialVersionUID = 1L;

        private String name;
        private transient List<Long> list;

        public ListSink(String name) {
            this.name = name;
        }

        @Override
        public void invoke(Long value) {
            list.add(value);
        }

        private void readObject(java.io.ObjectInputStream in) throws IOException,
                ClassNotFoundException {
            in.defaultReadObject();
            // Register a fresh list under this sink's name and write into it.
            List<Long> fresh = new ArrayList<Long>();
            outputs.put(name, fresh);
            this.list = fresh;
        }
    }

    @Test
    public void outputSelectorTest() throws Exception {
        StreamExecutionEnvironment env = new TestStreamEnvironment(1, 128);

        SplitDataStream<Long> split = env.generateSequence(1, 11).split(new MyOutputSelector());
        split.select(EVEN).addSink(new ListSink(EVEN));
        split.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
        split.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
        split.addSink(new ListSink(ALL));

        env.execute();

        List<Long> oneToEleven = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L);

        assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
        assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
        assertEquals(oneToEleven, outputs.get(EVEN_AND_ODD));
        assertEquals(oneToEleven, outputs.get(ALL));
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java new file mode 100755 index 0000000..1615a45 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.streaming.api.collector;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple1;
import org.junit.Test;

/**
 * Checks that an {@link OutputSelector} implementation returns the output
 * names it computed, in order.
 */
public class OutputSelectorTest {

    /**
     * For a tuple holding {@code n}, selects the output names
     * {@code "0", "1", ..., "n-1"}.
     */
    static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {

        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<String> select(Tuple1<Integer> tuple) {
            // Unbox once and count with a primitive; a boxed Integer counter
            // would unbox and rebox on every iteration.
            int count = tuple.f0;
            String[] outputs = new String[count];

            for (int i = 0; i < count; i++) {
                outputs[i] = Integer.toString(i);
            }
            return Arrays.asList(outputs);
        }
    }

    @Test
    public void testGetOutputs() {
        OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
        List<String> expectedOutputs = new ArrayList<String>();
        expectedOutputs.add("0");
        expectedOutputs.add("1");
        assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
        expectedOutputs.add("2");
        assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
    }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java new file mode 100755 index 0000000..49b3bf8 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.api.collector;

import static org.junit.Assert.assertArrayEquals;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
import org.apache.flink.streaming.util.MockRecordWriterFactory;
import org.apache.flink.util.Collector;
import org.junit.Test;

/**
 * Checks that records collected through a {@code StreamOutput} reach the
 * underlying record writer in the order they were collected.
 */
public class StreamCollectorTest {

    @Test
    public void testCollect() {
        MockRecordWriter writer = MockRecordWriterFactory.create();

        // Delegate is created without a serializer (null) and primed with an
        // empty record instance.
        SerializationDelegate<StreamRecord<Tuple1<Integer>>> delegate =
                new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(null);
        delegate.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));

        // NOTE(review): the meaning of the second constructor argument (2) is
        // not visible from this test - confirm against StreamOutput.
        Collector<Tuple1<Integer>> collector =
                new StreamOutput<Tuple1<Integer>>(writer, 2, delegate);

        for (int payload = 3; payload <= 6; payload++) {
            collector.collect(new Tuple1<Integer>(payload));
        }

        assertArrayEquals(new Integer[] { 3, 4, 5, 6 }, writer.emittedRecords.toArray());
    }

    @Test
    public void testClose() {
    }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java new file mode 100644 index 0000000..a91bd0c --- /dev/null +++
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.streaming.api.invokable.operator;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.util.MockCoContext;
import org.apache.flink.util.Collector;
import org.junit.Test;

/**
 * Tests the co-flat-map invokable on two mock inputs, and checks that merging
 * a stream with a merge that already contains it is rejected.
 */
public class CoFlatMapTest implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * First input: emits each character of the string as its own one-character
     * string. Second input: emits the integer's decimal text.
     */
    private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap1(String value, Collector<String> coll) {
            final int length = value.length();
            for (int pos = 0; pos < length; pos++) {
                coll.collect(String.valueOf(value.charAt(pos)));
            }
        }

        @Override
        public void flatMap2(Integer value, Collector<String> coll) {
            coll.collect(value.toString());
        }
    }

    @Test
    public void coFlatMapTest() {
        List<String> firstInput = Arrays.asList("abc", "def", "ghe");
        List<Integer> secondInput = Arrays.asList(1, 2, 3, 4, 5);

        CoFlatMapInvokable<String, Integer, String> invokable =
                new CoFlatMapInvokable<String, Integer, String>(new MyCoFlatMap());

        List<String> actualList = MockCoContext.createAndExecute(invokable, firstInput, secondInput);

        // The expected order alternates between the two inputs until the
        // shorter (first) one is exhausted.
        List<String> expectedList = Arrays.asList(
                "a", "b", "c", "1",
                "d", "e", "f", "2",
                "g", "h", "e", "3",
                "4", "5");

        assertEquals(expectedList, actualList);
    }

    @SuppressWarnings("unchecked")
    @Test
    public void multipleInputTest() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
        DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);

        // ds2 already contains ds1, so this merge is expected to throw.
        boolean rejected = false;
        try {
            ds1.forward().merge(ds2);
        } catch (RuntimeException expected) {
            rejected = true;
        }

        if (!rejected) {
            fail();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java new file mode 100644 index 0000000..273bbae --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license
agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.junit.Test;
+
+/**
+ * Tests {@link CoGroupedReduceInvokable} with two different key selectors on
+ * the first input and one key selector on the second input.
+ */
+public class CoGroupedReduceTest {
+
+	/**
+	 * First input: concatenates the middle fields of two tuples and maps a
+	 * tuple to its middle field. Second input: adds the second fields of two
+	 * tuples and maps a tuple to the text of its second field.
+	 */
+	private final static class MyCoReduceFunction implements
+			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
+				Tuple3<String, String, String> value2) {
+			String joined = value1.f1 + value2.f1;
+			return new Tuple3<String, String, String>(value1.f0, joined, value1.f2);
+		}
+
+		@Override
+		public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
+				Tuple2<Integer, Integer> value2) {
+			int sum = value1.f1 + value2.f1;
+			return new Tuple2<Integer, Integer>(value1.f0, sum);
+		}
+
+		@Override
+		public String map1(Tuple3<String, String, String> value) {
+			return value.f1;
+		}
+
+		@Override
+		public String map2(Tuple2<Integer, Integer> value) {
+			return Integer.toString(value.f1);
+		}
+	}
+
+	/**
+	 * Runs the same inputs twice: once keying the first input by field f0 and
+	 * once by field f2. The second input is keyed by f0 in both runs.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void coGroupedReduceTest() {
+		List<Tuple3<String, String, String>> words = Arrays.asList(
+				new Tuple3<String, String, String>("a", "word1", "b"),
+				new Tuple3<String, String, String>("b", "word2", "a"),
+				new Tuple3<String, String, String>("a", "word3", "a"));
+		List<Tuple2<Integer, Integer>> numbers = Arrays.asList(
+				new Tuple2<Integer, Integer>(2, 1),
+				new Tuple2<Integer, Integer>(1, 2),
+				new Tuple2<Integer, Integer>(0, 3),
+				new Tuple2<Integer, Integer>(2, 4),
+				new Tuple2<Integer, Integer>(1, 5));
+
+		KeySelector<Tuple3<String, String, String>, ?> wordsByFirstField = new KeySelector<Tuple3<String, String, String>, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String getKey(Tuple3<String, String, String> value) throws Exception {
+				return value.f0;
+			}
+		};
+
+		KeySelector<Tuple2<Integer, Integer>, ?> numbersByFirstField = new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
+				return value.f0;
+			}
+		};
+
+		KeySelector<Tuple3<String, String, String>, ?> wordsByThirdField = new KeySelector<Tuple3<String, String, String>, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String getKey(Tuple3<String, String, String> value) throws Exception {
+				return value.f2;
+			}
+		};
+
+		// Run 1: words keyed by f0, so "word1" and "word3" (both key "a") are reduced together.
+		CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> keyedByFirst = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+				new MyCoReduceFunction(), wordsByFirstField, numbersByFirstField);
+		List<String> expectedByFirst = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
+				"7");
+		List<String> actualByFirst = MockCoContext.createAndExecute(keyedByFirst, words, numbers);
+
+		assertEquals(expectedByFirst, actualByFirst);
+
+		// Run 2: words keyed by f2, so "word2" and "word3" (both key "a") are reduced together.
+		CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> keyedByThird = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+				new MyCoReduceFunction(), wordsByThirdField, numbersByFirstField);
+		List<String> expectedByThird = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
+		List<String> actualByThird = MockCoContext.createAndExecute(keyedByThird, words, numbers);
+
+		assertEquals(expectedByThird, actualByThird);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
new file mode 100644
index 0000000..93d1741
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.junit.Test;
+
+/**
+ * Tests {@link CoMapInvokable} with a function that turns the elements of
+ * both inputs into their string form.
+ */
+public class CoMapTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/** Maps doubles from the first input and integers from the second input to text. */
+	private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map1(Double value) {
+			return Double.toString(value);
+		}
+
+		@Override
+		public String map2(Integer value) {
+			return Integer.toString(value);
+		}
+	}
+
+	/**
+	 * Feeds five doubles and three integers through the invokable and checks
+	 * the emitted strings, including their order.
+	 */
+	@Test
+	public void coMapTest() {
+		List<Double> doubles = Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5);
+		List<Integer> integers = Arrays.asList(1, 2, 3);
+		List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
+
+		CoMapInvokable<Double, Integer, String> invokable = new CoMapInvokable<Double, Integer, String>(new MyCoMap());
+		List<String> actualList = MockCoContext.createAndExecute(invokable, doubles, integers);
+
+		assertEquals(expectedList, actualList);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
new file mode 100755
index 0000000..3343ba0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.junit.Test;
+
+/**
+ * Tests {@link CoReduceInvokable} with an integer input that is multiplied up
+ * and a string input that is concatenated and then parsed as an integer.
+ */
+public class CoStreamReduceTest {
+
+	/**
+	 * First input: product of the two integers, emitted unchanged. Second
+	 * input: concatenation of the two strings, emitted as the parsed integer.
+	 */
+	public static class MyCoReduceFunction implements
+			CoReduceFunction<Integer, String, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce1(Integer value1, Integer value2) {
+			int product = value1 * value2;
+			return product;
+		}
+
+		@Override
+		public String reduce2(String value1, String value2) {
+			String concatenated = value1 + value2;
+			return concatenated;
+		}
+
+		@Override
+		public Integer map1(Integer value) {
+			return value;
+		}
+
+		@Override
+		public Integer map2(String value) {
+			return Integer.valueOf(value);
+		}
+
+	}
+
+	/**
+	 * Expected values: running products 1, 2, 6, 24 from the first input and
+	 * the parsed concatenations 9, 99, 998 from the second input, in the
+	 * order asserted below.
+	 */
+	@Test
+	public void coStreamReduceTest() {
+
+		List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
+		List<String> digits = Arrays.asList("9", "9", "8");
+		List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
+
+		CoReduceInvokable<Integer, String, Integer> coReduce = new CoReduceInvokable<Integer, String, Integer>(
+				new MyCoReduceFunction());
+		List<Integer> result = MockCoContext.createAndExecute(coReduce, numbers, digits);
+
+		assertEquals(expected1, result);
+
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
new file mode 100644
index 0000000..4ab3492
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.function.co.CoWindowFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.util.MockCoContext;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+/**
+ * Tests {@link CoWindowInvokable} with two window functions: one that counts
+ * the elements of both windows and one that emits the values the two windows
+ * have in common.
+ */
+public class CoWindowTest {
+
+	/**
+	 * Emits the number of elements in the first window followed by the number
+	 * of elements in the second window.
+	 */
+	public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out)
+				throws Exception {
+			// The list sizes are the element counts; no need to iterate and count by hand.
+			out.collect(first.size());
+			out.collect(second.size());
+		}
+
+	}
+
+	/**
+	 * Emits, in the order of the second window, the f1 field of every element
+	 * of the second window whose f1 also occurs as an f1 in the first window.
+	 */
+	public static final class MyCoGroup2 implements
+			CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coWindow(List<Tuple2<Integer, Integer>> first,
+				List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception {
+
+			Set<Integer> firstElements = new HashSet<Integer>();
+			for (Tuple2<Integer, Integer> value : first) {
+				firstElements.add(value.f1);
+			}
+			for (Tuple2<Integer, Integer> value : second) {
+				if (firstElements.contains(value.f1)) {
+					out.collect(value.f1);
+				}
+			}
+
+		}
+
+	}
+
+	/** Uses the integer value itself as its timestamp. */
+	private static final class MyTS1 implements Timestamp<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(Integer value) {
+			return value;
+		}
+
+	}
+
+	/** Uses the first tuple field (f0) as the timestamp. */
+	private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(Tuple2<Integer, Integer> value) {
+			return value.f0;
+		}
+
+	}
+
+	/**
+	 * Runs the counting function with window size 2 and slide 1, then the
+	 * matching function with window size 2 and slide 3, and compares the
+	 * emitted values with the expected lists.
+	 */
+	@Test
+	public void coWindowGroupReduceTest2() throws Exception {
+
+		CoWindowInvokable<Integer, Integer, Integer> invokable1 = new CoWindowInvokable<Integer, Integer, Integer>(
+				new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
+				new TimestampWrapper<Integer>(new MyTS1(), 1));
+
+		// Windowsize 2, slide 1
+		// 1,2|2,3|3,4|4,5
+
+		List<Integer> input11 = new ArrayList<Integer>();
+		input11.add(1);
+		input11.add(1);
+		input11.add(2);
+		input11.add(3);
+		input11.add(3);
+
+		List<Integer> input12 = new ArrayList<Integer>();
+		input12.add(1);
+		input12.add(2);
+		input12.add(3);
+		input12.add(3);
+		input12.add(5);
+
+		// Windows (first input)(second input): (1,1,2)(1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|()(5)
+		// expected output: 3,2|3,3|2,2|0,1
+
+		List<Integer> expected1 = new ArrayList<Integer>();
+		expected1.add(3);
+		expected1.add(2);
+		expected1.add(3);
+		expected1.add(3);
+		expected1.add(2);
+		expected1.add(2);
+		expected1.add(0);
+		expected1.add(1);
+
+		List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
+		assertEquals(expected1, actual1);
+
+		CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
+				new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
+						1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
+
+		// WindowSize 2, slide 3
+		// 1,2|4,5|7,8|
+
+		List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>();
+		input21.add(new Tuple2<Integer, Integer>(1, 1));
+		input21.add(new Tuple2<Integer, Integer>(1, 2));
+		input21.add(new Tuple2<Integer, Integer>(2, 3));
+		input21.add(new Tuple2<Integer, Integer>(3, 4));
+		input21.add(new Tuple2<Integer, Integer>(3, 5));
+		input21.add(new Tuple2<Integer, Integer>(4, 6));
+		input21.add(new Tuple2<Integer, Integer>(4, 7));
+		input21.add(new Tuple2<Integer, Integer>(5, 8));
+
+		List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>();
+		input22.add(new Tuple2<Integer, Integer>(1, 1));
+		input22.add(new Tuple2<Integer, Integer>(2, 0));
+		input22.add(new Tuple2<Integer, Integer>(2, 2));
+		input22.add(new Tuple2<Integer, Integer>(3, 9));
+		input22.add(new Tuple2<Integer, Integer>(3, 4));
+		input22.add(new Tuple2<Integer, Integer>(4, 10));
+		input22.add(new Tuple2<Integer, Integer>(5, 8));
+		input22.add(new Tuple2<Integer, Integer>(5, 7));
+
+		// Timestamps 1-2 share the f1 values 1 and 2; timestamps 4-5 share 8 and 7.
+		// Elements with timestamp 3 fall into no window.
+		List<Integer> expected2 = new ArrayList<Integer>();
+		expected2.add(1);
+		expected2.add(2);
+		expected2.add(8);
+		expected2.add(7);
+
+		List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
+		assertEquals(expected2, actual2);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
new file mode 100644
index 0000000..969a06b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+/**
+ * Tests that {@link CounterInvokable} emits the values 1, 2 and 3 for three
+ * input elements.
+ */
+public class CounterInvokableTest {
+
+	@Test
+	public void counterTest() {
+		List<String> input = Arrays.asList("one", "two", "three");
+		List<Long> expected = Arrays.asList(1L, 2L, 3L);
+
+		CounterInvokable<String> invokable = new CounterInvokable<String>();
+		List<Long> actual = MockContext.createAndExecute(invokable, input);
+
+		assertEquals(expected, actual);
+	}
+}