http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java
new file mode 100644
index 0000000..9f2c467
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+
+public class RequestedGlobalPropertiesFilteringTest {
+
+       private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+                       new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+                                       BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+                       );
+
+       @Test(expected = NullPointerException.class)
+       public void testNullProps() {
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setAnyPartitioning(new FieldSet(0,1,2));
+
+               rgProps.filterBySemanticProperties(null, 0);
+       }
+
+       @Test
+       public void testEraseAll1() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setAnyPartitioning(new FieldSet(0,1,2));
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testEraseAll2() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"3;4"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setAnyPartitioning(new FieldSet(0, 1, 2));
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testHashPartitioningPreserved1() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNotNull(filtered);
+               assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
+               assertNotNull(filtered.getPartitionedFields());
+               assertEquals(3, filtered.getPartitionedFields().size());
+               assertTrue(filtered.getPartitionedFields().contains(0));
+               assertTrue(filtered.getPartitionedFields().contains(3));
+               assertTrue(filtered.getPartitionedFields().contains(4));
+               assertNull(filtered.getDataDistribution());
+               assertNull(filtered.getCustomPartitioner());
+               assertNull(filtered.getOrdering());
+       }
+
+       @Test
+       public void testHashPartitioningPreserved2() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNotNull(filtered);
+               assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
+               assertNotNull(filtered.getPartitionedFields());
+               assertEquals(3, filtered.getPartitionedFields().size());
+               assertTrue(filtered.getPartitionedFields().contains(1));
+               assertTrue(filtered.getPartitionedFields().contains(2));
+               assertTrue(filtered.getPartitionedFields().contains(7));
+               assertNull(filtered.getDataDistribution());
+               assertNull(filtered.getCustomPartitioner());
+               assertNull(filtered.getOrdering());
+       }
+
+       @Test
+       public void testHashPartitioningErased() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testAnyPartitioningPreserved1() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNotNull(filtered);
+               assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
+               assertNotNull(filtered.getPartitionedFields());
+               assertEquals(3, filtered.getPartitionedFields().size());
+               assertTrue(filtered.getPartitionedFields().contains(0));
+               assertTrue(filtered.getPartitionedFields().contains(3));
+               assertTrue(filtered.getPartitionedFields().contains(4));
+               assertNull(filtered.getDataDistribution());
+               assertNull(filtered.getCustomPartitioner());
+               assertNull(filtered.getOrdering());
+       }
+
+       @Test
+       public void testAnyPartitioningPreserved2() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNotNull(filtered);
+               assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
+               assertNotNull(filtered.getPartitionedFields());
+               assertEquals(3, filtered.getPartitionedFields().size());
+               assertTrue(filtered.getPartitionedFields().contains(1));
+               assertTrue(filtered.getPartitionedFields().contains(2));
+               assertTrue(filtered.getPartitionedFields().contains(7));
+               assertNull(filtered.getDataDistribution());
+               assertNull(filtered.getCustomPartitioner());
+               assertNull(filtered.getOrdering());
+       }
+
+       @Test
+       public void testAnyPartitioningErased() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testRangePartitioningPreserved1() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;3;6"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setRangePartitioned(o);
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNotNull(filtered);
+               assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+               assertNotNull(filtered.getOrdering());
+               assertEquals(3, filtered.getOrdering().getNumberOfFields());
+               assertEquals(3, filtered.getOrdering().getFieldNumber(0).intValue());
+               assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+               assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue());
+               assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+               assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+               assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+               assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+               assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+               assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+               assertNull(filtered.getPartitionedFields());
+               assertNull(filtered.getDataDistribution());
+               assertNull(filtered.getCustomPartitioner());
+       }
+
+       @Test
+       public void testRangePartitioningPreserved2() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setRangePartitioned(o);
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNotNull(filtered);
+               assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+               assertNotNull(filtered.getOrdering());
+               assertEquals(3, filtered.getOrdering().getNumberOfFields());
+               assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue());
+               assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+               assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+               assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+               assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+               assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+               assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+               assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+               assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+               assertNull(filtered.getPartitionedFields());
+               assertNull(filtered.getDataDistribution());
+               assertNull(filtered.getCustomPartitioner());
+       }
+
+       @Test
+       public void testRangePartitioningPreserved3() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
+
+               DataDistribution dd = new MockDistribution();
+               Ordering o = new Ordering();
+               o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setRangePartitioned(o, dd);
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNotNull(filtered);
+               assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+               assertNotNull(filtered.getOrdering());
+               assertEquals(3, filtered.getOrdering().getNumberOfFields());
+               assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue());
+               assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+               assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+               assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+               assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+               assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+               assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+               assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+               assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+               assertNotNull(filtered.getDataDistribution());
+               assertEquals(dd, filtered.getDataDistribution());
+               assertNull(filtered.getPartitionedFields());
+               assertNull(filtered.getCustomPartitioner());
+       }
+
+       @Test
+       public void testRangePartitioningErased() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setRangePartitioned(o);
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testCustomPartitioningErased() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setCustomPartitioned(new FieldSet(0, 1, 2), new MockPartitioner());
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testRandomDistributionErased() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setRandomPartitioning();
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testReplicationErased() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setFullyReplicated();
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testRebalancingErased() {
+
+               SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+               rgProps.setForceRebalancing();
+
+               RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testDualHashPartitioningPreserved() {
+
+               DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"0;2;4"}, new String[]{"1->3;4->6;3->7"},
+                               null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties gprops1 = new RequestedGlobalProperties();
+               RequestedGlobalProperties gprops2 = new RequestedGlobalProperties();
+               gprops1.setHashPartitioned(new FieldSet(2, 0, 4));
+               gprops2.setHashPartitioned(new FieldSet(3, 6, 7));
+               RequestedGlobalProperties filtered1 = gprops1.filterBySemanticProperties(dprops, 0);
+               RequestedGlobalProperties filtered2 = gprops2.filterBySemanticProperties(dprops, 1);
+
+               assertNotNull(filtered1);
+               assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered1.getPartitioning());
+               assertNotNull(filtered1.getPartitionedFields());
+               assertEquals(3, filtered1.getPartitionedFields().size());
+               assertTrue(filtered1.getPartitionedFields().contains(0));
+               assertTrue(filtered1.getPartitionedFields().contains(2));
+               assertTrue(filtered1.getPartitionedFields().contains(4));
+               assertNull(filtered1.getOrdering());
+               assertNull(filtered1.getCustomPartitioner());
+               assertNull(filtered1.getDataDistribution());
+
+               assertNotNull(filtered2);
+               assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered2.getPartitioning());
+               assertNotNull(filtered2.getPartitionedFields());
+               assertEquals(3, filtered2.getPartitionedFields().size());
+               assertTrue(filtered2.getPartitionedFields().contains(1));
+               assertTrue(filtered2.getPartitionedFields().contains(3));
+               assertTrue(filtered2.getPartitionedFields().contains(4));
+               assertNull(filtered2.getOrdering());
+               assertNull(filtered2.getCustomPartitioner());
+               assertNull(filtered2.getDataDistribution());
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testInvalidInputIndex() {
+               SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedGlobalProperties gprops = new RequestedGlobalProperties();
+               gprops.setHashPartitioned(new FieldSet(0,1));
+
+               gprops.filterBySemanticProperties(sprops, 1);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java
new file mode 100644
index 0000000..0cede0e
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+public class RequestedLocalPropertiesFilteringTest {
+
+       private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+                       new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+                                       BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+                                       BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+                       );
+
+       @Test(expected = NullPointerException.class)
+       public void testNullProps() {
+
+               RequestedLocalProperties rlProp = new RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+               rlProp.filterBySemanticProperties(null, 0);
+       }
+
+       @Test
+       public void testAllErased() {
+
+               SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+
+               RequestedLocalProperties rlProp = new RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+               RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testGroupingPreserved1() {
+
+               SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties rlProp = new RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+               RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNotNull(filtered);
+               assertNotNull(filtered.getGroupedFields());
+               assertEquals(3, filtered.getGroupedFields().size());
+               assertTrue(filtered.getGroupedFields().contains(0));
+               assertTrue(filtered.getGroupedFields().contains(2));
+               assertTrue(filtered.getGroupedFields().contains(3));
+               assertNull(filtered.getOrdering());
+       }
+
+       @Test
+       public void testGroupingPreserved2() {
+
+               SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"3->0;5->2;1->3"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties rlProp = new RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+               RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNotNull(filtered);
+               assertNotNull(filtered.getGroupedFields());
+               assertEquals(3, filtered.getGroupedFields().size());
+               assertTrue(filtered.getGroupedFields().contains(3));
+               assertTrue(filtered.getGroupedFields().contains(5));
+               assertTrue(filtered.getGroupedFields().contains(1));
+               assertNull(filtered.getOrdering());
+       }
+
+       @Test
+       public void testGroupingErased() {
+
+               SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties rlProp = new RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+               RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testOrderPreserved1() {
+
+               SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1;4;6"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedLocalProperties rlProp = new RequestedLocalProperties();
+               rlProp.setOrdering(o);
+
+               RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNotNull(filtered);
+               assertNotNull(filtered.getOrdering());
+               assertEquals(3, filtered.getOrdering().getNumberOfFields());
+               assertEquals(4, filtered.getOrdering().getFieldNumber(0).intValue());
+               assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+               assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue());
+               assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+               assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+               assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+               assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+               assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+               assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+               assertNull(filtered.getGroupedFields());
+       }
+
+       @Test
+       public void testOrderPreserved2() {
+
+               SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"5->1;0->4;2->6"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedLocalProperties rlProp = new RequestedLocalProperties();
+               rlProp.setOrdering(o);
+
+               RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNotNull(filtered);
+               assertNotNull(filtered.getOrdering());
+               assertEquals(3, filtered.getOrdering().getNumberOfFields());
+               assertEquals(0, filtered.getOrdering().getFieldNumber(0).intValue());
+               assertEquals(5, filtered.getOrdering().getFieldNumber(1).intValue());
+               assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+               assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+               assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+               assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+               assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+               assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+               assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+               assertNull(filtered.getGroupedFields());
+       }
+
+       @Test
+       public void testOrderErased() {
+
+               SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo);
+
+               Ordering o = new Ordering();
+               o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+               o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+               o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+               RequestedLocalProperties rlProp = new RequestedLocalProperties();
+               rlProp.setOrdering(o);
+
+               RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+               assertNull(filtered);
+       }
+
+       @Test
+       public void testDualGroupingPreserved() {
+
+               DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"1->0;3;2->4"}, new String[]{"0->7;1"},
+                               null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties lprops1 = new RequestedLocalProperties();
+               lprops1.setGroupedFields(new FieldSet(0,3,4));
+
+               RequestedLocalProperties lprops2 = new RequestedLocalProperties();
+               lprops2.setGroupedFields(new FieldSet(7, 1));
+
+               RequestedLocalProperties filtered1 = lprops1.filterBySemanticProperties(dprops, 0);
+               RequestedLocalProperties filtered2 = lprops2.filterBySemanticProperties(dprops, 1);
+
+               assertNotNull(filtered1);
+               assertNotNull(filtered1.getGroupedFields());
+               assertEquals(3, filtered1.getGroupedFields().size());
+               assertTrue(filtered1.getGroupedFields().contains(1));
+               assertTrue(filtered1.getGroupedFields().contains(2));
+               assertTrue(filtered1.getGroupedFields().contains(3));
+               assertNull(filtered1.getOrdering());
+
+               assertNotNull(filtered2);
+               assertNotNull(filtered2.getGroupedFields());
+               assertEquals(2, filtered2.getGroupedFields().size());
+               assertTrue(filtered2.getGroupedFields().contains(0));
+               assertTrue(filtered2.getGroupedFields().contains(1));
+               assertNull(filtered2.getOrdering());
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testInvalidInputIndex() {
+
+               SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+               SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo);
+
+               RequestedLocalProperties rlProp = new RequestedLocalProperties();
+               rlProp.setGroupedFields(new FieldSet(1, 4));
+
+               rlProp.filterBySemanticProperties(sProps, 1);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
new file mode 100644
index 0000000..b359e6b
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.CompilerTestBase;
+
+
+@SuppressWarnings({"serial", "unchecked"})
+public class DeltaIterationDependenciesTest extends CompilerTestBase {
+
+       @Test
+       public void testExceptionWhenNewWorksetNotDependentOnWorkset() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIteration = input.iterateDelta(input, 10,0);
+
+                       DataSet<Tuple2<Long, Long>> delta = deltaIteration.getSolutionSet().join(deltaIteration.getWorkset())
+                                       .where(0).equalTo(0)
+                                       .projectFirst(1).projectSecond(1);
+
+                       DataSet<Tuple2<Long, Long>> nextWorkset = deltaIteration.getSolutionSet().join(input)
+                                       .where(0).equalTo(0)
+                                       .projectFirst(1).projectSecond(1);
+
+
+                       DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset);
+
+                       result.print();
+                       
+                       Plan p = env.createProgramPlan();
+                       try {
+                               compileNoStats(p);
+                               fail("Should not be able to compile, since the next workset does not depend on the workset");
+                       }
+                       catch (CompilerException e) {
+                               // good
+                       }
+                       catch (Exception e) {
+                               fail("wrong exception type");
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
new file mode 100644
index 0000000..de02836
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+@SuppressWarnings("serial")
+public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
+       
+       @Test
+       public void testDistinctPreservesPartitioningOfDistinctFields() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(4);
+
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
+                                       .map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
+                       
+                       data.distinct(0)
+                               .groupBy(0)
+                               .sum(1)
+                               .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       SinkPlanNode sink = op.getDataSinks().iterator().next();
+                       SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+                       SingleInputPlanNode distinctReducer = (SingleInputPlanNode) reducer.getInput().getSource();
+
+                       assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+                       // reducer can be forward, reuses partitioning from distinct
+                       assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
+
+                       // distinct reducer is partitioned
+                       assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testDistinctDestroysPartitioningOfNonDistinctFields() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(4);
+
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
+                                       .map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
+                       
+                       data.distinct(1)
+                               .groupBy(0)
+                               .sum(1)
+                               .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       SinkPlanNode sink = op.getDataSinks().iterator().next();
+                       SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+                       SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+                       SingleInputPlanNode distinctReducer = (SingleInputPlanNode) combiner.getInput().getSource();
+
+                       assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+                       // reducer must repartition, because it works on a different field
+                       assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+
+                       assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+
+                       // distinct reducer is partitioned
+                       assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
new file mode 100644
index 0000000..a683968
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class GroupReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
+
+       @Test
+       public void testAllGroupReduceNoCombiner() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+
+                       DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
+
+                       data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
+                               public void reduce(Iterable<Double> values, Collector<Double> out) {}
+                       }).name("reducer")
+                       .print().name("sink");
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+
+                       // the all-reduce has no combiner, when the DOP of the input is one
+
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+
+                       // check wiring
+                       assertEquals(sourceNode, reduceNode.getInput().getSource());
+                       assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+                       // check that reduce has the right strategy
+                       assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
+                       
+                       // check DOP
+                       assertEquals(1, sourceNode.getParallelism());
+                       assertEquals(1, reduceNode.getParallelism());
+                       assertEquals(1, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testAllReduceWithCombiner() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+
+                       DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
+
+                       GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
+                               public void reduce(Iterable<Long> values, Collector<Long> out) {}
+                       }).name("reducer");
+                       
+                       reduced.setCombinable(true);
+                       reduced.print().name("sink");
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+                       // get the original nodes
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+
+                       // get the combiner
+                       SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+                       // check wiring
+                       assertEquals(sourceNode, combineNode.getInput().getSource());
+                       assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+                       // check that both reduce and combiner have the same strategy
+                       assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
+                       assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
+                       
+                       // check DOP
+                       assertEquals(8, sourceNode.getParallelism());
+                       assertEquals(8, combineNode.getParallelism());
+                       assertEquals(1, reduceNode.getParallelism());
+                       assertEquals(1, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+               }
+       }
+       
+       
+       @Test
+       public void testGroupedReduceWithFieldPositionKeyNonCombinable() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+
+                       DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                               .name("source").setParallelism(6);
+
+                       data
+                               .groupBy(1)
+                               .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+                               public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+                       }).name("reducer")
+                       .print().name("sink");
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+                       // get the original nodes
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+
+                       // check wiring
+                       assertEquals(sourceNode, reduceNode.getInput().getSource());
+                       assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+                       // check that both reduce and combiner have the same strategy
+                       assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+
+                       // check the keys
+                       assertEquals(new FieldList(1), reduceNode.getKeys(0));
+                       assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+                       
+                       // check DOP
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testGroupedReduceWithFieldPositionKeyCombinable() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+
+                       DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                               .name("source").setParallelism(6);
+
+                       GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+                                       .groupBy(1)
+                                       .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+                               public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+                       }).name("reducer");
+                       
+                       reduced.setCombinable(true);
+                       reduced.print().name("sink");
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(op);
+                       
+                       // get the original nodes
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = 
resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+                       
+                       // get the combiner
+                       SingleInputPlanNode combineNode = (SingleInputPlanNode) 
reduceNode.getInput().getSource();
+                       
+                       // check wiring
+                       assertEquals(sourceNode, 
combineNode.getInput().getSource());
+                       assertEquals(reduceNode, 
sinkNode.getInput().getSource());
+                       
+                       // check the strategies of the reduce and the combiner
+                       assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+                       assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+                       
+                       // check the keys
+                       assertEquals(new FieldList(1), reduceNode.getKeys(0));
+                       assertEquals(new FieldList(1), combineNode.getKeys(0));
+                       assertEquals(new FieldList(1), combineNode.getKeys(1));
+                       assertEquals(new FieldList(1), 
reduceNode.getInput().getLocalStrategyKeys());
+                       
+                       // check DOP
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+                       
+                       DataSet<Tuple2<String, Double>> data = 
env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                               .name("source").setParallelism(6);
+                       
+                       data
+                               .groupBy(new KeySelector<Tuple2<String,Double>, 
String>() { 
+                                       public String getKey(Tuple2<String, 
Double> value) { return value.f0; }
+                               })
+                               .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+                               public void reduce(Iterable<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
+                       }).name("reducer")
+                       .print().name("sink");
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(op);
+                       
+                       // get the original nodes
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = 
resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+                       
+                       // get the key extractors and projectors
+                       SingleInputPlanNode keyExtractor = 
(SingleInputPlanNode) reduceNode.getInput().getSource();
+                       SingleInputPlanNode keyProjector = 
(SingleInputPlanNode) sinkNode.getInput().getSource();
+                       
+                       // check wiring
+                       assertEquals(sourceNode, 
keyExtractor.getInput().getSource());
+                       assertEquals(keyProjector, 
sinkNode.getInput().getSource());
+                       
+                       // check the strategy of the reduce (no combiner in the non-combinable case)
+                       assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+                       
+                       // check the keys
+                       assertEquals(new FieldList(0), reduceNode.getKeys(0));
+                       assertEquals(new FieldList(0), 
reduceNode.getInput().getLocalStrategyKeys());
+                       
+                       // check DOP
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, keyExtractor.getParallelism());
+                       
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, keyProjector.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+                       
+                       DataSet<Tuple2<String, Double>> data = 
env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                               .name("source").setParallelism(6);
+                       
+                       GroupReduceOperator<Tuple2<String, Double>, 
Tuple2<String, Double>> reduced = data
+                               .groupBy(new KeySelector<Tuple2<String,Double>, 
String>() { 
+                                       public String getKey(Tuple2<String, 
Double> value) { return value.f0; }
+                               })
+                               .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+                               public void reduce(Iterable<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
+                       }).name("reducer");
+                       
+                       reduced.setCombinable(true);
+                       reduced.print().name("sink");
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(op);
+                       
+                       // get the original nodes
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = 
resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+                       
+                       // get the combiner
+                       SingleInputPlanNode combineNode = (SingleInputPlanNode) 
reduceNode.getInput().getSource();
+                       
+                       // get the key extractors and projectors
+                       SingleInputPlanNode keyExtractor = 
(SingleInputPlanNode) combineNode.getInput().getSource();
+                       SingleInputPlanNode keyProjector = 
(SingleInputPlanNode) sinkNode.getInput().getSource();
+                       
+                       // check wiring
+                       assertEquals(sourceNode, 
keyExtractor.getInput().getSource());
+                       assertEquals(keyProjector, 
sinkNode.getInput().getSource());
+                       
+                       // check the strategies of the reduce and the combiner
+                       assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+                       assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+                       
+                       // check the keys
+                       assertEquals(new FieldList(0), reduceNode.getKeys(0));
+                       assertEquals(new FieldList(0), combineNode.getKeys(0));
+                       assertEquals(new FieldList(0), combineNode.getKeys(1));
+                       assertEquals(new FieldList(0), 
reduceNode.getInput().getLocalStrategyKeys());
+                       
+                       // check DOP
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, keyExtractor.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, keyProjector.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + 
e.getMessage());
+               }
+       }
+}
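
For context, a minimal sketch (not part of the patch) of the user-program shape that testGroupedReduceWithFieldPositionKeyCombinable compiles: a grouped reduceGroup that is explicitly marked combinable, which is what makes the optimizer place a SORTED_GROUP_COMBINE node in front of the SORTED_GROUP_REDUCE node. The class name and the sample data below are illustrative only; the test reads a CSV file instead.

import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CombinableGroupReduceSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // illustrative input data
        DataSet<Tuple2<String, Double>> data = env.fromElements(
                new Tuple2<String, Double>("a", 1.0),
                new Tuple2<String, Double>("b", 2.0));

        // group on field 1 and apply a group reduce
        GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
                .groupBy(1)
                .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Double>> values,
                                       Collector<Tuple2<String, Double>> out) {
                        for (Tuple2<String, Double> value : values) {
                            out.collect(value);
                        }
                    }
                });

        // marking the operator combinable is what triggers the extra combine node
        reduced.setCombinable(true);
        reduced.print();
    }
}

In the test, the plan built from such a program is compiled with compileNoStats(...) and the combiner is then found as the input of the reduce plan node.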

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
new file mode 100644
index 0000000..37a8e81
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class IterationCompilerTest extends CompilerTestBase {
+
+       @Test
+       public void testIdentityIteration() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(43);
+                       
+                       IterativeDataSet<Long> iteration = 
env.generateSequence(-4, 1000).iterate(100);
+                       iteration.closeWith(iteration).print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       new JobGraphGenerator().compileJobGraph(op);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testEmptyWorksetIteration() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(43);
+                       
+                       DataSet<Tuple2<Long, Long>> input = 
env.generateSequence(1, 20)
+                                       .map(new MapFunction<Long, Tuple2<Long, 
Long>>() {
+                                               @Override
+                                               public Tuple2<Long, Long> 
map(Long value){ return null; }
+                                       });
+                                       
+                                       
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iter = input.iterateDelta(input, 100, 0);
+                       iter.closeWith(iter.getWorkset(), iter.getWorkset())
+                               .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       new JobGraphGenerator().compileJobGraph(op);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testIterationWithUnionRoot() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(43);
+                       
+                       IterativeDataSet<Long> iteration = 
env.generateSequence(-4, 1000).iterate(100);
+                       
+                       iteration.closeWith(
+                                       iteration.map(new 
IdentityMapper<Long>()).union(iteration.map(new IdentityMapper<Long>())))
+                                       .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       SinkPlanNode sink = op.getDataSinks().iterator().next();
+                       BulkIterationPlanNode iterNode = 
(BulkIterationPlanNode) sink.getInput().getSource();
+                       
+                       // make sure that the root is part of the dynamic path
+                       
+                       // the "NoOp" that comes after the union.
+                       SingleInputPlanNode noop = (SingleInputPlanNode) 
iterNode.getRootOfStepFunction();
+                       NAryUnionPlanNode union = (NAryUnionPlanNode) 
noop.getInput().getSource();
+                       
+                       assertTrue(noop.isOnDynamicPath());
+                       assertTrue(noop.getCostWeight() >= 1);
+                       
+                       assertTrue(union.isOnDynamicPath());
+                       assertTrue(union.getCostWeight() >= 1);
+                       
+                       // see that the jobgraph generator can translate this
+                       new JobGraphGenerator().compileJobGraph(op);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testWorksetIterationWithUnionRoot() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(43);
+                       
+                       DataSet<Tuple2<Long, Long>> input = 
env.generateSequence(1, 20)
+                                       .map(new MapFunction<Long, Tuple2<Long, 
Long>>() {
+                                               @Override
+                                               public Tuple2<Long, Long> 
map(Long value){ return null; }
+                                       });
+                                       
+                                       
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iter = input.iterateDelta(input, 100, 0);
+                       iter.closeWith(
+                                       iter.getWorkset().map(new 
IdentityMapper<Tuple2<Long,Long>>())
+                               .union(
+                                       iter.getWorkset().map(new 
IdentityMapper<Tuple2<Long,Long>>()))
+                               , iter.getWorkset().map(new 
IdentityMapper<Tuple2<Long,Long>>())
+                               .union(
+                                               iter.getWorkset().map(new 
IdentityMapper<Tuple2<Long,Long>>()))
+                               )
+                       .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       SinkPlanNode sink = op.getDataSinks().iterator().next();
+                       WorksetIterationPlanNode iterNode = 
(WorksetIterationPlanNode) sink.getInput().getSource();
+                       
+                       // make sure that the root is part of the dynamic path
+                       
+                       // the "NoOp"s that come after the unions.
+                       SingleInputPlanNode nextWorksetNoop = 
(SingleInputPlanNode) iterNode.getNextWorkSetPlanNode();
+                       SingleInputPlanNode solutionDeltaNoop = 
(SingleInputPlanNode) iterNode.getSolutionSetDeltaPlanNode();
+                       
+                       NAryUnionPlanNode nextWorksetUnion = 
(NAryUnionPlanNode) nextWorksetNoop.getInput().getSource();
+                       NAryUnionPlanNode solutionDeltaUnion = 
(NAryUnionPlanNode) solutionDeltaNoop.getInput().getSource();
+                       
+                       assertTrue(nextWorksetNoop.isOnDynamicPath());
+                       assertTrue(nextWorksetNoop.getCostWeight() >= 1);
+                       
+                       assertTrue(solutionDeltaNoop.isOnDynamicPath());
+                       assertTrue(solutionDeltaNoop.getCostWeight() >= 1);
+                       
+                       assertTrue(nextWorksetUnion.isOnDynamicPath());
+                       assertTrue(nextWorksetUnion.getCostWeight() >= 1);
+                       
+                       assertTrue(solutionDeltaUnion.isOnDynamicPath());
+                       assertTrue(solutionDeltaUnion.getCostWeight() >= 1);
+                       
+                       new JobGraphGenerator().compileJobGraph(op);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}
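
For context, a minimal sketch (not part of the patch) of the program shape exercised by testIterationWithUnionRoot: a bulk iteration whose step function ends in a union of two mapped branches, so the optimizer has to place a NoOp node behind the union and keep both on the dynamic path. The class name and the inline identity map (standing in for the IdentityMapper test function) are illustrative.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class UnionRootIterationSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);

        // an inline identity map
        MapFunction<Long, Long> identity = new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value;
            }
        };

        // the step function: the union of two mapped branches is its root
        DataSet<Long> stepResult = iteration.map(identity).union(iteration.map(identity));

        iteration.closeWith(stepResult).print();
    }
}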

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
new file mode 100644
index 0000000..0a62132
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class JoinTranslationTest extends CompilerTestBase {
+
+       @Test
+       public void testBroadcastHashFirstTest() {
+               try {
+                       DualInputPlanNode node = 
createPlanAndGetJoinNode(JoinHint.BROADCAST_HASH_FIRST);
+                       assertEquals(ShipStrategyType.BROADCAST, 
node.getInput1().getShipStrategy());
+                       assertEquals(ShipStrategyType.FORWARD, 
node.getInput2().getShipStrategy());
+                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, 
node.getDriverStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + ": " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testBroadcastHashSecondTest() {
+               try {
+                       DualInputPlanNode node = 
createPlanAndGetJoinNode(JoinHint.BROADCAST_HASH_SECOND);
+                       assertEquals(ShipStrategyType.FORWARD, 
node.getInput1().getShipStrategy());
+                       assertEquals(ShipStrategyType.BROADCAST, 
node.getInput2().getShipStrategy());
+                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, 
node.getDriverStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + ": " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testPartitionHashFirstTest() {
+               try {
+                       DualInputPlanNode node = 
createPlanAndGetJoinNode(JoinHint.REPARTITION_HASH_FIRST);
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
node.getInput1().getShipStrategy());
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
node.getInput2().getShipStrategy());
+                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, 
node.getDriverStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + ": " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testPartitionHashSecondTest() {
+               try {
+                       DualInputPlanNode node = 
createPlanAndGetJoinNode(JoinHint.REPARTITION_HASH_SECOND);
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
node.getInput1().getShipStrategy());
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
node.getInput2().getShipStrategy());
+                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, 
node.getDriverStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + ": " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testPartitionSortMergeTest() {
+               try {
+                       DualInputPlanNode node = 
createPlanAndGetJoinNode(JoinHint.REPARTITION_SORT_MERGE);
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
node.getInput1().getShipStrategy());
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
node.getInput2().getShipStrategy());
+                       assertEquals(DriverStrategy.MERGE, 
node.getDriverStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + ": " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testOptimizerChoosesTest() {
+               try {
+                       DualInputPlanNode node = 
createPlanAndGetJoinNode(JoinHint.OPTIMIZER_CHOOSES);
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
node.getInput1().getShipStrategy());
+                       assertEquals(ShipStrategyType.PARTITION_HASH, 
node.getInput2().getShipStrategy());
+                       assertTrue(DriverStrategy.HYBRIDHASH_BUILD_FIRST == 
node.getDriverStrategy() ||
+                                       DriverStrategy.HYBRIDHASH_BUILD_SECOND 
== node.getDriverStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + ": " + 
e.getMessage());
+               }
+       }
+       
+       
+       private DualInputPlanNode createPlanAndGetJoinNode(JoinHint hint) {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               DataSet<Long> i1 = env.generateSequence(1, 1000);
+               DataSet<Long> i2 = env.generateSequence(1, 1000);
+               
+               i1.join(i2, hint).where(new 
IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).print();
+               
+               Plan plan = env.createProgramPlan();
+               
+               // set statistics to the sources
+               plan.accept(new Visitor<Operator<?>>() {
+                       @Override
+                       public boolean preVisit(Operator<?> visitable) {
+                               if (visitable instanceof GenericDataSourceBase) 
{
+                                       GenericDataSourceBase<?, ?> source = 
(GenericDataSourceBase<?, ?>) visitable;
+                                       setSourceStatistics(source, 10000000, 
1000);
+                               }
+                               
+                               return true;
+                       }
+                       
+                       @Override
+                       public void postVisit(Operator<?> visitable) {}
+               });
+               
+               OptimizedPlan op = compileWithStats(plan);
+               
+               return (DualInputPlanNode) ((SinkPlanNode) 
op.getDataSinks().iterator().next()).getInput().getSource();
+       }
+       
+       
+       
+       private static final class IdentityKeySelector<T> implements 
KeySelector<T, T> {
+               
+               @Override
+               public T getKey(T value) {
+                       return value;
+               }
+       }
+}
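
For context, a minimal sketch (not part of the patch) of how the join hints tested above enter a program: the hint is passed as the second argument of DataSet.join(...), here with key selectors on both sides as in createPlanAndGetJoinNode(...). The class name and the inline key selectors are illustrative.

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;

public class JoinHintSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> i1 = env.generateSequence(1, 1000);
        DataSet<Long> i2 = env.generateSequence(1, 1000);

        // BROADCAST_HASH_FIRST: broadcast the first input and use it as the
        // hash-table build side (HYBRIDHASH_BUILD_FIRST in the compiled plan)
        i1.join(i2, JoinHint.BROADCAST_HASH_FIRST)
                .where(new KeySelector<Long, Long>() {
                    @Override
                    public Long getKey(Long value) {
                        return value;
                    }
                })
                .equalTo(new KeySelector<Long, Long>() {
                    @Override
                    public Long getKey(Long value) {
                        return value;
                    }
                })
                .print();
    }
}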

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
new file mode 100644
index 0000000..cd63b72
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class OpenIterationTest extends CompilerTestBase {
+
+       @Test
+       public void testSinkInOpenBulkIteration() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       DataSet<Long> input = env.generateSequence(1, 10);
+                       
+                       IterativeDataSet<Long> iteration = input.iterate(10);
+                       
+                       DataSet<Long> mapped = iteration.map(new 
IdentityMapper<Long>());
+                       
+                       mapped.print();
+                       
+                       try {
+                               env.createProgramPlan();
+                               fail("should throw an exception");
+                       }
+                       catch (InvalidProgramException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testSinkInClosedBulkIteration() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       DataSet<Long> input = env.generateSequence(1, 10);
+                       
+                       IterativeDataSet<Long> iteration = input.iterate(10);
+                       
+                       DataSet<Long> mapped = iteration.map(new 
IdentityMapper<Long>());
+                       
+                       iteration.closeWith(mapped).print();
+                       
+                       mapped.print();
+                       
+                       Plan p = env.createProgramPlan();
+                       
+                       try {
+                               compileNoStats(p);
+                               fail("should throw an exception");
+                       }
+                       catch (InvalidProgramException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testSinkOnSolutionSetDeltaIteration() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple2<Long, Long>> input = 
env.fromElements(new Tuple2<Long, Long>(0L,0L));
+                       
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iteration = input.iterateDelta(input, 10, 0);
+                       
+                       DataSet<Tuple2<Long, Long>> mapped = 
iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
+                       
+                       mapped.print();
+                       
+                       try {
+                               env.createProgramPlan();
+                               fail("should throw an exception");
+                       }
+                       catch (InvalidProgramException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testSinkOnWorksetDeltaIteration() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple2<Long, Long>> input = 
env.fromElements(new Tuple2<Long, Long>(0L,0L));
+                       
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iteration = input.iterateDelta(input, 10, 0);
+                       
+                       DataSet<Tuple2<Long, Long>> mapped = 
iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+                       
+                       mapped.print();
+                       
+                       try {
+                               env.createProgramPlan();
+                               fail("should throw an exception");
+                       }
+                       catch (InvalidProgramException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testOperationOnSolutionSet() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple2<Long, Long>> input = 
env.fromElements(new Tuple2<Long, Long>(0L,0L));
+                       
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iteration = input.iterateDelta(input, 10, 0);
+                       
+                       DataSet<Tuple2<Long, Long>> mapped = 
iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
+                       
+                       DataSet<Tuple2<Long, Long>> joined = 
iteration.getWorkset().join(mapped)
+                               .where(0).equalTo(0).projectFirst(1).projectSecond(0);
+                       
+                       iteration.closeWith(joined, joined)
+                               .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       try {
+                               compileNoStats(p);
+                               fail("should throw an exception");
+                       }
+                       catch (InvalidProgramException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}
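
For context, a minimal sketch (not part of the patch) of the invalid program that testSinkInOpenBulkIteration expects the API to reject: a sink is attached to a data set inside a bulk iteration, but the iteration is never closed with closeWith(...), so createProgramPlan() should throw an InvalidProgramException. The class name and the console messages are illustrative.

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class OpenIterationSketch {

    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        IterativeDataSet<Long> iteration = env.generateSequence(1, 10).iterate(10);

        DataSet<Long> mapped = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value;
            }
        });

        // a sink on a node inside the iteration, while the iteration stays open
        mapped.print();

        try {
            env.createProgramPlan();
            System.out.println("unexpected: the open iteration was accepted");
        } catch (InvalidProgramException e) {
            // expected: the iteration was never closed with closeWith(...)
            System.out.println("rejected as expected: " + e.getMessage());
        }
    }
}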

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
new file mode 100644
index 0000000..8bb9a76
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PartitionOperatorTest extends CompilerTestBase {
+
+       @Test
+       public void testPartitionOperatorPreservesFields() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       DataSet<Tuple2<Long, Long>> data = 
env.fromCollection(Collections.singleton(new Tuple2<Long, Long>(0L, 0L)));
+                       
+                       data.partitionCustom(new Partitioner<Long>() {
+                                       public int partition(Long key, int 
numPartitions) { return key.intValue(); }
+                               }, 1)
+                               .groupBy(1)
+                               .reduceGroup(new 
IdentityGroupReducer<Tuple2<Long,Long>>())
+                               .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       SinkPlanNode sink = op.getDataSinks().iterator().next();
+                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
+                       SingleInputPlanNode partitioner = (SingleInputPlanNode) 
reducer.getInput().getSource();
+
+                       assertEquals(ShipStrategyType.FORWARD, 
reducer.getInput().getShipStrategy());
+                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner.getInput().getShipStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}
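
For context, a minimal sketch (not part of the patch) of the program compiled in testPartitionOperatorPreservesFields: a custom partitioning on field 1 followed by groupBy(1), so the optimizer can run the grouped reduce on the existing partitioning (FORWARD ship strategy) instead of repartitioning again. The class name, sample data and the modulo partitioner are illustrative; the test uses a trivial single-element input and an identity group reducer.

import java.util.Arrays;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PartitionThenGroupSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, Long>> data = env.fromCollection(Arrays.asList(
                new Tuple2<Long, Long>(0L, 0L),
                new Tuple2<Long, Long>(1L, 2L)));

        data
            // custom partitioning on field 1 (non-negative keys in this sample)
            .partitionCustom(new Partitioner<Long>() {
                @Override
                public int partition(Long key, int numPartitions) {
                    return (int) (key % numPartitions);
                }
            }, 1)
            // grouping on the same field can reuse the partitioning
            .groupBy(1)
            .reduceGroup(new RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                @Override
                public void reduce(Iterable<Tuple2<Long, Long>> values,
                                   Collector<Tuple2<Long, Long>> out) {
                    for (Tuple2<Long, Long> value : values) {
                        out.collect(value);
                    }
                }
            })
            .print();
    }
}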

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
new file mode 100644
index 0000000..0724a9f
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class ReduceCompilationTest extends CompilerTestBase implements 
java.io.Serializable {
+
+       @Test
+       public void testAllReduceNoCombiner() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+                       
+                       DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 
0.5).name("source");
+                       
+                       data.reduce(new RichReduceFunction<Double>() {
+                               
+                               @Override
+                               public Double reduce(Double value1, Double 
value2){
+                                       return value1 + value2;
+                               }
+                       }).name("reducer")
+                       .print().name("sink");
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(op);
+                       
+                       
+                       // the all-reduce has no combiner when the DOP of its input is one
+                       
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = 
resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+                       
+                       // check wiring
+                       assertEquals(sourceNode, 
reduceNode.getInput().getSource());
+                       assertEquals(reduceNode, 
sinkNode.getInput().getSource());
+                       
+                       // check DOP
+                       assertEquals(1, sourceNode.getParallelism());
+                       assertEquals(1, reduceNode.getParallelism());
+                       assertEquals(1, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testAllReduceWithCombiner() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+                       
+                       DataSet<Long> data = env.generateSequence(1, 
8000000).name("source");
+                       
+                       data.reduce(new RichReduceFunction<Long>() {
+                               
+                               @Override
+                               public Long reduce(Long value1, Long value2){
+                                       return value1 + value2;
+                               }
+                       }).name("reducer")
+                       .print().name("sink");
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(op);
+                       
+                       // get the original nodes
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = 
resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+                       
+                       // get the combiner
+                       SingleInputPlanNode combineNode = (SingleInputPlanNode) 
reduceNode.getInput().getSource();
+                       
+                       // check wiring
+                       assertEquals(sourceNode, 
combineNode.getInput().getSource());
+                       assertEquals(reduceNode, 
sinkNode.getInput().getSource());
+                       
+                       // check that both reduce and combiner have the same 
strategy
+                       assertEquals(DriverStrategy.ALL_REDUCE, 
reduceNode.getDriverStrategy());
+                       assertEquals(DriverStrategy.ALL_REDUCE, 
combineNode.getDriverStrategy());
+                       
+                       // check DOP
+                       assertEquals(8, sourceNode.getParallelism());
+                       assertEquals(8, combineNode.getParallelism());
+                       assertEquals(1, reduceNode.getParallelism());
+                       assertEquals(1, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testGroupedReduceWithFieldPositionKey() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+                       
+                       DataSet<Tuple2<String, Double>> data = 
env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                               .name("source").setParallelism(6);
+                       
+                       data
+                               .groupBy(1)
+                               .reduce(new 
RichReduceFunction<Tuple2<String,Double>>() {
+                               @Override
+                               public Tuple2<String, Double> 
reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
+                                       return null;
+                               }
+                       }).name("reducer")
+                       .print().name("sink");
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(op);
+                       
+                       // get the original nodes
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = 
resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+                       
+                       // get the combiner
+                       SingleInputPlanNode combineNode = (SingleInputPlanNode) 
reduceNode.getInput().getSource();
+                       
+                       // check wiring
+                       assertEquals(sourceNode, 
combineNode.getInput().getSource());
+                       assertEquals(reduceNode, 
sinkNode.getInput().getSource());
+                       
+                       // check the strategies of the reduce and the combiner
+                       assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+                       assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
+                       
+                       // check the keys
+                       assertEquals(new FieldList(1), reduceNode.getKeys(0));
+                       assertEquals(new FieldList(1), combineNode.getKeys(0));
+                       assertEquals(new FieldList(1), 
reduceNode.getInput().getLocalStrategyKeys());
+                       
+                       // check DOP
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + 
e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testGroupedReduceWithSelectorFunctionKey() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+                       
+                       DataSet<Tuple2<String, Double>> data = 
env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                               .name("source").setParallelism(6);
+                       
+                       data
+                               .groupBy(new KeySelector<Tuple2<String,Double>, 
String>() { 
+                                       public String getKey(Tuple2<String, 
Double> value) { return value.f0; }
+                               })
+                               .reduce(new 
RichReduceFunction<Tuple2<String,Double>>() {
+                               @Override
+                               public Tuple2<String, Double> 
reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
+                                       return null;
+                               }
+                       }).name("reducer")
+                       .print().name("sink");
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(op);
+                       
+                       // get the original nodes
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = 
resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+                       
+                       // get the combiner
+                       SingleInputPlanNode combineNode = (SingleInputPlanNode) 
reduceNode.getInput().getSource();
+                       
+                       // get the key extractors and projectors
+                       SingleInputPlanNode keyExtractor = 
(SingleInputPlanNode) combineNode.getInput().getSource();
+                       SingleInputPlanNode keyProjector = 
(SingleInputPlanNode) sinkNode.getInput().getSource();
+                       
+                       // check wiring
+                       assertEquals(sourceNode, 
keyExtractor.getInput().getSource());
+                       assertEquals(keyProjector, 
sinkNode.getInput().getSource());
+                       
+                       // check the strategies of the reduce and the combiner
+                       assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+                       assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
+                       
+                       // check the keys
+                       assertEquals(new FieldList(0), reduceNode.getKeys(0));
+                       assertEquals(new FieldList(0), combineNode.getKeys(0));
+                       assertEquals(new FieldList(0), 
reduceNode.getInput().getLocalStrategyKeys());
+                       
+                       // check DOP
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, keyExtractor.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, keyProjector.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + 
e.getMessage());
+               }
+       }
+}
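
For context, a minimal sketch (not part of the patch) of the grouped reduce shape compiled in these tests: with a ReduceFunction the optimizer inserts the partial (combining) reduce on its own, so no setCombinable(true) is needed, in contrast to the reduceGroup case in GroupReduceCompilationTest. The class name, sample data and the summing logic are illustrative only.

import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class GroupedReduceSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Double>> data = env.fromElements(
                new Tuple2<String, Double>("a", 1.0),
                new Tuple2<String, Double>("a", 2.0),
                new Tuple2<String, Double>("b", 3.0));

        data
            .groupBy(0)
            .reduce(new RichReduceFunction<Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> reduce(Tuple2<String, Double> v1, Tuple2<String, Double> v2) {
                    // sum the values per key
                    return new Tuple2<String, Double>(v1.f0, v1.f1 + v2.f1);
                }
            })
            .print();
    }
}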
