http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java new file mode 100644 index 0000000..403dd17 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.streaming.util.MockContext; +import org.junit.Test; + +public class FilterTest implements Serializable { + private static final long serialVersionUID = 1L; + + static class MyFilter implements FilterFunction<Integer> { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter(Integer value) throws Exception { + return value % 2 == 0; + } + } + + @Test + public void test() { + FilterInvokable<Integer> invokable = new FilterInvokable<Integer>(new MyFilter()); + + List<Integer> expected = Arrays.asList(2, 4, 6); + List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + + assertEquals(expected, actual); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java new file mode 100644 index 0000000..7424e21 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.streaming.util.MockContext; +import org.apache.flink.util.Collector; +import org.junit.Test; + +public class FlatMapTest { + + public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Integer value, Collector<Integer> out) throws Exception { + if (value % 2 == 0) { + out.collect(value); + out.collect(value * value); + } + } + } + + @Test + public void flatMapTest() { + FlatMapInvokable<Integer, Integer> invokable = new FlatMapInvokable<Integer, Integer>(new MyFlatMap()); + + List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64); + List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)); + + assertEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java new file mode 100755 index 0000000..ce47c67 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.util.MockContext; +import org.junit.Test; + +public class GroupedReduceInvokableTest { + + private static class MyReducer implements ReduceFunction<Integer> { + + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1 + value2; + } + + } + + @Test + public void test() { + GroupedReduceInvokable<Integer> invokable1 = new GroupedReduceInvokable<Integer>( + new MyReducer(), new KeySelector<Integer, Integer>() { + + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }); + + List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3); + List<Integer> actual = MockContext.createAndExecute(invokable1, + Arrays.asList(1, 1, 2, 2, 3)); + + assertEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java new file mode 100644 index 0000000..f38d5c1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; +import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper; +import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; +import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy; +import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; +import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; +import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; +import org.apache.flink.streaming.util.MockContext; +import org.junit.Test; + +public class GroupedWindowInvokableTest { + + KeySelector<Tuple2<Integer, String>, ?> keySelector = new KeySelector<Tuple2<Integer, String>, String>() { + + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Tuple2<Integer, String> value) throws Exception { + return value.f1; + } + }; + + /** + * Tests that illegal arguments result in failure. The following cases are + * tested: 1) having no trigger 2) having no eviction 3) having neither + * eviction nor trigger 4) having both, central and distributed eviction. + */ + @Test + public void testGroupedWindowInvokableFailTest() { + + // create dummy reduce function + ReduceFunction<Object> userFunction = new ReduceFunction<Object>() { + + private static final long serialVersionUID = 1L; + + @Override + public Object reduce(Object value1, Object value2) throws Exception { + return null; + } + }; + + // create dummy keySelector + KeySelector<Object, Object> keySelector = new KeySelector<Object, Object>() { + + private static final long serialVersionUID = 1L; + + @Override + public Object getKey(Object value) throws Exception { + return null; + } + }; + + // create policy lists + LinkedList<CloneableEvictionPolicy<Object>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<Object>>(); + LinkedList<CloneableTriggerPolicy<Object>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<Object>>(); + LinkedList<EvictionPolicy<Object>> centralEvictionPolicies = new LinkedList<EvictionPolicy<Object>>(); + LinkedList<TriggerPolicy<Object>> centralTriggerPolicies = new LinkedList<TriggerPolicy<Object>>(); + + // empty trigger and policy lists should fail + try { + new GroupedWindowInvokable<Object, Object>(userFunction, keySelector, + distributedTriggerPolicies, distributedEvictionPolicies, + centralTriggerPolicies, centralEvictionPolicies); + fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (1)"); + } catch (UnsupportedOperationException e) { + // that's the expected case + } + + // null for trigger and policy lists should fail + try { + new GroupedWindowInvokable<Object, Object>(userFunction, keySelector, null, null, null, + null); + fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (2)"); + } catch (UnsupportedOperationException e) { + // that's the expected case + } + + // empty eviction should still fail + centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5)); + distributedTriggerPolicies.add(new CountTriggerPolicy<Object>(5)); + try { + new GroupedWindowInvokable<Object, Object>(userFunction, keySelector, + distributedTriggerPolicies, distributedEvictionPolicies, + centralTriggerPolicies, centralEvictionPolicies); + fail("Creating instance without any eviction policy should cause an UnsupportedOperationException but didn't. (3)"); + } catch (UnsupportedOperationException e) { + // that's the expected case + } + + // empty trigger should still fail + centralTriggerPolicies.clear(); + distributedTriggerPolicies.clear(); + centralEvictionPolicies.add(new CountEvictionPolicy<Object>(5)); + try { + new GroupedWindowInvokable<Object, Object>(userFunction, keySelector, + distributedTriggerPolicies, distributedEvictionPolicies, + centralTriggerPolicies, centralEvictionPolicies); + fail("Creating instance without any trigger policy should cause an UnsupportedOperationException but didn't. (4)"); + } catch (UnsupportedOperationException e) { + // that's the expected case + } + + // having both, central and distributed eviction, at the same time + // should fail + centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5)); + distributedEvictionPolicies.add(new CountEvictionPolicy<Object>(5)); + try { + new GroupedWindowInvokable<Object, Object>(userFunction, keySelector, + distributedTriggerPolicies, distributedEvictionPolicies, + centralTriggerPolicies, centralEvictionPolicies); + fail("Creating instance with central and distributed eviction should cause an UnsupportedOperationException but didn't. (4)"); + } catch (UnsupportedOperationException e) { + // that's the expected case + } + + } + + /** + * Test for not active distributed triggers with single field + */ + @Test + public void testGroupedWindowInvokableDistributedTriggerSimple() { + List<Integer> inputs = new ArrayList<Integer>(); + inputs.add(1); + inputs.add(1); + inputs.add(5); + inputs.add(5); + inputs.add(5); + inputs.add(1); + inputs.add(1); + inputs.add(5); + inputs.add(1); + inputs.add(5); + + List<Integer> expectedDistributedEviction = new ArrayList<Integer>(); + expectedDistributedEviction.add(15); + expectedDistributedEviction.add(3); + expectedDistributedEviction.add(3); + expectedDistributedEviction.add(15); + + List<Integer> expectedCentralEviction = new ArrayList<Integer>(); + expectedCentralEviction.add(2); + expectedCentralEviction.add(5); + expectedCentralEviction.add(15); + expectedCentralEviction.add(2); + expectedCentralEviction.add(5); + expectedCentralEviction.add(2); + expectedCentralEviction.add(5); + expectedCentralEviction.add(1); + expectedCentralEviction.add(5); + + LinkedList<CloneableTriggerPolicy<Integer>> triggers = new LinkedList<CloneableTriggerPolicy<Integer>>(); + // Trigger on every 2nd element, but the first time after the 3rd + triggers.add(new CountTriggerPolicy<Integer>(2, -1)); + + LinkedList<CloneableEvictionPolicy<Integer>> evictions = new LinkedList<CloneableEvictionPolicy<Integer>>(); + // On every 2nd element, remove the oldest 2 elements, but the first + // time after the 3rd element + evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1)); + + LinkedList<TriggerPolicy<Integer>> centralTriggers = new LinkedList<TriggerPolicy<Integer>>(); + + ReduceFunction<Integer> reduceFunction = new ReduceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1 + value2; + } + }; + + KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) { + return value; + } + }; + + GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>( + reduceFunction, keySelector, triggers, evictions, centralTriggers, null); + + List<Integer> result = MockContext.createAndExecute(invokable, inputs); + + List<Integer> actual = new LinkedList<Integer>(); + for (Integer current : result) { + actual.add(current); + } + + assertEquals(new HashSet<Integer>(expectedDistributedEviction), + new HashSet<Integer>(actual)); + assertEquals(expectedDistributedEviction.size(), actual.size()); + + // Run test with central eviction + triggers.clear(); + centralTriggers.add(new CountTriggerPolicy<Integer>(2, -1)); + LinkedList<EvictionPolicy<Integer>> centralEvictions = new LinkedList<EvictionPolicy<Integer>>(); + centralEvictions.add(new CountEvictionPolicy<Integer>(2, 2, -1)); + + invokable = new GroupedWindowInvokable<Integer, Integer>(reduceFunction, keySelector, + triggers, null, centralTriggers, centralEvictions); + + result = MockContext.createAndExecute(invokable, inputs); + actual = new LinkedList<Integer>(); + for (Integer current : result) { + actual.add(current); + } + + assertEquals(new HashSet<Integer>(expectedCentralEviction), new HashSet<Integer>(actual)); + assertEquals(expectedCentralEviction.size(), actual.size()); + } + + /** + * Test for non active distributed triggers with separated key field + */ + @Test + public void testGroupedWindowInvokableDistributedTriggerComplex() { + List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>(); + inputs2.add(new Tuple2<Integer, String>(1, "a")); + inputs2.add(new Tuple2<Integer, String>(0, "b")); + inputs2.add(new Tuple2<Integer, String>(2, "a")); + inputs2.add(new Tuple2<Integer, String>(-1, "a")); + inputs2.add(new Tuple2<Integer, String>(-2, "a")); + inputs2.add(new Tuple2<Integer, String>(10, "a")); + inputs2.add(new Tuple2<Integer, String>(2, "b")); + inputs2.add(new Tuple2<Integer, String>(1, "a")); + + List<Tuple2<Integer, String>> expected2 = new ArrayList<Tuple2<Integer, String>>(); + expected2.add(new Tuple2<Integer, String>(-1, "a")); + expected2.add(new Tuple2<Integer, String>(-2, "a")); + expected2.add(new Tuple2<Integer, String>(0, "b")); + + LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>(); + // Trigger on every 2nd element, but the first time after the 3rd + triggers.add(new CountTriggerPolicy<Tuple2<Integer, String>>(3)); + + LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>(); + // On every 2nd element, remove the oldest 2 elements, but the first + // time after the 3rd element + evictions.add(new TumblingEvictionPolicy<Tuple2<Integer, String>>()); + + LinkedList<TriggerPolicy<Tuple2<Integer, String>>> centralTriggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>(); + + GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable2 = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>( + new ReduceFunction<Tuple2<Integer, String>>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, + Tuple2<Integer, String> value2) throws Exception { + if (value1.f0 <= value2.f0) { + return value1; + } else { + return value2; + } + } + }, keySelector, triggers, evictions, centralTriggers, null); + + List<Tuple2<Integer, String>> result = MockContext.createAndExecute(invokable2, inputs2); + + List<Tuple2<Integer, String>> actual2 = new LinkedList<Tuple2<Integer, String>>(); + for (Tuple2<Integer, String> current : result) { + actual2.add(current); + } + + assertEquals(new HashSet<Tuple2<Integer, String>>(expected2), + new HashSet<Tuple2<Integer, String>>(actual2)); + assertEquals(expected2.size(), actual2.size()); + } + + /** + * Test for active centralized trigger + */ + @Test + public void testGroupedWindowInvokableCentralActiveTrigger() { + + List<Tuple2<Integer, String>> inputs = new ArrayList<Tuple2<Integer, String>>(); + inputs.add(new Tuple2<Integer, String>(1, "a")); + inputs.add(new Tuple2<Integer, String>(1, "b")); + inputs.add(new Tuple2<Integer, String>(1, "c")); + inputs.add(new Tuple2<Integer, String>(2, "a")); + inputs.add(new Tuple2<Integer, String>(2, "b")); + inputs.add(new Tuple2<Integer, String>(2, "c")); + inputs.add(new Tuple2<Integer, String>(2, "b")); + inputs.add(new Tuple2<Integer, String>(2, "a")); + inputs.add(new Tuple2<Integer, String>(2, "c")); + inputs.add(new Tuple2<Integer, String>(3, "c")); + inputs.add(new Tuple2<Integer, String>(3, "a")); + inputs.add(new Tuple2<Integer, String>(3, "b")); + inputs.add(new Tuple2<Integer, String>(4, "a")); + inputs.add(new Tuple2<Integer, String>(4, "b")); + inputs.add(new Tuple2<Integer, String>(4, "c")); + inputs.add(new Tuple2<Integer, String>(5, "c")); + inputs.add(new Tuple2<Integer, String>(5, "a")); + inputs.add(new Tuple2<Integer, String>(5, "b")); + inputs.add(new Tuple2<Integer, String>(10, "b")); + inputs.add(new Tuple2<Integer, String>(10, "a")); + inputs.add(new Tuple2<Integer, String>(10, "c")); + inputs.add(new Tuple2<Integer, String>(11, "a")); + inputs.add(new Tuple2<Integer, String>(11, "a")); + inputs.add(new Tuple2<Integer, String>(11, "c")); + inputs.add(new Tuple2<Integer, String>(11, "c")); + inputs.add(new Tuple2<Integer, String>(11, "b")); + inputs.add(new Tuple2<Integer, String>(11, "b")); + + // Expected result: + // For each group (a,b and c): + // 1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11 + // 12-12-5-10-32 + + List<Tuple2<Integer, String>> expected = new ArrayList<Tuple2<Integer, String>>(); + expected.add(new Tuple2<Integer, String>(12, "a")); + expected.add(new Tuple2<Integer, String>(12, "b")); + expected.add(new Tuple2<Integer, String>(12, "c")); + expected.add(new Tuple2<Integer, String>(12, "a")); + expected.add(new Tuple2<Integer, String>(12, "b")); + expected.add(new Tuple2<Integer, String>(12, "c")); + expected.add(new Tuple2<Integer, String>(5, "a")); + expected.add(new Tuple2<Integer, String>(5, "b")); + expected.add(new Tuple2<Integer, String>(5, "c")); + expected.add(new Tuple2<Integer, String>(10, "a")); + expected.add(new Tuple2<Integer, String>(10, "b")); + expected.add(new Tuple2<Integer, String>(10, "c")); + expected.add(new Tuple2<Integer, String>(32, "a")); + expected.add(new Tuple2<Integer, String>(32, "b")); + expected.add(new Tuple2<Integer, String>(32, "c")); + + Timestamp<Tuple2<Integer, String>> myTimeStamp = new Timestamp<Tuple2<Integer, String>>() { + private static final long serialVersionUID = 1L; + + @Override + public long getTimestamp(Tuple2<Integer, String> value) { + return value.f0; + } + }; + + TimestampWrapper<Tuple2<Integer, String>> myTimeStampWrapper = new TimestampWrapper<Tuple2<Integer, String>>( + myTimeStamp, 1); + + ReduceFunction<Tuple2<Integer, String>> myReduceFunction = new ReduceFunction<Tuple2<Integer, String>>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, + Tuple2<Integer, String> value2) throws Exception { + return new Tuple2<Integer, String>(value1.f0 + value2.f0, value1.f1); + } + }; + + LinkedList<TriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>(); + // Trigger every 2 time units but delay the first trigger by 2 (First + // trigger after 4, then every 2) + triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L)); + + LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>(); + // Always delete all elements older then 4 + evictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper)); + + LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>(); + + GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>( + myReduceFunction, keySelector, distributedTriggers, evictions, triggers, null); + + ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>(); + for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) { + result.add(t); + } + + assertEquals(new HashSet<Tuple2<Integer, String>>(expected), + new HashSet<Tuple2<Integer, String>>(result)); + assertEquals(expected.size(), result.size()); + + // repeat the test with central eviction. The result should be the same. + triggers.clear(); + triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L)); + evictions.clear(); + LinkedList<EvictionPolicy<Tuple2<Integer, String>>> centralEvictions = new LinkedList<EvictionPolicy<Tuple2<Integer, String>>>(); + centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper)); + + invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>( + myReduceFunction, keySelector, distributedTriggers, evictions, triggers, + centralEvictions); + + result = new ArrayList<Tuple2<Integer, String>>(); + for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) { + result.add(t); + } + + assertEquals(new HashSet<Tuple2<Integer, String>>(expected), + new HashSet<Tuple2<Integer, String>>(result)); + assertEquals(expected.size(), result.size()); + } + + /** + * Test for multiple centralized trigger + */ + @Test + public void testGroupedWindowInvokableMultipleCentralTrigger() { + LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>(); + triggers.add(new CountTriggerPolicy<Integer>(8)); + triggers.add(new CountTriggerPolicy<Integer>(5)); + + LinkedList<CloneableEvictionPolicy<Integer>> evictions = new LinkedList<CloneableEvictionPolicy<Integer>>(); + // The active wrapper causes eviction even on (fake) elements which + // triggered, but does not belong to the group. + evictions.add(new ActiveCloneableEvictionPolicyWrapper<Integer>( + new TumblingEvictionPolicy<Integer>())); + + LinkedList<CloneableTriggerPolicy<Integer>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Integer>>(); + + List<Integer> inputs = new ArrayList<Integer>(); + inputs.add(1); + inputs.add(2); + inputs.add(2); + inputs.add(2); + inputs.add(1); + // 1st Trigger: 2;6 + inputs.add(2); + inputs.add(1); + inputs.add(2); + // 2nd Trigger: 1;4 + inputs.add(2); + inputs.add(1); + // Final: 1,2 + + List<Integer> expected = new ArrayList<Integer>(); + expected.add(2); + expected.add(6); + expected.add(4); + expected.add(1); + expected.add(2); + expected.add(1); + + ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1 + value2; + } + }; + + GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>( + myReduceFunction, new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) { + return value; + } + }, distributedTriggers, evictions, triggers, null); + + ArrayList<Integer> result = new ArrayList<Integer>(); + for (Integer t : MockContext.createAndExecute(invokable, inputs)) { + result.add(t); + } + + assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result)); + assertEquals(expected.size(), result.size()); + } + + /** + * Test for combination of centralized trigger and distributed trigger at + * the same time + */ + @Test + public void testGroupedWindowInvokableCentralAndDistrTrigger() { + LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>(); + triggers.add(new CountTriggerPolicy<Integer>(8)); + triggers.add(new CountTriggerPolicy<Integer>(5)); + + LinkedList<CloneableEvictionPolicy<Integer>> evictions = new LinkedList<CloneableEvictionPolicy<Integer>>(); + // The active wrapper causes eviction even on (fake) elements which + // triggered, but does not belong to the group. + evictions.add(new ActiveCloneableEvictionPolicyWrapper<Integer>( + new TumblingEvictionPolicy<Integer>())); + + LinkedList<CloneableTriggerPolicy<Integer>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Integer>>(); + distributedTriggers.add(new CountTriggerPolicy<Integer>(2)); + + List<Integer> inputs = new ArrayList<Integer>(); + inputs.add(1); + inputs.add(2); + inputs.add(2); + // local on 2 => 4 + inputs.add(2); + inputs.add(1); + // local on 1 => 2 + // and 1st Central: 2;2 + // SUMS up to 2;2 + inputs.add(2); + // local on 2 => 2 + inputs.add(1); + inputs.add(2); + // 2nd Central: 1;2 + inputs.add(2); + inputs.add(1); + // Final: 1,2 + + List<Integer> expected = new ArrayList<Integer>(); + expected.add(4); + expected.add(2); + expected.add(2); + expected.add(2); + expected.add(1); + expected.add(2); + expected.add(1); + expected.add(2); + + ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1 + value2; + } + }; + + GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>( + myReduceFunction, new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) { + return value; + } + }, distributedTriggers, evictions, triggers, null); + + ArrayList<Integer> result = new ArrayList<Integer>(); + for (Integer t : MockContext.createAndExecute(invokable, inputs)) { + result.add(t); + } + + assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result)); + assertEquals(expected.size(), result.size()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java new file mode 100644 index 0000000..5390ec9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.streaming.util.MockContext; +import org.junit.Test; + +public class MapTest { + + private static class Map implements MapFunction<Integer, String> { + private static final long serialVersionUID = 1L; + + @Override + public String map(Integer value) throws Exception { + return "+" + (value + 1); + } + } + + @Test + public void mapInvokableTest() { + MapInvokable<Integer, String> invokable = new MapInvokable<Integer, String>(new Map()); + + List<String> expectedList = Arrays.asList("+2", "+3", "+4"); + List<String> actualList = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3)); + + assertEquals(expectedList, actualList); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java new file mode 100644 index 0000000..11c44cd --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.datastream.StreamProjection; +import org.apache.flink.streaming.util.MockContext; +import org.junit.Test; + +public class ProjectTest implements Serializable { + private static final long serialVersionUID = 1L; + + @Test + public void test() { + + TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor + .getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", + 4)); + + int[] fields = new int[] { 4, 4, 3 }; + Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class }; + + @SuppressWarnings("unchecked") + ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> invokable = new ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>( + fields, + (TypeInformation<Tuple3<Integer, Integer, String>>) StreamProjection + .extractFieldTypes(fields, classes, inType)); + + List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>(); + input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4)); + input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2)); + input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2)); + input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7)); + + List<Tuple3<Integer, Integer, String>> expected = new ArrayList<Tuple3<Integer, Integer, String>>(); + expected.add(new Tuple3<Integer, Integer, String>(4, 4, "b")); + expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c")); + expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c")); + expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a")); + + assertEquals(expected, MockContext.createAndExecute(invokable, input)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java new file mode 100755 index 0000000..ae866e6 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.util.MockContext; +import org.junit.Test; + +public class StreamReduceTest { + + private static class MyReducer implements ReduceFunction<Integer>{ + + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1+value2; + } + + } + + @Test + public void test() { + StreamReduceInvokable<Integer> invokable1 = new StreamReduceInvokable<Integer>( + new MyReducer()); + + List<Integer> expected = Arrays.asList(1,2,4,7,10); + List<Integer> actual = MockContext.createAndExecute(invokable1, + Arrays.asList(1, 1, 2, 3, 3)); + + assertEquals(expected, actual); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java new file mode 100644 index 0000000..83b4596 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; +import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy; +import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; +import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; +import org.apache.flink.streaming.util.MockContext; +import org.junit.Test; + +public class WindowInvokableTest { + + /** + * Test case equal to {@link WindowReduceInvokableTest} + */ + @Test + public void testWindowInvokableWithTimePolicy() { + + List<Integer> inputs = new ArrayList<Integer>(); + inputs.add(1); + inputs.add(2); + inputs.add(2); + inputs.add(3); + inputs.add(4); + inputs.add(5); + inputs.add(10); + inputs.add(11); + inputs.add(11); + // 1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11 + // 12-12-5-10-32 + + List<Integer> expected = new ArrayList<Integer>(); + expected.add(12); + expected.add(12); + expected.add(5); + expected.add(10); + expected.add(32); + + Timestamp<Integer> myTimeStamp = new Timestamp<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public long getTimestamp(Integer value) { + return value; + } + }; + + ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1 + value2; + } + }; + + LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>(); + // Trigger every 2 time units but delay the first trigger by 2 (First + // trigger after 4, then every 2) + triggers.add(new TimeTriggerPolicy<Integer>(2L, new TimestampWrapper<Integer>(myTimeStamp, + 1), 2L)); + + LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>(); + // Always delete all elements older then 4 + evictions.add(new TimeEvictionPolicy<Integer>(4L, new TimestampWrapper<Integer>( + myTimeStamp, 1))); + + WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>( + myReduceFunction, triggers, evictions); + + ArrayList<Integer> result = new ArrayList<Integer>(); + for (Integer t : MockContext.createAndExecute(invokable, inputs)) { + result.add(t); + } + + assertEquals(expected, result); + } + + /** + * Test case equal to {@link BatchReduceTest} + */ + @Test + public void testWindowInvokableWithCountPolicy() { + + List<Integer> inputs = new ArrayList<Integer>(); + for (Integer i = 1; i <= 10; i++) { + inputs.add(i); + } + + ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1 + value2; + } + }; + + /* + * The following setup reassembles the batch size 3 and the slide size 2 + * of the BatchReduceInvokable. + */ + LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>(); + // Trigger on every 2nd element, but the first time after the 3rd + triggers.add(new CountTriggerPolicy<Integer>(2, -1)); + + LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>(); + // On every 2nd element, remove the oldest 2 elements, but the first + // time after the 3rd element + evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1)); + + WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>( + myReduceFunction, triggers, evictions); + + List<Integer> expected = new ArrayList<Integer>(); + expected.add(6); + expected.add(12); + expected.add(18); + expected.add(24); + expected.add(19); + List<Integer> result = new ArrayList<Integer>(); + for (Integer t : MockContext.createAndExecute(invokable, inputs)) { + result.add(t); + } + assertEquals(expected, result); + + /* + * Begin test part 2 + */ + + List<Integer> inputs2 = new ArrayList<Integer>(); + inputs2.add(1); + inputs2.add(2); + inputs2.add(-5); // changed this value to make sure it is excluded from + // the result + inputs2.add(-3); + inputs2.add(-4); + + myReduceFunction = new ReduceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + if (value1 <= value2) { + return value1; + } else { + return value2; + } + }; + }; + + /* + * The following setup reassembles the batch size 2 and the slide size 3 + * of the BatchReduceInvokable. + */ + triggers = new LinkedList<TriggerPolicy<Integer>>(); + // Trigger after every 3rd element, but the first time after the 2nd + triggers.add(new CountTriggerPolicy<Integer>(3, 1)); + + evictions = new LinkedList<EvictionPolicy<Integer>>(); + // On every 3rd element, remove the oldest 3 elements, but the first + // time after on the 5th element + evictions.add(new CountEvictionPolicy<Integer>(3, 3, -1)); + + WindowInvokable<Integer, Integer> invokable2 = new WindowReduceInvokable<Integer>( + myReduceFunction, triggers, evictions); + + List<Integer> expected2 = new ArrayList<Integer>(); + expected2.add(1); + expected2.add(-4); + + result = new ArrayList<Integer>(); + for (Integer t : MockContext.createAndExecute(invokable2, inputs2)) { + result.add(t); + } + + assertEquals(expected2, result); + + } + + @Test + public void testWindowInvokableWithMultiplePolicies() { + LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>(); + triggers.add(new CountTriggerPolicy<Integer>(2)); + triggers.add(new CountTriggerPolicy<Integer>(3)); + + LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>(); + evictions.add(new CountEvictionPolicy<Integer>(2, 2)); + evictions.add(new CountEvictionPolicy<Integer>(3, 3)); + + List<Integer> inputs = new ArrayList<Integer>(); + for (Integer i = 1; i <= 10; i++) { + inputs.add(i); + } + /** + * <code> + * VAL: 1,2,3,4,5,6,7,8,9,10 + * TR1: | | | | | + * TR2: | | | + * EV1: 2 2 2 2 2 + * EV2: 3 3 3 + * </code> + */ + + List<Integer> expected = new ArrayList<Integer>(); + expected.add(3); + expected.add(3); + expected.add(4); + expected.add(11); + expected.add(15); + expected.add(9); + expected.add(10); + + ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer value1, Integer value2) throws Exception { + return value1 + value2; + } + }; + + WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>( + myReduceFunction, triggers, evictions); + + ArrayList<Integer> result = new ArrayList<Integer>(); + for (Integer t : MockContext.createAndExecute(invokable, inputs)) { + result.add(t); + } + + assertEquals(expected, result); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java new file mode 100644 index 0000000..b044d88 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/UIDTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.streamrecord; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.runtime.util.DataOutputSerializer; +import org.junit.Test; + +public class UIDTest { + + //TODO fix with matching DataOutputStream and DataOutputView + @Test + public void test() throws IOException { + DataOutputSerializer out = new DataOutputSerializer(64); + + UID id = new UID(3); + id.write(out); + + ByteBuffer buff = out.wrapAsByteBuffer(); + + + DataInputDeserializer in = new DataInputDeserializer(buff); + + UID id2 = new UID(); + id2.read(in); + + assertEquals(id.getChannelId(), id2.getChannelId()); + assertArrayEquals(id.getGeneratedId(), id2.getGeneratedId()); + assertArrayEquals(id.getId(), id2.getId()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java new file mode 100755 index 0000000..a40048e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.streamvertex; + +import java.util.ArrayList; + +import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.runtime.operators.DataSourceTask; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; + +public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> { + + public ArrayList<Integer> emittedRecords; + + public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) { + super(inputBase.getEnvironment().getWriter(0)); + } + + public boolean initList() { + emittedRecords = new ArrayList<Integer>(); + return true; + } + + @Override + public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) { + emittedRecords.add(record.getInstance().getObject().f0); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java new file mode 100644 index 0000000..b103d84 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.streamvertex; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.co.CoMapFunction; +import org.apache.flink.streaming.api.function.sink.SinkFunction; +import org.apache.flink.streaming.api.function.source.SourceFunction; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.util.Collector; +import org.junit.Test; + +public class StreamVertexTest { + + private static Map<Integer, Integer> data = new HashMap<Integer, Integer>(); + + public static class MySource implements SourceFunction<Tuple1<Integer>> { + private static final long serialVersionUID = 1L; + + private Tuple1<Integer> tuple = new Tuple1<Integer>(0); + + @Override + public void invoke(Collector<Tuple1<Integer>> collector) throws Exception { + for (int i = 0; i < 10; i++) { + tuple.f0 = i; + collector.collect(tuple); + } + } + } + + public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception { + Integer i = value.f0; + return new Tuple2<Integer, Integer>(i, i + 1); + } + } + + public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public void invoke(Tuple2<Integer, Integer> tuple) { + Integer k = tuple.getField(0); + Integer v = tuple.getField(1); + data.put(k, v); + } + } + + @SuppressWarnings("unused") + private static final int PARALLELISM = 1; + private static final int SOURCE_PARALELISM = 1; + private static final long MEMORYSIZE = 32; + + @Test + public void wrongJobGraph() { + LocalStreamEnvironment env = StreamExecutionEnvironment + .createLocalEnvironment(SOURCE_PARALELISM); + + + try { + env.fromCollection(null); + fail(); + } catch (NullPointerException e) { + } + + try { + env.fromElements(); + fail(); + } catch (IllegalArgumentException e) { + } + + try { + env.generateSequence(-10, -30); + fail(); + } catch (IllegalArgumentException e) { + } + + try { + env.setBufferTimeout(-10); + fail(); + } catch (IllegalArgumentException e) { + } + + try { + env.generateSequence(1, 10).project(2); + fail(); + } catch (RuntimeException e) { + } + } + + private static class CoMap implements CoMapFunction<String, Long, String> { + private static final long serialVersionUID = 1L; + + @Override + public String map1(String value) { + return value; + } + + @Override + public String map2(Long value) { + return value.toString(); + } + } + + static HashSet<String> resultSet; + + private static class SetSink implements SinkFunction<String> { + private static final long serialVersionUID = 1L; + + @Override + public void invoke(String value) { + resultSet.add(value); + } + } + + @Test + public void coTest() throws Exception { + StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE); + + DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc"); + DataStream<Long> generatedSequence = env.generateSequence(0, 3); + + fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink()); + + resultSet = new HashSet<String>(); + env.execute(); + + HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1", + "2", "3")); + assertEquals(expectedSet, resultSet); + } + + @Test + public void runStream() throws Exception { + StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE); + + env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink()); + + env.execute(); + assertEquals(10, data.keySet().size()); + + for (Integer k : data.keySet()) { + assertEquals((Integer) (k + 1), data.get(k)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java new file mode 100644 index 0000000..e12b254 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.deltafunction; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class CosineDistanceTest { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testCosineDistance() { + + //Reference calculated using wolfram alpha + double[][][] testdata={ + {{0,0,0},{0,0,0}}, + {{0,0,0},{1,2,3}}, + {{1,2,3},{0,0,0}}, + {{1,2,3},{4,5,6}}, + {{1,2,3},{-4,-5,-6}}, + {{1,2,-3},{-4,5,-6}}, + {{1,2,3,4},{5,6,7,8}}, + {{1,2},{3,4}}, + {{1},{2}}, + }; + double[] referenceSolutions={ + 0, + 0, + 0, + 0.025368, + 1.974631, + 0.269026, + 0.031136, + 0.016130, + 0 + }; + + for (int i = 0; i < testdata.length; i++) { + assertEquals("Wrong result for inputs " + arrayToString(testdata[i][0]) + " and " + + arrayToString(testdata[i][0]), referenceSolutions[i], + new CosineDistance().getDelta(testdata[i][0], testdata[i][1]), 0.000001); + } + } + + private String arrayToString(double[] in){ + if (in.length==0) return "{}"; + String result="{"; + for (double d:in){ + result+=d+","; + } + return result.substring(0, result.length()-1)+"}"; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java new file mode 100644 index 0000000..8c62497 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.deltafunction; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class EuclideanDistanceTest { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testEuclideanDistance() { + + //Reference calculated using wolfram alpha + double[][][] testdata={ + {{0,0,0},{0,0,0}}, + {{0,0,0},{1,2,3}}, + {{1,2,3},{0,0,0}}, + {{1,2,3},{4,5,6}}, + {{1,2,3},{-4,-5,-6}}, + {{1,2,-3},{-4,5,-6}}, + {{1,2,3,4},{5,6,7,8}}, + {{1,2},{3,4}}, + {{1},{2}}, + }; + double[] referenceSolutions={ + 0, + 3.741657, + 3.741657, + 5.196152, + 12.4499, + 6.557439, + 8.0, + 2.828427, + 1 + }; + + for (int i = 0; i < testdata.length; i++) { + assertEquals("Wrong result for inputs " + arrayToString(testdata[i][0]) + " and " + + arrayToString(testdata[i][0]), referenceSolutions[i], + new EuclideanDistance().getDelta(testdata[i][0], testdata[i][1]), 0.000001); + } + + } + + private String arrayToString(double[] in){ + if (in.length==0) return "{}"; + String result="{"; + for (double d:in){ + result+=d+","; + } + return result.substring(0, result.length()-1)+"}"; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java new file mode 100644 index 0000000..17d3974 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTupleTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.extractor; + +import static org.junit.Assert.assertEquals; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple10; +import org.apache.flink.api.java.tuple.Tuple11; +import org.apache.flink.api.java.tuple.Tuple12; +import org.apache.flink.api.java.tuple.Tuple13; +import org.apache.flink.api.java.tuple.Tuple14; +import org.apache.flink.api.java.tuple.Tuple15; +import org.apache.flink.api.java.tuple.Tuple16; +import org.apache.flink.api.java.tuple.Tuple17; +import org.apache.flink.api.java.tuple.Tuple18; +import org.apache.flink.api.java.tuple.Tuple19; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple20; +import org.apache.flink.api.java.tuple.Tuple21; +import org.apache.flink.api.java.tuple.Tuple22; +import org.apache.flink.api.java.tuple.Tuple23; +import org.apache.flink.api.java.tuple.Tuple24; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.tuple.Tuple9; +import org.apache.flink.streaming.api.windowing.extractor.ArrayFromTuple; +import org.junit.Before; +import org.junit.Test; + +public class ArrayFromTupleTest { + + private String[] testStrings; + + @Before + public void init() { + testStrings = new String[Tuple.MAX_ARITY]; + for (int i = 0; i < Tuple.MAX_ARITY; i++) { + testStrings[i] = Integer.toString(i); + } + } + + @Test + public void testConvertFromTupleToArray() throws InstantiationException, IllegalAccessException { + for (int i = 0; i < Tuple.MAX_ARITY; i++) { + Tuple currentTuple = (Tuple) CLASSES[i].newInstance(); + String[] currentArray = new String[i + 1]; + for (int j = 0; j <= i; j++) { + currentTuple.setField(testStrings[j], j); + currentArray[j] = testStrings[j]; + } + arrayEqualityCheck(currentArray, new ArrayFromTuple().extract(currentTuple)); + } + } + + @Test + public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException { + Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance(); + for (int i = 0; i < Tuple.MAX_ARITY; i++) { + currentTuple.setField(testStrings[i], i); + } + + String[] expected = { testStrings[5], testStrings[3], testStrings[6], testStrings[7], + testStrings[0] }; + arrayEqualityCheck(expected, new ArrayFromTuple(5, 3, 6, 7, 0).extract(currentTuple)); + + String[] expected2 = { testStrings[0], testStrings[Tuple.MAX_ARITY - 1] }; + arrayEqualityCheck(expected2, + new ArrayFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple)); + + String[] expected3 = { testStrings[Tuple.MAX_ARITY - 1], testStrings[0] }; + arrayEqualityCheck(expected3, + new ArrayFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple)); + + String[] expected4 = { testStrings[13], testStrings[4], testStrings[5], testStrings[4], + testStrings[2], testStrings[8], testStrings[6], testStrings[2], testStrings[8], + testStrings[3], testStrings[5], testStrings[2], testStrings[16], testStrings[4], + testStrings[3], testStrings[2], testStrings[6], testStrings[4], testStrings[7], + testStrings[4], testStrings[2], testStrings[8], testStrings[7], testStrings[2] }; + arrayEqualityCheck(expected4, new ArrayFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16, + 4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple)); + } + + private void arrayEqualityCheck(Object[] array1, Object[] array2) { + assertEquals("The result arrays must have the same length", array1.length, array2.length); + for (int i = 0; i < array1.length; i++) { + assertEquals("Unequal fields at position " + i, array1[i], array2[i]); + } + } + + private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class, + Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, + Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, + Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, + Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, + Tuple24.class, Tuple25.class }; +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java new file mode 100644 index 0000000..82a876a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtractTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.extractor; + +import static org.junit.Assert.*; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.windowing.extractor.ArrayFromTuple; +import org.apache.flink.streaming.api.windowing.extractor.ConcatinatedExtract; +import org.apache.flink.streaming.api.windowing.extractor.Extractor; +import org.apache.flink.streaming.api.windowing.extractor.FieldFromArray; +import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple; +import org.apache.flink.streaming.api.windowing.extractor.FieldsFromArray; +import org.junit.Before; +import org.junit.Test; + +public class ConcatinatedExtractTest { + + private String[] testStringArray1 = { "1", "2", "3" }; + private int[] testIntArray1 = { 1, 2, 3 }; + private String[] testStringArray2 = { "4", "5", "6" }; + private int[] testIntArray2 = { 4, 5, 6 }; + private String[] testStringArray3 = { "7", "8", "9" }; + private int[] testIntArray3 = { 7, 8, 9 }; + private Tuple2<String[], int[]>[] testTuple2Array; + private Tuple2<String[], int[]> testTuple2; + private Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]> testData; + + @SuppressWarnings("unchecked") + @Before + public void setupData() { + testTuple2Array = new Tuple2[2]; + testTuple2Array[0] = new Tuple2<String[], int[]>(testStringArray1, testIntArray2); + testTuple2Array[1] = new Tuple2<String[], int[]>(testStringArray2, testIntArray1); + + testTuple2 = new Tuple2<String[], int[]>(testStringArray3, testIntArray3); + + testData = new Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]>(testTuple2, + testTuple2Array); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void test1() { + Extractor ext = new ConcatinatedExtract(new FieldFromTuple(0), new FieldFromTuple(1)) + .add(new FieldsFromArray(Integer.class, 2, 1, 0)); + int[] expected = { testIntArray3[2], testIntArray3[1], testIntArray3[0] }; + assertEquals(new Integer(expected[0]), ((Integer[]) ext.extract(testData))[0]); + assertEquals(new Integer(expected[1]), ((Integer[]) ext.extract(testData))[1]); + assertEquals(new Integer(expected[2]), ((Integer[]) ext.extract(testData))[2]); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void test2() { + Extractor ext = new ConcatinatedExtract(new FieldFromTuple(1), // Tuple2<String[],int[]>[] + new FieldsFromArray(Tuple2.class, 1)) // Tuple2<String[],int[]>[] + .add(new FieldFromArray(0)) // Tuple2<String[],int[]> + .add(new ArrayFromTuple(0)) // Object[] (Containing String[]) + .add(new FieldFromArray(0)) // String[] + .add(new FieldFromArray(1)); // String + + String expected2 = testStringArray2[1]; + assertEquals(expected2, ext.extract(testData)); + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java new file mode 100644 index 0000000..2d4dbcf --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArrayTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.extractor; + +import static org.junit.Assert.*; + +import org.apache.flink.streaming.api.windowing.extractor.FieldFromArray; +import org.junit.Test; + +public class FieldFromArrayTest { + + String[] testStringArray = { "0", "1", "2", "3", "4" }; + Integer[] testIntegerArray = { 10, 11, 12, 13, 14 }; + int[] testIntArray = { 20, 21, 22, 23, 24 }; + + @Test + public void testStringArray() { + for (int i = 0; i < this.testStringArray.length; i++) { + assertEquals(this.testStringArray[i], + new FieldFromArray<String>(i).extract(testStringArray)); + } + } + + @Test + public void testIntegerArray() { + for (int i = 0; i < this.testIntegerArray.length; i++) { + assertEquals(this.testIntegerArray[i], + new FieldFromArray<String>(i).extract(testIntegerArray)); + } + } + + @Test + public void testIntArray() { + for (int i = 0; i < this.testIntArray.length; i++) { + assertEquals(new Integer(this.testIntArray[i]), + new FieldFromArray<Integer>(i).extract(testIntArray)); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java new file mode 100644 index 0000000..528611a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTupleTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.extractor; + +import static org.junit.Assert.*; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple10; +import org.apache.flink.api.java.tuple.Tuple11; +import org.apache.flink.api.java.tuple.Tuple12; +import org.apache.flink.api.java.tuple.Tuple13; +import org.apache.flink.api.java.tuple.Tuple14; +import org.apache.flink.api.java.tuple.Tuple15; +import org.apache.flink.api.java.tuple.Tuple16; +import org.apache.flink.api.java.tuple.Tuple17; +import org.apache.flink.api.java.tuple.Tuple18; +import org.apache.flink.api.java.tuple.Tuple19; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple20; +import org.apache.flink.api.java.tuple.Tuple21; +import org.apache.flink.api.java.tuple.Tuple22; +import org.apache.flink.api.java.tuple.Tuple23; +import org.apache.flink.api.java.tuple.Tuple24; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.tuple.Tuple9; +import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple; +import org.junit.Before; +import org.junit.Test; + +public class FieldFromTupleTest { + + private String[] testStrings; + + @Before + public void init() { + testStrings = new String[Tuple.MAX_ARITY]; + for (int i = 0; i < Tuple.MAX_ARITY; i++) { + testStrings[i] = Integer.toString(i); + } + } + + @Test + public void testSingleFieldExtraction() throws InstantiationException, IllegalAccessException { + // extract single fields + for (int i = 0; i < Tuple.MAX_ARITY; i++) { + Tuple current = (Tuple) CLASSES[i].newInstance(); + for (int j = 0; j < i; j++) { + current.setField(testStrings[j], j); + } + for (int j = 0; j < i; j++) { + assertEquals(testStrings[j], new FieldFromTuple<String>(j).extract(current)); + } + } + } + + private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class, + Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, + Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, + Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, + Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, + Tuple24.class, Tuple25.class }; +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java new file mode 100644 index 0000000..3139aa5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArrayTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.extractor; + +import static org.junit.Assert.assertEquals; + +import org.apache.flink.streaming.api.windowing.extractor.FieldsFromArray; +import org.junit.Test; + +public class FieldsFromArrayTest { + + String[] testStringArray = { "0", "1", "2", "3", "4" }; + Integer[] testIntegerArray = { 10, 11, 12, 13, 14 }; + int[] testIntArray = { 20, 21, 22, 23, 24 }; + + @Test + public void testStringArray() { + // check single field extraction + for (int i = 0; i < testStringArray.length; i++) { + String[] tmp = { testStringArray[i] }; + arrayEqualityCheck(tmp, + new FieldsFromArray<String>(String.class, i).extract(testStringArray)); + } + + // check reverse order + String[] reverseOrder = new String[testStringArray.length]; + for (int i = 0; i < testStringArray.length; i++) { + reverseOrder[i] = testStringArray[testStringArray.length - i - 1]; + } + arrayEqualityCheck(reverseOrder, + new FieldsFromArray<String>(String.class, 4, 3, 2, 1, 0).extract(testStringArray)); + + // check picking fields and reorder + String[] crazyOrder = { testStringArray[4], testStringArray[1], testStringArray[2] }; + arrayEqualityCheck(crazyOrder, + new FieldsFromArray<String>(String.class, 4, 1, 2).extract(testStringArray)); + } + + @Test + public void testIntegerArray() { + // check single field extraction + for (int i = 0; i < testIntegerArray.length; i++) { + Integer[] tmp = { testIntegerArray[i] }; + arrayEqualityCheck(tmp, + new FieldsFromArray<Integer>(Integer.class, i).extract(testIntegerArray)); + } + + // check reverse order + Integer[] reverseOrder = new Integer[testIntegerArray.length]; + for (int i = 0; i < testIntegerArray.length; i++) { + reverseOrder[i] = testIntegerArray[testIntegerArray.length - i - 1]; + } + arrayEqualityCheck(reverseOrder, + new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0) + .extract(testIntegerArray)); + + // check picking fields and reorder + Integer[] crazyOrder = { testIntegerArray[4], testIntegerArray[1], testIntegerArray[2] }; + arrayEqualityCheck(crazyOrder, + new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntegerArray)); + + } + + @Test + public void testIntArray() { + for (int i = 0; i < testIntArray.length; i++) { + Integer[] tmp = { testIntArray[i] }; + arrayEqualityCheck(tmp, + new FieldsFromArray<Integer>(Integer.class, i).extract(testIntArray)); + } + + // check reverse order + Integer[] reverseOrder = new Integer[testIntArray.length]; + for (int i = 0; i < testIntArray.length; i++) { + reverseOrder[i] = testIntArray[testIntArray.length - i - 1]; + } + arrayEqualityCheck(reverseOrder, + new FieldsFromArray<Integer>(Integer.class, 4, 3, 2, 1, 0).extract(testIntArray)); + + // check picking fields and reorder + Integer[] crazyOrder = { testIntArray[4], testIntArray[1], testIntArray[2] }; + arrayEqualityCheck(crazyOrder, + new FieldsFromArray<Integer>(Integer.class, 4, 1, 2).extract(testIntArray)); + + } + + private void arrayEqualityCheck(Object[] array1, Object[] array2) { + assertEquals("The result arrays must have the same length", array1.length, array2.length); + for (int i = 0; i < array1.length; i++) { + assertEquals("Unequal fields at position " + i, array1[i], array2[i]); + } + } +}