http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java new file mode 100644 index 0000000..9f2c467 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dataproperties; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.junit.Test; + + +public class RequestedGlobalPropertiesFilteringTest { + + private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = + new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + @Test(expected = NullPointerException.class) + public void testNullProps() { + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0,1,2)); + + rgProps.filterBySemanticProperties(null, 0); + } + + @Test + public void testEraseAll1() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0,1,2)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testEraseAll2() { + + 
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"3;4"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0, 1, 2)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testHashPartitioningPreserved1() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setHashPartitioned(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning()); + assertNotNull(filtered.getPartitionedFields()); + assertEquals(3, filtered.getPartitionedFields().size()); + assertTrue(filtered.getPartitionedFields().contains(0)); + assertTrue(filtered.getPartitionedFields().contains(3)); + assertTrue(filtered.getPartitionedFields().contains(4)); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + assertNull(filtered.getOrdering()); + } + + @Test + public void testHashPartitioningPreserved2() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setHashPartitioned(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning()); + assertNotNull(filtered.getPartitionedFields()); + assertEquals(3, filtered.getPartitionedFields().size()); + assertTrue(filtered.getPartitionedFields().contains(1)); + assertTrue(filtered.getPartitionedFields().contains(2)); + assertTrue(filtered.getPartitionedFields().contains(7)); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + assertNull(filtered.getOrdering()); + } + + @Test + public void testHashPartitioningErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setHashPartitioned(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testAnyPartitioningPreserved1() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning()); + 
assertNotNull(filtered.getPartitionedFields()); + assertEquals(3, filtered.getPartitionedFields().size()); + assertTrue(filtered.getPartitionedFields().contains(0)); + assertTrue(filtered.getPartitionedFields().contains(3)); + assertTrue(filtered.getPartitionedFields().contains(4)); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + assertNull(filtered.getOrdering()); + } + + @Test + public void testAnyPartitioningPreserved2() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning()); + assertNotNull(filtered.getPartitionedFields()); + assertEquals(3, filtered.getPartitionedFields().size()); + assertTrue(filtered.getPartitionedFields().contains(1)); + assertTrue(filtered.getPartitionedFields().contains(2)); + assertTrue(filtered.getPartitionedFields().contains(7)); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + assertNull(filtered.getOrdering()); + } + + @Test + public void testAnyPartitioningErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setAnyPartitioning(new FieldSet(0, 3, 4)); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testRangePartitioningPreserved1() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;3;6"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(3, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setRangePartitioned(o); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning()); + assertNotNull(filtered.getOrdering()); + assertEquals(3, filtered.getOrdering().getNumberOfFields()); + assertEquals(3, filtered.getOrdering().getFieldNumber(0).intValue()); + assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue()); + assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue()); + assertEquals(LongValue.class, filtered.getOrdering().getType(0)); + assertEquals(IntValue.class, filtered.getOrdering().getType(1)); + assertEquals(ByteValue.class, filtered.getOrdering().getType(2)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0)); + assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2)); + assertNull(filtered.getPartitionedFields()); + assertNull(filtered.getDataDistribution()); + 
assertNull(filtered.getCustomPartitioner()); + } + + @Test + public void testRangePartitioningPreserved2() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(3, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setRangePartitioned(o); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning()); + assertNotNull(filtered.getOrdering()); + assertEquals(3, filtered.getOrdering().getNumberOfFields()); + assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue()); + assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue()); + assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue()); + assertEquals(LongValue.class, filtered.getOrdering().getType(0)); + assertEquals(IntValue.class, filtered.getOrdering().getType(1)); + assertEquals(ByteValue.class, filtered.getOrdering().getType(2)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0)); + assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2)); + assertNull(filtered.getPartitionedFields()); + assertNull(filtered.getDataDistribution()); + assertNull(filtered.getCustomPartitioner()); + } + + @Test + public void testRangePartitioningPreserved3() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo); + + DataDistribution dd = new MockDistribution(); + Ordering o = new Ordering(); + o.appendOrdering(3, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setRangePartitioned(o, dd); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNotNull(filtered); + assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning()); + assertNotNull(filtered.getOrdering()); + assertEquals(3, filtered.getOrdering().getNumberOfFields()); + assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue()); + assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue()); + assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue()); + assertEquals(LongValue.class, filtered.getOrdering().getType(0)); + assertEquals(IntValue.class, filtered.getOrdering().getType(1)); + assertEquals(ByteValue.class, filtered.getOrdering().getType(2)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0)); + assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2)); + assertNotNull(filtered.getDataDistribution()); + assertEquals(dd, filtered.getDataDistribution()); + assertNull(filtered.getPartitionedFields()); + assertNull(filtered.getCustomPartitioner()); + } + + @Test + public void testRangePartitioningErased() { + + SingleInputSemanticProperties 
sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(3, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setRangePartitioned(o); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testCustomPartitioningErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setCustomPartitioned(new FieldSet(0, 1, 2), new MockPartitioner()); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testRandomDistributionErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setRandomPartitioning(); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testReplicationErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setFullyReplicated(); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testRebalancingErased() { + + SingleInputSemanticProperties sProp = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties rgProps = new RequestedGlobalProperties(); + rgProps.setForceRebalancing(); + + RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0); + + assertNull(filtered); + } + + @Test + public void testDualHashPartitioningPreserved() { + + DualInputSemanticProperties dprops = new DualInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"0;2;4"}, new String[]{"1->3;4->6;3->7"}, + null, null, null, null, tupleInfo, tupleInfo, tupleInfo); + + RequestedGlobalProperties gprops1 = new RequestedGlobalProperties(); + RequestedGlobalProperties gprops2 = new RequestedGlobalProperties(); + gprops1.setHashPartitioned(new FieldSet(2, 0, 4)); + gprops2.setHashPartitioned(new FieldSet(3, 6, 7)); + RequestedGlobalProperties filtered1 = gprops1.filterBySemanticProperties(dprops, 0); + RequestedGlobalProperties filtered2 = gprops2.filterBySemanticProperties(dprops, 1); + + assertNotNull(filtered1); + assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered1.getPartitioning()); + assertNotNull(filtered1.getPartitionedFields()); + assertEquals(3, filtered1.getPartitionedFields().size()); + assertTrue(filtered1.getPartitionedFields().contains(0)); + 
assertTrue(filtered1.getPartitionedFields().contains(2)); + assertTrue(filtered1.getPartitionedFields().contains(4)); + assertNull(filtered1.getOrdering()); + assertNull(filtered1.getCustomPartitioner()); + assertNull(filtered1.getDataDistribution()); + + assertNotNull(filtered2); + assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered2.getPartitioning()); + assertNotNull(filtered2.getPartitionedFields()); + assertEquals(3, filtered2.getPartitionedFields().size()); + assertTrue(filtered2.getPartitionedFields().contains(1)); + assertTrue(filtered2.getPartitionedFields().contains(3)); + assertTrue(filtered2.getPartitionedFields().contains(4)); + assertNull(filtered2.getOrdering()); + assertNull(filtered2.getCustomPartitioner()); + assertNull(filtered2.getDataDistribution()); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testInvalidInputIndex() { + SingleInputSemanticProperties sprops = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo); + + RequestedGlobalProperties gprops = new RequestedGlobalProperties(); + gprops.setHashPartitioned(new FieldSet(0,1)); + + gprops.filterBySemanticProperties(sprops, 1); + } + +}
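The tests above all pin down one rule of RequestedGlobalProperties.filterBySemanticProperties(props, input): a requested partitioning survives a function only if every requested field is covered by a forward-field annotation, and a mapped annotation such as "2->0;1->3;7->4" translates the request from output fields back to the corresponding input fields; if any requested field has no source, the method returns null and the property must be re-established. A condensed sketch of that round trip (reusing the tupleInfo fixture declared in the test class above, so the snippet is illustrative rather than standalone):

    SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
    SemanticPropUtil.getSemanticPropsSingleFromString(
            sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);

    RequestedGlobalProperties requested = new RequestedGlobalProperties();
    requested.setHashPartitioned(new FieldSet(0, 3, 4));

    // output fields 0, 3 and 4 originate from input fields 2, 1 and 7, so the
    // filtered request asks for hash partitioning on {1, 2, 7} on the input side
    RequestedGlobalProperties filtered = requested.filterBySemanticProperties(sProp, 0);

    // had the annotation been only "1;2", requested field 0 would have no
    // source and filterBySemanticProperties would return null instead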
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java new file mode 100644 index 0000000..0cede0e --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dataproperties; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.junit.Test; + +public class RequestedLocalPropertiesFilteringTest { + + private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo = + new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + @Test(expected = NullPointerException.class) + public void testNullProps() { + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(0, 2, 3)); + + rlProp.filterBySemanticProperties(null, 0); + } + + @Test + public void testAllErased() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(0, 2, 3)); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNull(filtered); + } + + @Test + public void testGroupingPreserved1() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + 
SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(0, 2, 3)); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNotNull(filtered); + assertNotNull(filtered.getGroupedFields()); + assertEquals(3, filtered.getGroupedFields().size()); + assertTrue(filtered.getGroupedFields().contains(0)); + assertTrue(filtered.getGroupedFields().contains(2)); + assertTrue(filtered.getGroupedFields().contains(3)); + assertNull(filtered.getOrdering()); + } + + @Test + public void testGroupingPreserved2() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"3->0;5->2;1->3"}, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(0, 2, 3)); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNotNull(filtered); + assertNotNull(filtered.getGroupedFields()); + assertEquals(3, filtered.getGroupedFields().size()); + assertTrue(filtered.getGroupedFields().contains(3)); + assertTrue(filtered.getGroupedFields().contains(5)); + assertTrue(filtered.getGroupedFields().contains(1)); + assertNull(filtered.getOrdering()); + } + + @Test + public void testGroupingErased() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(0, 2, 3)); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNull(filtered); + } + + @Test + public void testOrderPreserved1() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1;4;6"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(4, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setOrdering(o); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNotNull(filtered); + assertNotNull(filtered.getOrdering()); + assertEquals(3, filtered.getOrdering().getNumberOfFields()); + assertEquals(4, filtered.getOrdering().getFieldNumber(0).intValue()); + assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue()); + assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue()); + assertEquals(LongValue.class, filtered.getOrdering().getType(0)); + assertEquals(IntValue.class, filtered.getOrdering().getType(1)); + assertEquals(ByteValue.class, filtered.getOrdering().getType(2)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0)); + assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2)); + assertNull(filtered.getGroupedFields()); + } + + @Test + public void testOrderPreserved2() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + 
SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"5->1;0->4;2->6"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(4, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setOrdering(o); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNotNull(filtered); + assertNotNull(filtered.getOrdering()); + assertEquals(3, filtered.getOrdering().getNumberOfFields()); + assertEquals(0, filtered.getOrdering().getFieldNumber(0).intValue()); + assertEquals(5, filtered.getOrdering().getFieldNumber(1).intValue()); + assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue()); + assertEquals(LongValue.class, filtered.getOrdering().getType(0)); + assertEquals(IntValue.class, filtered.getOrdering().getType(1)); + assertEquals(ByteValue.class, filtered.getOrdering().getType(2)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0)); + assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1)); + assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2)); + assertNull(filtered.getGroupedFields()); + } + + @Test + public void testOrderErased() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo); + + Ordering o = new Ordering(); + o.appendOrdering(4, LongValue.class, Order.DESCENDING); + o.appendOrdering(1, IntValue.class, Order.ASCENDING); + o.appendOrdering(6, ByteValue.class, Order.DESCENDING); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setOrdering(o); + + RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0); + + assertNull(filtered); + } + + @Test + public void testDualGroupingPreserved() { + + DualInputSemanticProperties dprops = new DualInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"1->0;3;2->4"}, new String[]{"0->7;1"}, + null, null, null, null, tupleInfo, tupleInfo, tupleInfo); + + RequestedLocalProperties lprops1 = new RequestedLocalProperties(); + lprops1.setGroupedFields(new FieldSet(0,3,4)); + + RequestedLocalProperties lprops2 = new RequestedLocalProperties(); + lprops2.setGroupedFields(new FieldSet(7, 1)); + + RequestedLocalProperties filtered1 = lprops1.filterBySemanticProperties(dprops, 0); + RequestedLocalProperties filtered2 = lprops2.filterBySemanticProperties(dprops, 1); + + assertNotNull(filtered1); + assertNotNull(filtered1.getGroupedFields()); + assertEquals(3, filtered1.getGroupedFields().size()); + assertTrue(filtered1.getGroupedFields().contains(1)); + assertTrue(filtered1.getGroupedFields().contains(2)); + assertTrue(filtered1.getGroupedFields().contains(3)); + assertNull(filtered1.getOrdering()); + + assertNotNull(filtered2); + assertNotNull(filtered2.getGroupedFields()); + assertEquals(2, filtered2.getGroupedFields().size()); + assertTrue(filtered2.getGroupedFields().contains(0)); + assertTrue(filtered2.getGroupedFields().contains(1)); + assertNull(filtered2.getOrdering()); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testInvalidInputIndex() { + + SingleInputSemanticProperties sProps = new SingleInputSemanticProperties(); + SemanticPropUtil.getSemanticPropsSingleFromString(sProps, 
new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo); + + RequestedLocalProperties rlProp = new RequestedLocalProperties(); + rlProp.setGroupedFields(new FieldSet(1, 4)); + + rlProp.filterBySemanticProperties(sProps, 1); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java new file mode 100644 index 0000000..b359e6b --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.java; + +import static org.junit.Assert.fail; + +import org.junit.Test; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.CompilerTestBase; + + +@SuppressWarnings({"serial", "unchecked"}) +public class DeltaIterationDependenciesTest extends CompilerTestBase { + + @Test + public void testExceptionWhenNewWorksetNotDependentOnWorkset() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIteration = input.iterateDelta(input, 10,0); + + DataSet<Tuple2<Long, Long>> delta = deltaIteration.getSolutionSet().join(deltaIteration.getWorkset()) + .where(0).equalTo(0) + .projectFirst(1).projectSecond(1); + + DataSet<Tuple2<Long, Long>> nextWorkset = deltaIteration.getSolutionSet().join(input) + .where(0).equalTo(0) + .projectFirst(1).projectSecond(1); + + + DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset); + + result.print(); + + Plan p = env.createProgramPlan(); + try { + compileNoStats(p); + fail("Should not be able to compile, since the next workset does not depend on the workset"); + } + catch (CompilerException e) { + // good + } + catch (Exception e) { + fail("wrong exception type"); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} 
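The test above builds the next workset from the solution set and the static input only, so there is no feedback edge from the current workset and the optimizer is expected to reject the plan with a CompilerException. For contrast, a minimal sketch (using the same variables as the test, offered only as an illustration of a valid plan) of a next workset that would compile, because it is derived from the workset itself:

    DataSet<Tuple2<Long, Long>> nextWorkset = deltaIteration.getWorkset().join(input)
            .where(0).equalTo(0)
            .projectFirst(1).projectSecond(1);

    DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset);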
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java new file mode 100644 index 0000000..de02836 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.java; + +import static org.junit.Assert.*; + +import org.junit.Test; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; + +@SuppressWarnings("serial") +public class DistinctAndGroupingOptimizerTest extends CompilerTestBase { + + @Test + public void testDistinctPreservesPartitioningOfDistinctFields() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(4); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L)) + .map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4); + + data.distinct(0) + .groupBy(0) + .sum(1) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode distinctReducer = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + + // reducer can be forward, reuses partitioning from distinct + assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy()); + + // distinct reducer is partitioned + assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testDistinctDestroysPartitioningOfNonDistinctFields() { + try { + ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(4); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L)) + .map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4); + + data.distinct(1) + .groupBy(0) + .sum(1) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); + SingleInputPlanNode distinctReducer = (SingleInputPlanNode) combiner.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); + + // reducer must repartition, because it works on a different field + assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy()); + + assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); + + // distinct reducer is partitioned + assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java new file mode 100644 index 0000000..a683968 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.java; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.GroupReduceOperator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.util.Collector; +import org.junit.Test; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.plan.SourcePlanNode; + +import static org.junit.Assert.*; + +@SuppressWarnings("serial") +public class GroupReduceCompilationTest extends CompilerTestBase implements java.io.Serializable { + + @Test + public void testAllGroupReduceNoCombiner() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source"); + + data.reduceGroup(new RichGroupReduceFunction<Double, Double>() { + public void reduce(Iterable<Double> values, Collector<Double> out) {} + }).name("reducer") + .print().name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + + // the all-reduce has no combiner, when the DOP of the input is one + + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = resolver.getNode("sink"); + + // check wiring + assertEquals(sourceNode, reduceNode.getInput().getSource()); + assertEquals(reduceNode, sinkNode.getInput().getSource()); + + // check that reduce has the right strategy + assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy()); + + // check DOP + assertEquals(1, sourceNode.getParallelism()); + assertEquals(1, reduceNode.getParallelism()); + assertEquals(1, sinkNode.getParallelism()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); + } + } + + @Test + public void testAllReduceWithCombiner() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + DataSet<Long> data = env.generateSequence(1, 8000000).name("source"); + + GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() { + public void reduce(Iterable<Long> values, Collector<Long> out) {} + }).name("reducer"); + + reduced.setCombinable(true); + reduced.print().name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + // get the original nodes + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = resolver.getNode("sink"); + + // get the combiner + SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource(); + + // check wiring + assertEquals(sourceNode, 
combineNode.getInput().getSource()); + assertEquals(reduceNode, sinkNode.getInput().getSource()); + + // check that both reduce and combiner have the same strategy + assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy()); + assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy()); + + // check DOP + assertEquals(8, sourceNode.getParallelism()); + assertEquals(8, combineNode.getParallelism()); + assertEquals(1, reduceNode.getParallelism()); + assertEquals(1, sinkNode.getParallelism()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); + } + } + + + @Test + public void testGroupedReduceWithFieldPositionKeyNonCombinable() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) + .name("source").setParallelism(6); + + data + .groupBy(1) + .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { + public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} + }).name("reducer") + .print().name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + // get the original nodes + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = resolver.getNode("sink"); + + // check wiring + assertEquals(sourceNode, reduceNode.getInput().getSource()); + assertEquals(reduceNode, sinkNode.getInput().getSource()); + + // check that both reduce and combiner have the same strategy + assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy()); + + // check the keys + assertEquals(new FieldList(1), reduceNode.getKeys(0)); + assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); + + // check DOP + assertEquals(6, sourceNode.getParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); + } + } + + @Test + public void testGroupedReduceWithFieldPositionKeyCombinable() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) + .name("source").setParallelism(6); + + GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data + .groupBy(1) + .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { + public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} + }).name("reducer"); + + reduced.setCombinable(true); + reduced.print().name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + // get the original nodes + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = 
resolver.getNode("sink"); + + // get the combiner + SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource(); + + // check wiring + assertEquals(sourceNode, combineNode.getInput().getSource()); + assertEquals(reduceNode, sinkNode.getInput().getSource()); + + // check that both reduce and combiner have the same strategy + assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy()); + assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy()); + + // check the keys + assertEquals(new FieldList(1), reduceNode.getKeys(0)); + assertEquals(new FieldList(1), combineNode.getKeys(0)); + assertEquals(new FieldList(1), combineNode.getKeys(1)); + assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); + + // check DOP + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, combineNode.getParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); + } + } + + @Test + public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) + .name("source").setParallelism(6); + + data + .groupBy(new KeySelector<Tuple2<String,Double>, String>() { + public String getKey(Tuple2<String, Double> value) { return value.f0; } + }) + .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { + public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} + }).name("reducer") + .print().name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + // get the original nodes + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = resolver.getNode("sink"); + + // get the key extractors and projectors + SingleInputPlanNode keyExtractor = (SingleInputPlanNode) reduceNode.getInput().getSource(); + SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource(); + + // check wiring + assertEquals(sourceNode, keyExtractor.getInput().getSource()); + assertEquals(keyProjector, sinkNode.getInput().getSource()); + + // check that both reduce and combiner have the same strategy + assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy()); + + // check the keys + assertEquals(new FieldList(0), reduceNode.getKeys(0)); + assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); + + // check DOP + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, keyExtractor.getParallelism()); + + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, keyProjector.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); + } + } + + @Test + public void testGroupedReduceWithSelectorFunctionKeyCombinable() { + try { + ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) + .name("source").setParallelism(6); + + GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data + .groupBy(new KeySelector<Tuple2<String,Double>, String>() { + public String getKey(Tuple2<String, Double> value) { return value.f0; } + }) + .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { + public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} + }).name("reducer"); + + reduced.setCombinable(true); + reduced.print().name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + // get the original nodes + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = resolver.getNode("sink"); + + // get the combiner + SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource(); + + // get the key extractors and projectors + SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource(); + SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource(); + + // check wiring + assertEquals(sourceNode, keyExtractor.getInput().getSource()); + assertEquals(keyProjector, sinkNode.getInput().getSource()); + + // check that both reduce and combiner have the same strategy + assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy()); + assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy()); + + // check the keys + assertEquals(new FieldList(0), reduceNode.getKeys(0)); + assertEquals(new FieldList(0), combineNode.getKeys(0)); + assertEquals(new FieldList(0), combineNode.getKeys(1)); + assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); + + // check DOP + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, keyExtractor.getParallelism()); + assertEquals(6, combineNode.getParallelism()); + + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, keyProjector.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java new file mode 100644 index 0000000..37a8e81 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.java; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.BulkIterationPlanNode; +import org.apache.flink.optimizer.plan.NAryUnionPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.junit.Test; + +@SuppressWarnings("serial") +public class IterationCompilerTest extends CompilerTestBase { + + @Test + public void testIdentityIteration() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(43); + + IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100); + iteration.closeWith(iteration).print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + new JobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testEmptyWorksetIteration() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(43); + + DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20) + .map(new MapFunction<Long, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Long value){ return null; } + }); + + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 100, 0); + iter.closeWith(iter.getWorkset(), iter.getWorkset()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + new JobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testIterationWithUnionRoot() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(43); + + IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100); + + iteration.closeWith( + iteration.map(new IdentityMapper<Long>()).union(iteration.map(new IdentityMapper<Long>()))) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + BulkIterationPlanNode iterNode = (BulkIterationPlanNode) 
sink.getInput().getSource(); + + // make sure that the root is part of the dynamic path + + // the "NoOp" that comes after the union. + SingleInputPlanNode noop = (SingleInputPlanNode) iterNode.getRootOfStepFunction(); + NAryUnionPlanNode union = (NAryUnionPlanNode) noop.getInput().getSource(); + + assertTrue(noop.isOnDynamicPath()); + assertTrue(noop.getCostWeight() >= 1); + + assertTrue(union.isOnDynamicPath()); + assertTrue(union.getCostWeight() >= 1); + + // see that the jobgraph generator can translate this + new JobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testWorksetIterationWithUnionRoot() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(43); + + DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20) + .map(new MapFunction<Long, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Long value){ return null; } + }); + + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 100, 0); + iter.closeWith( + iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>()) + .union( + iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>())) + , iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>()) + .union( + iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>())) + ) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) sink.getInput().getSource(); + + // make sure that the root is part of the dynamic path + + // the "NoOp"s that come after the union. + SingleInputPlanNode nextWorksetNoop = (SingleInputPlanNode) iterNode.getNextWorkSetPlanNode(); + SingleInputPlanNode solutionDeltaNoop = (SingleInputPlanNode) iterNode.getSolutionSetDeltaPlanNode(); + + NAryUnionPlanNode nextWorksetUnion = (NAryUnionPlanNode) nextWorksetNoop.getInput().getSource(); + NAryUnionPlanNode solutionDeltaUnion = (NAryUnionPlanNode) solutionDeltaNoop.getInput().getSource(); + + assertTrue(nextWorksetNoop.isOnDynamicPath()); + assertTrue(nextWorksetNoop.getCostWeight() >= 1); + + assertTrue(solutionDeltaNoop.isOnDynamicPath()); + assertTrue(solutionDeltaNoop.getCostWeight() >= 1); + + assertTrue(nextWorksetUnion.isOnDynamicPath()); + assertTrue(nextWorksetUnion.getCostWeight() >= 1); + + assertTrue(solutionDeltaUnion.isOnDynamicPath()); + assertTrue(solutionDeltaUnion.getCostWeight() >= 1); + + new JobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java new file mode 100644 index 0000000..0a62132 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.java; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.operators.GenericDataSourceBase; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.util.Visitor; +import org.junit.Test; + +@SuppressWarnings("serial") +public class JoinTranslationTest extends CompilerTestBase { + + @Test + public void testBroadcastHashFirstTest() { + try { + DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.BROADCAST_HASH_FIRST); + assertEquals(ShipStrategyType.BROADCAST, node.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, node.getInput2().getShipStrategy()); + assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, node.getDriverStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } + + @Test + public void testBroadcastHashSecondTest() { + try { + DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.BROADCAST_HASH_SECOND); + assertEquals(ShipStrategyType.FORWARD, node.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.BROADCAST, node.getInput2().getShipStrategy()); + assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, node.getDriverStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } + + @Test + public void testPartitionHashFirstTest() { + try { + DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_HASH_FIRST); + assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy()); + assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, node.getDriverStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } + + @Test + public void testPartitionHashSecondTest() { + try { + DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_HASH_SECOND); + assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy()); + assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, node.getDriverStrategy()); + } + catch (Exception e) { + 
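+ // compiling the REPARTITION_HASH_SECOND plan is expected to succeed; any exception is reported as a test failure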
e.printStackTrace(); + fail(e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } + + @Test + public void testPartitionSortMergeTest() { + try { + DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_SORT_MERGE); + assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy()); + assertEquals(DriverStrategy.MERGE, node.getDriverStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } + + @Test + public void testOptimizerChoosesTest() { + try { + DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.OPTIMIZER_CHOOSES); + assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy()); + assertTrue(DriverStrategy.HYBRIDHASH_BUILD_FIRST == node.getDriverStrategy() || + DriverStrategy.HYBRIDHASH_BUILD_SECOND == node.getDriverStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } + + + private DualInputPlanNode createPlanAndGetJoinNode(JoinHint hint) { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Long> i1 = env.generateSequence(1, 1000); + DataSet<Long> i2 = env.generateSequence(1, 1000); + + i1.join(i2, hint).where(new IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).print(); + + Plan plan = env.createProgramPlan(); + + // set statistics to the sources + plan.accept(new Visitor<Operator<?>>() { + @Override + public boolean preVisit(Operator<?> visitable) { + if (visitable instanceof GenericDataSourceBase) { + GenericDataSourceBase<?, ?> source = (GenericDataSourceBase<?, ?>) visitable; + setSourceStatistics(source, 10000000, 1000); + } + + return true; + } + + @Override + public void postVisit(Operator<?> visitable) {} + }); + + OptimizedPlan op = compileWithStats(plan); + + return (DualInputPlanNode) ((SinkPlanNode) op.getDataSinks().iterator().next()).getInput().getSource(); + } + + + + private static final class IdentityKeySelector<T> implements KeySelector<T, T> { + + @Override + public T getKey(T value) { + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java new file mode 100644 index 0000000..cd63b72 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.java; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.junit.Test; + +@SuppressWarnings("serial") +public class OpenIterationTest extends CompilerTestBase { + + @Test + public void testSinkInOpenBulkIteration() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Long> input = env.generateSequence(1, 10); + + IterativeDataSet<Long> iteration = input.iterate(10); + + DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>()); + + mapped.print(); + + try { + env.createProgramPlan(); + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSinkInClosedBulkIteration() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Long> input = env.generateSequence(1, 10); + + IterativeDataSet<Long> iteration = input.iterate(10); + + DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>()); + + iteration.closeWith(mapped).print(); + + mapped.print(); + + Plan p = env.createProgramPlan(); + + try { + compileNoStats(p); + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSinkOnSolutionSetDeltaIteration() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L)); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0); + + DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>()); + + mapped.print(); + + try { + env.createProgramPlan(); + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSinkOnWorksetDeltaIteration() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L)); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0); + + DataSet<Tuple2<Long, Long>> mapped = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>()); + + mapped.print(); + + try { + 
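+ // plan creation must fail: the delta iteration is never closed, so a sink on its workset is invalid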
env.createProgramPlan(); + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testOperationOnSolutionSet() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L)); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0); + + DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> joined = iteration.getWorkset().join(mapped) + .where(0).equalTo(0).projectFirst(1).projectSecond(0); + + iteration.closeWith(joined, joined) + .print(); + + Plan p = env.createProgramPlan(); + try { + compileNoStats(p); + fail("should throw an exception"); + } + catch (InvalidProgramException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java new file mode 100644 index 0000000..8bb9a76 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.java; + +import static org.junit.Assert.*; + +import java.util.Collections; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.junit.Test; + +@SuppressWarnings("serial") +public class PartitionOperatorTest extends CompilerTestBase { + + @Test + public void testPartitionOperatorPreservesFields() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<Long, Long>(0L, 0L))); + + data.partitionCustom(new Partitioner<Long>() { + public int partition(Long key, int numPartitions) { return key.intValue(); } + }, 1) + .groupBy(1) + .reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>()) + .print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + SinkPlanNode sink = op.getDataSinks().iterator().next(); + SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); + SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource(); + + assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java new file mode 100644 index 0000000..0724a9f --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.java; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.junit.Test; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.plan.SourcePlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +import static org.junit.Assert.*; + +@SuppressWarnings("serial") +public class ReduceCompilationTest extends CompilerTestBase implements java.io.Serializable { + + @Test + public void testAllReduceNoCombiner() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source"); + + data.reduce(new RichReduceFunction<Double>() { + + @Override + public Double reduce(Double value1, Double value2){ + return value1 + value2; + } + }).name("reducer") + .print().name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + + // the all-reduce has no combiner, when the DOP of the input is one + + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = resolver.getNode("sink"); + + // check wiring + assertEquals(sourceNode, reduceNode.getInput().getSource()); + assertEquals(reduceNode, sinkNode.getInput().getSource()); + + // check DOP + assertEquals(1, sourceNode.getParallelism()); + assertEquals(1, reduceNode.getParallelism()); + assertEquals(1, sinkNode.getParallelism()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); + } + } + + @Test + public void testAllReduceWithCombiner() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + DataSet<Long> data = env.generateSequence(1, 8000000).name("source"); + + data.reduce(new RichReduceFunction<Long>() { + + @Override + public Long reduce(Long value1, Long value2){ + return value1 + value2; + } + }).name("reducer") + .print().name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + // get the original nodes + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = resolver.getNode("sink"); + + // get the combiner + SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource(); + + // check wiring + assertEquals(sourceNode, combineNode.getInput().getSource()); + assertEquals(reduceNode, sinkNode.getInput().getSource()); + + // check that both reduce and combiner have the same strategy + assertEquals(DriverStrategy.ALL_REDUCE, reduceNode.getDriverStrategy()); + assertEquals(DriverStrategy.ALL_REDUCE, combineNode.getDriverStrategy()); + + // check DOP 
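+ // the combiner executes with the parallelism of its input (8), while the final all-reduce and the sink are non-parallel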
+ assertEquals(8, sourceNode.getParallelism()); + assertEquals(8, combineNode.getParallelism()); + assertEquals(1, reduceNode.getParallelism()); + assertEquals(1, sinkNode.getParallelism()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); + } + } + + @Test + public void testGroupedReduceWithFieldPositionKey() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) + .name("source").setParallelism(6); + + data + .groupBy(1) + .reduce(new RichReduceFunction<Tuple2<String,Double>>() { + @Override + public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){ + return null; + } + }).name("reducer") + .print().name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + // get the original nodes + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = resolver.getNode("sink"); + + // get the combiner + SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource(); + + // check wiring + assertEquals(sourceNode, combineNode.getInput().getSource()); + assertEquals(reduceNode, sinkNode.getInput().getSource()); + + // check that both reduce and combiner have the same strategy + assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy()); + assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy()); + + // check the keys + assertEquals(new FieldList(1), reduceNode.getKeys(0)); + assertEquals(new FieldList(1), combineNode.getKeys(0)); + assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); + + // check DOP + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, combineNode.getParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); + } + } + + @Test + public void testGroupedReduceWithSelectorFunctionKey() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) + .name("source").setParallelism(6); + + data + .groupBy(new KeySelector<Tuple2<String,Double>, String>() { + public String getKey(Tuple2<String, Double> value) { return value.f0; } + }) + .reduce(new RichReduceFunction<Tuple2<String,Double>>() { + @Override + public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){ + return null; + } + }).name("reducer") + .print().name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + // get the original nodes + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = resolver.getNode("sink"); + + // get the combiner + SingleInputPlanNode 
combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource(); + + // get the key extractors and projectors + SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource(); + SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource(); + + // check wiring + assertEquals(sourceNode, keyExtractor.getInput().getSource()); + assertEquals(keyProjector, sinkNode.getInput().getSource()); + + // check that both reduce and combiner have the same strategy + assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy()); + assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy()); + + // check the keys + assertEquals(new FieldList(0), reduceNode.getKeys(0)); + assertEquals(new FieldList(0), combineNode.getKeys(0)); + assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); + + // check DOP + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, keyExtractor.getParallelism()); + assertEquals(6, combineNode.getParallelism()); + + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, keyProjector.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); + } + } +}
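For context, the program shape whose translation testGroupedReduceWithSelectorFunctionKey verifies can be sketched as follows. The class name, the sample data, and the job name are illustrative assumptions, not part of this commit, and the print()/execute() pairing assumes the lazy, pre-0.9 DataSet API used throughout these tests.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Illustrative sketch only; not part of commit 633b0d6a.
// A KeySelector-based groupBy is what makes the optimizer insert the key
// extractor in front of the combiner and the key projector in front of
// the sink that the test above asserts on.
public class GroupedReduceExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Double>> data = env.fromElements(
                new Tuple2<String, Double>("a", 1.0),
                new Tuple2<String, Double>("b", 2.0),
                new Tuple2<String, Double>("a", 3.0));

        data
            .groupBy(new KeySelector<Tuple2<String, Double>, String>() {
                @Override
                public String getKey(Tuple2<String, Double> value) {
                    return value.f0;
                }
            })
            .reduce(new ReduceFunction<Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> reduce(
                        Tuple2<String, Double> v1, Tuple2<String, Double> v2) {
                    return new Tuple2<String, Double>(v1.f0, v1.f1 + v2.f1);
                }
            })
            .print(); // adds the sink; execution is triggered below

        env.execute("grouped reduce example");
    }
}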