http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
new file mode 100644
index 0000000..55b8785
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dag.WorksetIterationNode.SingleRootJoiner;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.operators.NoOpDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.NamedChannel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import 
org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A node in the optimizer's program representation for a bulk iteration.
+ */
+public class BulkIterationNode extends SingleInputNode implements 
IterationNode {
+       
+       private BulkPartialSolutionNode partialSolution;
+       
+       private OptimizerNode terminationCriterion;
+       
+       private OptimizerNode nextPartialSolution;
+       
+       private DagConnection rootConnection;           // connection out of 
the next partial solution
+       
+       private DagConnection terminationCriterionRootConnection;       // 
connection out of the term. criterion
+       
+       private OptimizerNode singleRoot;
+       
+       private final int costWeight;
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Creates a new node for the bulk iteration.
+        * 
+        * @param iteration The bulk iteration the node represents.
+        */
+       public BulkIterationNode(BulkIterationBase<?> iteration) {
+               super(iteration);
+               
+               if (iteration.getMaximumNumberOfIterations() <= 0) {
+                       throw new CompilerException("BulkIteration must have a 
maximum number of iterations specified.");
+               }
+               
+               int numIters = iteration.getMaximumNumberOfIterations();
+               
+               this.costWeight = (numIters > 0 && numIters < 
OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT) ?
+                       numIters : OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT; 
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       public BulkIterationBase<?> getIterationContract() {
+               return (BulkIterationBase<?>) getOperator();
+       }
+       
+       /**
+        * Gets the partialSolution from this BulkIterationNode.
+        *
+        * @return The partialSolution.
+        */
+       public BulkPartialSolutionNode getPartialSolution() {
+               return partialSolution;
+       }
+       
+       /**
+        * Sets the partialSolution for this BulkIterationNode.
+        *
+        * @param partialSolution The partialSolution to set.
+        */
+       public void setPartialSolution(BulkPartialSolutionNode partialSolution) 
{
+               this.partialSolution = partialSolution;
+       }
+
+       
+       /**
+        * Gets the nextPartialSolution from this BulkIterationNode.
+        *
+        * @return The nextPartialSolution.
+        */
+       public OptimizerNode getNextPartialSolution() {
+               return nextPartialSolution;
+       }
+       
+       /**
+        * Sets the nextPartialSolution for this BulkIterationNode.
+        *
+        * @param nextPartialSolution The nextPartialSolution to set.
+        */
+       public void setNextPartialSolution(OptimizerNode nextPartialSolution, 
OptimizerNode terminationCriterion) {
+               
+               // check if the root of the step function has the same DOP as 
the iteration
+               // or if the step function has any operator at all
+               if (nextPartialSolution.getParallelism() != getParallelism() ||
+                       nextPartialSolution == partialSolution || 
nextPartialSolution instanceof BinaryUnionNode)
+               {
+                       // add a no-op to the root to express the 
re-partitioning
+                       NoOpNode noop = new NoOpNode();
+                       noop.setDegreeOfParallelism(getParallelism());
+
+                       DagConnection noOpConn = new 
DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
+                       noop.setIncomingConnection(noOpConn);
+                       nextPartialSolution.addOutgoingConnection(noOpConn);
+                       
+                       nextPartialSolution = noop;
+               }
+               
+               this.nextPartialSolution = nextPartialSolution;
+               this.terminationCriterion = terminationCriterion;
+               
+               if (terminationCriterion == null) {
+                       this.singleRoot = nextPartialSolution;
+                       this.rootConnection = new 
DagConnection(nextPartialSolution, ExecutionMode.PIPELINED);
+               }
+               else {
+                       // we have a termination criterion
+                       SingleRootJoiner singleRootJoiner = new 
SingleRootJoiner();
+                       this.rootConnection = new 
DagConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED);
+                       this.terminationCriterionRootConnection = new 
DagConnection(terminationCriterion, singleRootJoiner,
+                                                                               
                                                                
ExecutionMode.PIPELINED);
+
+                       singleRootJoiner.setInputs(this.rootConnection, 
this.terminationCriterionRootConnection);
+                       
+                       this.singleRoot = singleRootJoiner;
+                       
+                       // add connection to terminationCriterion for 
interesting properties visitor
+                       
terminationCriterion.addOutgoingConnection(terminationCriterionRootConnection);
+               
+               }
+               
+               nextPartialSolution.addOutgoingConnection(rootConnection);
+       }
+       
+       public int getCostWeight() {
+               return this.costWeight;
+       }
+       
+       public OptimizerNode getSingleRootOfStepFunction() {
+               return this.singleRoot;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public String getName() {
+               return "Bulk Iteration";
+       }
+
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return new EmptySemanticProperties();
+       }
+       
+       protected void readStubAnnotations() {}
+       
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               this.estimatedOutputSize = 
getPredecessorNode().getEstimatedOutputSize();
+               this.estimatedNumRecords = 
getPredecessorNode().getEstimatedNumRecords();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                             Properties and Optimization
+       // 
--------------------------------------------------------------------------------------------
+       
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return Collections.<OperatorDescriptorSingle>singletonList(new 
NoOpDescriptor());
+       }
+
+       @Override
+       public void computeInterestingPropertiesForInputs(CostEstimator 
estimator) {
+               final InterestingProperties intProps = 
getInterestingProperties().clone();
+               
+               if (this.terminationCriterion != null) {
+                       // first propagate through termination Criterion. since 
it has no successors, it has no
+                       // interesting properties
+                       
this.terminationCriterionRootConnection.setInterestingProperties(new 
InterestingProperties());
+                       this.terminationCriterion.accept(new 
InterestingPropertyVisitor(estimator));
+               }
+               
+               // we need to make 2 interesting property passes, because the 
root of the step function needs also
+               // the interesting properties as generated by the partial 
solution
+               
+               // give our own interesting properties (as generated by the 
iterations successors) to the step function and
+               // make the first pass
+               this.rootConnection.setInterestingProperties(intProps);
+               this.nextPartialSolution.accept(new 
InterestingPropertyVisitor(estimator));
+               
+               // take the interesting properties of the partial solution and 
add them to the root interesting properties
+               InterestingProperties partialSolutionIntProps = 
this.partialSolution.getInterestingProperties();
+               
intProps.getGlobalProperties().addAll(partialSolutionIntProps.getGlobalProperties());
+               
intProps.getLocalProperties().addAll(partialSolutionIntProps.getLocalProperties());
+               
+               // clear all interesting properties to prepare the second 
traversal
+               // this clears only the path down from the next partial 
solution. The paths down
+               // from the termination criterion (before they meet the paths 
down from the next partial solution)
+               // remain unaffected by this step
+               this.rootConnection.clearInterestingProperties();
+               
this.nextPartialSolution.accept(InterestingPropertiesClearer.INSTANCE);
+               
+               // 2nd pass
+               this.rootConnection.setInterestingProperties(intProps);
+               this.nextPartialSolution.accept(new 
InterestingPropertyVisitor(estimator));
+               
+               // now add the interesting properties of the partial solution 
to the input
+               final InterestingProperties inProps = 
this.partialSolution.getInterestingProperties().clone();
+               inProps.addGlobalProperties(new RequestedGlobalProperties());
+               inProps.addLocalProperties(new RequestedLocalProperties());
+               this.inConn.setInterestingProperties(inProps);
+       }
+       
+       @Override
+       public void clearInterestingProperties() {
+               super.clearInterestingProperties();
+               
+               this.singleRoot.accept(InterestingPropertiesClearer.INSTANCE);
+               this.rootConnection.clearInterestingProperties();
+       }
+
+       @Override
+       public void computeUnclosedBranchStack() {
+               if (this.openBranches != null) {
+                       return;
+               }
+
+               // the resulting branches are those of the step function
+               // because the BulkPartialSolution takes the input's branches
+               
addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
+               List<UnclosedBranchDescriptor> result = 
getSingleRootOfStepFunction().openBranches;
+
+               this.openBranches = (result == null || result.isEmpty()) ? 
Collections.<UnclosedBranchDescriptor>emptyList() : result;
+       }
+
+
+       @Override
+       protected void instantiateCandidate(OperatorDescriptorSingle dps, 
Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels, 
+                       List<PlanNode> target, CostEstimator estimator, 
RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
+       {
+               // NOTES ON THE ENUMERATION OF THE STEP FUNCTION PLANS:
+               // Whenever we instantiate the iteration, we enumerate new 
candidates for the step function.
+               // That way, we make sure we have an appropriate plan for each 
candidate for the initial partial solution,
+               // we have a fitting candidate for the step function (often, 
work is pushed out of the step function).
+               // Among the candidates of the step function, we keep only 
those that meet the requested properties of the
+               // current candidate initial partial solution. That makes sure 
these properties exist at the beginning of
+               // the successive iteration.
+               
+               // 1) Because we enumerate multiple times, we may need to clean 
the cached plans
+               //    before starting another enumeration
+               this.nextPartialSolution.accept(PlanCacheCleaner.INSTANCE);
+               if (this.terminationCriterion != null) {
+                       
this.terminationCriterion.accept(PlanCacheCleaner.INSTANCE);
+               }
+               
+               // 2) Give the partial solution the properties of the current 
candidate for the initial partial solution
+               
this.partialSolution.setCandidateProperties(in.getGlobalProperties(), 
in.getLocalProperties(), in);
+               final BulkPartialSolutionPlanNode pspn = 
this.partialSolution.getCurrentPartialSolutionPlanNode();
+               
+               // 3) Get the alternative plans
+               List<PlanNode> candidates = 
this.nextPartialSolution.getAlternativePlans(estimator);
+               
+               // 4) Make sure that the beginning of the step function does 
not assume properties that 
+               //    are not also produced by the end of the step function.
+
+               {
+                       List<PlanNode> newCandidates = new 
ArrayList<PlanNode>();
+                       
+                       for (Iterator<PlanNode> planDeleter = 
candidates.iterator(); planDeleter.hasNext(); ) {
+                               PlanNode candidate = planDeleter.next();
+                               
+                               GlobalProperties atEndGlobal = 
candidate.getGlobalProperties();
+                               LocalProperties atEndLocal = 
candidate.getLocalProperties();
+                               
+                               FeedbackPropertiesMeetRequirementsReport report 
= candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobal, atEndLocal);
+                               if (report == 
FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+                                       ; // depends only through broadcast 
variable on the partial solution
+                               }
+                               else if (report == 
FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+                                       // attach a no-op node through which we 
create the properties of the original input
+                                       Channel toNoOp = new Channel(candidate);
+                                       
globPropsReq.parameterizeChannel(toNoOp, false, 
rootConnection.getDataExchangeMode(), false);
+                                       locPropsReq.parameterizeChannel(toNoOp);
+                                       
+                                       UnaryOperatorNode rebuildPropertiesNode 
= new UnaryOperatorNode("Rebuild Partial Solution Properties", 
FieldList.EMPTY_LIST);
+                                       
rebuildPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
+                                       
+                                       SingleInputPlanNode 
rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, 
"Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
+                                       
rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), 
toNoOp.getLocalProperties());
+                                       
estimator.costOperator(rebuildPropertiesPlanNode);
+                                               
+                                       GlobalProperties atEndGlobalModified = 
rebuildPropertiesPlanNode.getGlobalProperties();
+                                       LocalProperties atEndLocalModified = 
rebuildPropertiesPlanNode.getLocalProperties();
+                                               
+                                       if 
(!(atEndGlobalModified.equals(atEndGlobal) && 
atEndLocalModified.equals(atEndLocal))) {
+                                               
FeedbackPropertiesMeetRequirementsReport report2 = 
candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobalModified, 
atEndLocalModified);
+                                               
+                                               if (report2 != 
FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+                                                       
newCandidates.add(rebuildPropertiesPlanNode);
+                                               }
+                                       }
+                                       
+                                       planDeleter.remove();
+                               }
+                       }
+               }
+               
+               if (candidates.isEmpty()) {
+                       return;
+               }
+               
+               // 5) Create a candidate for the Iteration Node for every 
remaining plan of the step function.
+               if (terminationCriterion == null) {
+                       for (PlanNode candidate : candidates) {
+                               BulkIterationPlanNode node = new 
BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", 
in, pspn, candidate);
+                               GlobalProperties gProps = 
candidate.getGlobalProperties().clone();
+                               LocalProperties lProps = 
candidate.getLocalProperties().clone();
+                               node.initProperties(gProps, lProps);
+                               target.add(node);
+                       }
+               }
+               else if (candidates.size() > 0) {
+                       List<PlanNode> terminationCriterionCandidates = 
this.terminationCriterion.getAlternativePlans(estimator);
+
+                       SingleRootJoiner singleRoot = (SingleRootJoiner) 
this.singleRoot;
+                       
+                       for (PlanNode candidate : candidates) {
+                               for (PlanNode terminationCandidate : 
terminationCriterionCandidates) {
+                                       if 
(singleRoot.areBranchCompatible(candidate, terminationCandidate)) {
+                                               BulkIterationPlanNode node = 
new BulkIterationPlanNode(this, "BulkIteration 
("+this.getOperator().getName()+")", in, pspn, candidate, terminationCandidate);
+                                               GlobalProperties gProps = 
candidate.getGlobalProperties().clone();
+                                               LocalProperties lProps = 
candidate.getLocalProperties().clone();
+                                               node.initProperties(gProps, 
lProps);
+                                               target.add(node);
+                                               
+                                       }
+                               }
+                       }
+                       
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                      Iteration Specific Traversals
+       // 
--------------------------------------------------------------------------------------------
+
+       public void acceptForStepFunction(Visitor<OptimizerNode> visitor) {
+               this.singleRoot.accept(visitor);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
new file mode 100644
index 0000000..25a7eef
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import 
org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.PlanNode;
+
+/**
+ * The optimizer's internal representation of the partial solution that is 
input to a bulk iteration.
+ */
+public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
+       
+       private final BulkIterationNode iterationNode;
+       
+       
+       public BulkPartialSolutionNode(PartialSolutionPlaceHolder<?> psph, 
BulkIterationNode iterationNode) {
+               super(psph);
+               this.iterationNode = iterationNode;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       public void setCandidateProperties(GlobalProperties gProps, 
LocalProperties lProps, Channel initialInput) {
+               if (this.cachedPlans != null) {
+                       throw new IllegalStateException();
+               } else {
+                       this.cachedPlans = 
Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this,
+                                       "PartialSolution 
("+this.getOperator().getName()+")", gProps, lProps, initialInput));
+               }
+       }
+       
+       public BulkPartialSolutionPlanNode getCurrentPartialSolutionPlanNode() {
+               if (this.cachedPlans != null) {
+                       return (BulkPartialSolutionPlanNode) 
this.cachedPlans.get(0);
+               } else {
+                       throw new IllegalStateException();
+               }
+       }
+       
+       public BulkIterationNode getIterationNode() {
+               return this.iterationNode;
+       }
+       
+       @Override
+       public void computeOutputEstimates(DataStatistics statistics) {
+               copyEstimates(this.iterationNode.getPredecessorNode());
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Gets the operator (here the {@link PartialSolutionPlaceHolder}) that 
is represented by this
+        * optimizer node.
+        * 
+        * @return The operator represented by this optimizer node.
+        */
+       @Override
+       public PartialSolutionPlaceHolder<?> getOperator() {
+               return (PartialSolutionPlaceHolder<?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Bulk Partial Solution";
+       }
+
+       @Override
+       public void computeUnclosedBranchStack() {
+               if (this.openBranches != null) {
+                       return;
+               }
+
+               OptimizerNode inputToIteration = 
this.iterationNode.getPredecessorNode();
+               
+               addClosedBranches(inputToIteration.closedBranchingNodes);
+               List<UnclosedBranchDescriptor> fromInput = 
inputToIteration.getBranchesForParent(this.iterationNode.getIncomingConnection());
+               this.openBranches = (fromInput == null || fromInput.isEmpty()) 
? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
new file mode 100644
index 0000000..92076c3
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.CoGroupDescriptor;
+import 
org.apache.flink.optimizer.operators.CoGroupWithSolutionSetFirstDescriptor;
+import 
org.apache.flink.optimizer.operators.CoGroupWithSolutionSetSecondDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+
+/**
+ * The Optimizer representation of a <i>CoGroup</i> operator.
+ */
+public class CoGroupNode extends TwoInputNode {
+       
+       private List<OperatorDescriptorDual> dataProperties;
+       
+       public CoGroupNode(CoGroupOperatorBase<?, ?, ?, ?> operator) {
+               super(operator);
+               this.dataProperties = 
initializeDataProperties(operator.getCustomPartitioner());
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Gets the operator for this CoGroup node.
+        * 
+        * @return The CoGroup operator.
+        */
+       @Override
+       public CoGroupOperatorBase<?, ?, ?, ?> getOperator() {
+               return (CoGroupOperatorBase<?, ?, ?, ?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "CoGroup";
+       }
+
+       @Override
+       protected List<OperatorDescriptorDual> getPossibleProperties() {
+               return this.dataProperties;
+       }
+       
+       public void makeCoGroupWithSolutionSet(int solutionsetInputIndex) {
+               OperatorDescriptorDual op;
+               if (solutionsetInputIndex == 0) {
+                       op = new CoGroupWithSolutionSetFirstDescriptor(keys1, 
keys2);
+               } else if (solutionsetInputIndex == 1) {
+                       op = new CoGroupWithSolutionSetSecondDescriptor(keys1, 
keys2);
+               } else {
+                       throw new IllegalArgumentException();
+               }
+               this.dataProperties = 
Collections.<OperatorDescriptorDual>singletonList(op);
+       }
+
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               // for CoGroup, we currently make no reasonable default 
estimates
+       }
+       
+       private List<OperatorDescriptorDual> 
initializeDataProperties(Partitioner<?> customPartitioner) {
+               Ordering groupOrder1 = null;
+               Ordering groupOrder2 = null;
+               
+               CoGroupOperatorBase<?, ?, ?, ?> cgc = getOperator();
+               groupOrder1 = cgc.getGroupOrderForInputOne();
+               groupOrder2 = cgc.getGroupOrderForInputTwo();
+                       
+               if (groupOrder1 != null && groupOrder1.getNumberOfFields() == 
0) {
+                       groupOrder1 = null;
+               }
+               if (groupOrder2 != null && groupOrder2.getNumberOfFields() == 
0) {
+                       groupOrder2 = null;
+               }
+               
+               CoGroupDescriptor descr = new CoGroupDescriptor(this.keys1, 
this.keys2, groupOrder1, groupOrder2);
+               if (customPartitioner != null) {
+                       descr.setCustomPartitioner(customPartitioner);
+               }
+               
+               return Collections.<OperatorDescriptorDual>singletonList(descr);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
new file mode 100644
index 0000000..93be1e4
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.CollectorMapDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>Map</i> operator node whose only
+ * execution strategy is described by a {@link CollectorMapDescriptor}.
+ */
+public class CollectorMapNode extends SingleInputNode {
+
+       /** Singleton list holding the one strategy this node can be executed with. */
+       private final List<OperatorDescriptorSingle> possibleProperties;
+
+       /**
+        * Creates the optimizer node for the given single-input operator.
+        *
+        * @param operator The operator represented by this node.
+        */
+       public CollectorMapNode(SingleInputOperator<?, ?, ?> operator) {
+               super(operator);
+               final OperatorDescriptorSingle onlyStrategy = new CollectorMapDescriptor();
+               this.possibleProperties = Collections.singletonList(onlyStrategy);
+       }
+
+       @Override
+       public String getName() {
+               return "Map";
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+
+       /**
+        * A Map turns every input value into one output value, so the estimated number
+        * of records is taken over unchanged from the predecessor node.
+        */
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               final long predecessorRecords = this.getPredecessorNode().getEstimatedNumRecords();
+               this.estimatedNumRecords = predecessorRecords;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
new file mode 100644
index 0000000..8de67e8
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.CrossOperatorBase;
+import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.operators.CrossBlockOuterFirstDescriptor;
+import org.apache.flink.optimizer.operators.CrossBlockOuterSecondDescriptor;
+import org.apache.flink.optimizer.operators.CrossStreamOuterFirstDescriptor;
+import org.apache.flink.optimizer.operators.CrossStreamOuterSecondDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * The Optimizer representation of a <i>Cross</i> (Cartesian product) operator.
+ */
+public class CrossNode extends TwoInputNode {
+
+       /** The candidate driver strategies the optimizer may choose from for this cross. */
+       private final List<OperatorDescriptorDual> dataProperties;
+
+       /**
+        * Creates a new CrossNode for the given operator.
+        * <p>
+        * If the operator parameters contain a local strategy hint ({@link Optimizer#HINT_LOCAL_STRATEGY}),
+        * only the hinted strategy is considered. Otherwise the candidate strategies are selected
+        * according to the operator's {@link CrossHint}.
+        *
+        * @param operation The Cross operator object.
+        * @throws CompilerException If the local strategy hint names an unknown strategy.
+        */
+       public CrossNode(CrossOperatorBase<?, ?, ?, ?> operation) {
+               super(operation);
+
+               Configuration conf = operation.getParameters();
+               String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
+
+               CrossHint hint = operation.getCrossHint();
+
+               if (localStrategy != null) {
+                       // a "small" hint on one input disables the flag that belongs to the other input
+                       final boolean allowBCfirst = hint != CrossHint.SECOND_IS_SMALL;
+                       final boolean allowBCsecond = hint != CrossHint.FIRST_IS_SMALL;
+
+                       final OperatorDescriptorDual fixedDriverStrat;
+                       if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) {
+                               fixedDriverStrat = new CrossBlockOuterFirstDescriptor(allowBCfirst, allowBCsecond);
+                       } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) {
+                               fixedDriverStrat = new CrossBlockOuterSecondDescriptor(allowBCfirst, allowBCsecond);
+                       } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) {
+                               fixedDriverStrat = new CrossStreamOuterFirstDescriptor(allowBCfirst, allowBCsecond);
+                       } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) {
+                               fixedDriverStrat = new CrossStreamOuterSecondDescriptor(allowBCfirst, allowBCsecond);
+                       } else {
+                               throw new CompilerException("Invalid local strategy hint for cross contract: " + localStrategy);
+                       }
+
+                       this.dataProperties = Collections.singletonList(fixedDriverStrat);
+               }
+               else if (hint == CrossHint.SECOND_IS_SMALL) {
+                       ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+                       list.add(new CrossBlockOuterSecondDescriptor(false, true));
+                       list.add(new CrossStreamOuterFirstDescriptor(false, true));
+                       this.dataProperties = list;
+               }
+               else if (hint == CrossHint.FIRST_IS_SMALL) {
+                       ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+                       list.add(new CrossBlockOuterFirstDescriptor(true, false));
+                       list.add(new CrossStreamOuterSecondDescriptor(true, false));
+                       this.dataProperties = list;
+               }
+               else {
+                       ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+                       list.add(new CrossBlockOuterFirstDescriptor());
+                       list.add(new CrossBlockOuterSecondDescriptor());
+                       list.add(new CrossStreamOuterFirstDescriptor());
+                       list.add(new CrossStreamOuterSecondDescriptor());
+                       this.dataProperties = list;
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public CrossOperatorBase<?, ?, ?, ?> getOperator() {
+               return (CrossOperatorBase<?, ?, ?, ?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Cross";
+       }
+
+       @Override
+       protected List<OperatorDescriptorDual> getPossibleProperties() {
+               return this.dataProperties;
+       }
+
+       /**
+        * We assume that the cardinality is the product of the input cardinalities
+        * and that the result width is the sum of the input widths.
+        * <p>
+        * If either input cardinality is unknown (negative), the cardinality stays unknown (-1).
+        * If the product does not fit into a {@code long}, it is capped at {@link Long#MAX_VALUE}
+        * instead of silently wrapping around to a meaningless (possibly small or negative) value.
+        *
+        * @param statistics The statistics object to optionally access.
+        */
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
+               long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
+
+               if (card1 < 0 || card2 < 0) {
+                       this.estimatedNumRecords = -1;
+               } else if (card2 != 0 && card1 > Long.MAX_VALUE / card2) {
+                       // both factors are non-negative here, so this division test detects overflow exactly
+                       this.estimatedNumRecords = Long.MAX_VALUE;
+               } else {
+                       this.estimatedNumRecords = card1 * card2;
+               }
+
+               if (this.estimatedNumRecords >= 0) {
+                       float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+                       float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+                       float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
+
+                       if (width > 0) {
+                               // the float-to-long cast saturates at Long.MAX_VALUE for very large sizes
+                               this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
new file mode 100644
index 0000000..4e65976
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * A connection between two operators. Represents an intermediate result
+ * and a data exchange between the two operators.
+ *
+ * The data exchange has a mode in which it performs (batch / pipelined).
+ *
+ * The data exchange strategy may be set on this connection, in which case
+ * it is fixed and will not be determined during candidate plan enumeration.
+ *
+ * During the enumeration of interesting properties, this connection also holds
+ * all interesting properties generated by the successor operator.
+ */
+public class DagConnection implements EstimateProvider, DumpableConnection<OptimizerNode> {
+
+       private final OptimizerNode source; // The source node of the connection (never null)
+
+       private final OptimizerNode target; // The target node of the connection; null if the result is not consumed
+
+       private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange
+
+       private InterestingProperties interestingProps; // properties that succeeding nodes are interested in
+
+       private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined.
+
+       private TempMode materializationMode = TempMode.NONE; // the materialization mode
+
+       private int maxDepth = -1; // -1 until initMaxDepth() has been called
+
+       private boolean breakPipeline;  // whether this connection should break the pipeline due to potential deadlocks
+
+       /**
+        * Creates a new Connection between two nodes. The shipping strategy is left unset
+        * ({@code null}), i.e. it is not predefined. The temp mode is by default <tt>NONE</tt>.
+        *
+        * @param source
+        *        The source node.
+        * @param target
+        *        The target node.
+        * @param exchangeMode
+        *        The data exchange mode (pipelined / batch / batch only for shuffles / ... )
+        * @throws NullPointerException If source or target is null.
+        */
+       public DagConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) {
+               this(source, target, null, exchangeMode);
+       }
+
+       /**
+        * Creates a new Connection between two nodes.
+        *
+        * @param source
+        *        The source node.
+        * @param target
+        *        The target node.
+        * @param shipStrategy
+        *        The shipping strategy, or {@code null} if it is not predefined.
+        * @param exchangeMode
+        *        The data exchange mode (pipelined / batch / batch only for shuffles / ... )
+        * @throws NullPointerException If source or target is null.
+        */
+       public DagConnection(OptimizerNode source, OptimizerNode target,
+                                                       ShipStrategyType shipStrategy, ExecutionMode exchangeMode)
+       {
+               if (source == null || target == null) {
+                       throw new NullPointerException("Source and target must not be null.");
+               }
+               this.source = source;
+               this.target = target;
+               this.shipStrategy = shipStrategy;
+               this.dataExchangeMode = exchangeMode;
+       }
+
+       /**
+        * Constructor to create a result from an operator that is not
+        * consumed by another operator. The target is null and the shipping
+        * strategy is <tt>NONE</tt>.
+        *
+        * @param source
+        *        The source node.
+        * @param exchangeMode
+        *        The data exchange mode (pipelined / batch / batch only for shuffles / ... )
+        * @throws NullPointerException If source is null.
+        */
+       public DagConnection(OptimizerNode source, ExecutionMode exchangeMode) {
+               if (source == null) {
+                       throw new NullPointerException("Source must not be null.");
+               }
+               this.source = source;
+               this.target = null;
+               this.shipStrategy = ShipStrategyType.NONE;
+               this.dataExchangeMode = exchangeMode;
+       }
+
+       /**
+        * Gets the source of the connection.
+        *
+        * @return The source Node.
+        */
+       public OptimizerNode getSource() {
+               return this.source;
+       }
+
+       /**
+        * Gets the target of the connection.
+        *
+        * @return The target node, or null if this result is not consumed by another operator.
+        */
+       public OptimizerNode getTarget() {
+               return this.target;
+       }
+
+       /**
+        * Gets the shipping strategy for this connection.
+        *
+        * @return The connection's shipping strategy, or null if none has been set.
+        */
+       public ShipStrategyType getShipStrategy() {
+               return this.shipStrategy;
+       }
+
+       /**
+        * Sets the shipping strategy for this connection.
+        *
+        * @param strategy
+        *        The shipping strategy to be applied to this connection.
+        */
+       public void setShipStrategy(ShipStrategyType strategy) {
+               this.shipStrategy = strategy;
+       }
+
+       /**
+        * Gets the data exchange mode to use for this connection.
+        *
+        * @return The data exchange mode to use for this connection.
+        * @throws IllegalStateException If no data exchange mode has been set.
+        */
+       public ExecutionMode getDataExchangeMode() {
+               if (dataExchangeMode == null) {
+                       throw new IllegalStateException("This connection does not have the data exchange mode set");
+               }
+               return dataExchangeMode;
+       }
+
+       /**
+        * Marks that this connection should do a decoupled data exchange (such as batched)
+        * rather than pipeline data. Connections are marked as pipeline breakers to avoid
+        * deadlock situations.
+        */
+       public void markBreaksPipeline() {
+               this.breakPipeline = true;
+       }
+
+       /**
+        * Checks whether this connection is marked to break the pipeline.
+        *
+        * @return True, if this connection is marked to break the pipeline, false otherwise.
+        */
+       public boolean isBreakingPipeline() {
+               return this.breakPipeline;
+       }
+
+       /**
+        * Gets the interesting properties object for this connection.
+        * If the interesting properties for this connection have not yet been set,
+        * this method returns null.
+        *
+        * @return The collection of all interesting properties, or null, if they have not yet been set.
+        */
+       public InterestingProperties getInterestingProperties() {
+               return this.interestingProps;
+       }
+
+       /**
+        * Sets the interesting properties for this connection. They may only be set once
+        * (until {@link #clearInterestingProperties()} is called).
+        *
+        * @param props The interesting properties.
+        * @throws IllegalStateException If the interesting properties have already been set.
+        */
+       public void setInterestingProperties(InterestingProperties props) {
+               if (this.interestingProps == null) {
+                       this.interestingProps = props;
+               } else {
+                       throw new IllegalStateException("Interesting Properties have already been set.");
+               }
+       }
+
+       /**
+        * Resets the interesting properties, so that they can be set again.
+        */
+       public void clearInterestingProperties() {
+               this.interestingProps = null;
+       }
+
+       /**
+        * Initializes the maximum path depth of this connection as the source's depth plus one.
+        *
+        * @throws IllegalStateException If the depth has already been initialized.
+        */
+       public void initMaxDepth() {
+
+               if (this.maxDepth == -1) {
+                       this.maxDepth = this.source.getMaxDepth() + 1;
+               } else {
+                       throw new IllegalStateException("Maximum path depth has already been initialized.");
+               }
+       }
+
+       /**
+        * Gets the maximum path depth of this connection.
+        *
+        * @return The maximum path depth.
+        * @throws IllegalStateException If {@link #initMaxDepth()} has not been called yet.
+        */
+       public int getMaxDepth() {
+               if (this.maxDepth != -1) {
+                       return this.maxDepth;
+               } else {
+                       throw new IllegalStateException("Maximum path depth has not been initialized.");
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Estimates (all delegated to the source node)
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public long getEstimatedOutputSize() {
+               return this.source.getEstimatedOutputSize();
+       }
+
+       @Override
+       public long getEstimatedNumRecords() {
+               return this.source.getEstimatedNumRecords();
+       }
+
+       @Override
+       public float getEstimatedAvgWidthPerOutputRecord() {
+               return this.source.getEstimatedAvgWidthPerOutputRecord();
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       public TempMode getMaterializationMode() {
+               return this.materializationMode;
+       }
+
+       public void setMaterializationMode(TempMode materializationMode) {
+               this.materializationMode = materializationMode;
+       }
+
+       public boolean isOnDynamicPath() {
+               return this.source.isOnDynamicPath();
+       }
+
+       public int getCostWeight() {
+               return this.source.getCostWeight();
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               StringBuilder buf = new StringBuilder(50);
+               buf.append("Connection: ");
+
+               if (this.source == null) {
+                       buf.append("null");
+               } else {
+                       buf.append(this.source.getOperator().getName());
+                       buf.append('(').append(this.source.getName()).append(')');
+               }
+
+               buf.append(" -> ");
+
+               if (this.shipStrategy != null) {
+                       buf.append('[');
+                       buf.append(this.shipStrategy.name());
+                       buf.append(']').append(' ');
+               }
+
+               if (this.target == null) {
+                       buf.append("null");
+               } else {
+                       buf.append(this.target.getOperator().getName());
+                       buf.append('(').append(this.target.getName()).append(')');
+               }
+
+               return buf.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
new file mode 100644
index 0000000..dbe04f4
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.util.Visitor;
+
+/**
+ * The Optimizer representation of a data sink.
+ */
+public class DataSinkNode extends OptimizerNode {
+
+       protected DagConnection input;                  // The input edge
+
+       /**
+        * Creates a new DataSinkNode for the given sink operator.
+        *
+        * @param sink The data sink contract object.
+        */
+       public DataSinkNode(GenericDataSinkBase<?> sink) {
+               super(sink);
+       }
+
+       // --------------------------------------------------------------------------------------
+
+       /**
+        * Gets the input of the sink.
+        *
+        * @return The input connection, or null if it has not been set yet.
+        */
+       public DagConnection getInputConnection() {
+               return this.input;
+       }
+
+       /**
+        * Gets the predecessor of this node.
+        *
+        * @return The predecessor, or null, if no predecessor has been set.
+        */
+       public OptimizerNode getPredecessorNode() {
+               if (this.input != null) {
+                       return input.getSource();
+               } else {
+                       return null;
+               }
+       }
+
+       /**
+        * Gets the operator for which this optimizer sink node was created.
+        *
+        * @return The node's underlying operator.
+        */
+       @Override
+       public GenericDataSinkBase<?> getOperator() {
+               return (GenericDataSinkBase<?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Data Sink";
+       }
+
+       @Override
+       public List<DagConnection> getIncomingConnections() {
+               return Collections.singletonList(this.input);
+       }
+
+       /**
+        * Gets all outgoing connections, which is an empty set for the data sink.
+        *
+        * @return An empty list.
+        */
+       @Override
+       public List<DagConnection> getOutgoingConnections() {
+               return Collections.emptyList();
+       }
+
+       /**
+        * Connects this sink to the optimizer node of its input operator and registers the
+        * new connection as an outgoing connection of that node.
+        *
+        * @param contractToNode Mapping from operators to their optimizer nodes.
+        * @param defaultExchangeMode The data exchange mode for the created connection.
+        */
+       @Override
+       public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode) {
+               final Operator<?> inputOperator = getOperator().getInput();
+
+               final OptimizerNode pred = contractToNode.get(inputOperator);
+               final DagConnection conn = new DagConnection(pred, this, defaultExchangeMode);
+
+               // create the connection and add it
+               this.input = conn;
+               pred.addOutgoingConnection(conn);
+       }
+
+       /**
+        * Computes the estimated outputs for the data sink. Since the sink does not modify anything, it simply
+        * copies the output estimates from its direct predecessor.
+        */
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+               this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
+       }
+
+       /**
+        * Sets the interesting properties of the input connection to what the sink requires:
+        * range partitioning (with the sink's data distribution, if any) when a partition
+        * ordering is set, and the sink's local order when one is set.
+        *
+        * @param estimator The cost estimator (not used by this implementation).
+        */
+       @Override
+       public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
+               final InterestingProperties iProps = new InterestingProperties();
+
+               {
+                       final Ordering partitioning = getOperator().getPartitionOrdering();
+                       final DataDistribution dataDist = getOperator().getDataDistribution();
+                       final RequestedGlobalProperties partitioningProps = new RequestedGlobalProperties();
+                       if (partitioning != null) {
+                               if (dataDist != null) {
+                                       partitioningProps.setRangePartitioned(partitioning, dataDist);
+                               } else {
+                                       partitioningProps.setRangePartitioned(partitioning);
+                               }
+                       }
+                       // added exactly once: if no partition ordering is set, the unmodified properties object is added
+                       iProps.addGlobalProperties(partitioningProps);
+               }
+
+               {
+                       final Ordering localOrder = getOperator().getLocalOrder();
+                       final RequestedLocalProperties orderProps = new RequestedLocalProperties();
+                       if (localOrder != null) {
+                               orderProps.setOrdering(localOrder);
+                       }
+                       iProps.addLocalProperties(orderProps);
+               }
+
+               this.input.setInterestingProperties(iProps);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //                                     Branch Handling
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public void computeUnclosedBranchStack() {
+               if (this.openBranches != null) {
+                       return;
+               }
+
+               // we need to track open branches even in the sinks, because they get "closed" when
+               // we build a single "root" for the data flow plan
+               addClosedBranches(getPredecessorNode().closedBranchingNodes);
+               this.openBranches = getPredecessorNode().getBranchesForParent(this.input);
+       }
+
+       @Override
+       protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection parent) {
+               // return our own stack of open branches, because nothing is added
+               return this.openBranches;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //                                   Recursive Optimization
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
+               // check if we have a cached version
+               if (this.cachedPlans != null) {
+                       return this.cachedPlans;
+               }
+
+               // calculate alternative sub-plans for predecessor
+               List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
+               List<PlanNode> outputPlans = new ArrayList<PlanNode>();
+
+               final int dop = getParallelism();
+               final int inDop = getPredecessorNode().getParallelism();
+
+               final ExecutionMode executionMode = this.input.getDataExchangeMode();
+               final boolean dopChange = dop != inDop;
+               final boolean breakPipeline = this.input.isBreakingPipeline();
+
+               InterestingProperties ips = this.input.getInterestingProperties();
+               for (PlanNode p : subPlans) {
+                       for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
+                               for (RequestedLocalProperties lp : ips.getLocalProperties()) {
+                                       Channel c = new Channel(p);
+                                       gp.parameterizeChannel(c, dopChange, executionMode, breakPipeline);
+                                       lp.parameterizeChannel(c);
+                                       c.setRequiredLocalProps(lp);
+                                       c.setRequiredGlobalProps(gp);
+
+                                       // no need to check whether the created properties meet what we need in case
+                                       // of ordering or global ordering, because the only interesting properties we have
+                                       // are what we require
+                                       outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c));
+                               }
+                       }
+               }
+
+               // cost and prune the plans
+               for (PlanNode node : outputPlans) {
+                       estimator.costOperator(node);
+               }
+               prunePlanAlternatives(outputPlans);
+
+               this.cachedPlans = outputPlans;
+               return outputPlans;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //                                   Function Annotation Handling
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return new EmptySemanticProperties();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //                                     Miscellaneous
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Visits this sink and, depth first, its predecessor.
+        *
+        * @param visitor The visitor.
+        * @throws CompilerException If the sink has no predecessor node.
+        */
+       @Override
+       public void accept(Visitor<OptimizerNode> visitor) {
+               if (visitor.preVisit(this)) {
+                       if (getPredecessorNode() != null) {
+                               getPredecessorNode().accept(visitor);
+                       } else {
+                               throw new CompilerException("Data sink '" + getOperator().getName() + "' has no predecessor node.");
+                       }
+                       visitor.postVisit(this);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
new file mode 100644
index 0000000..e4b35b7
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.ReplicatingInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import 
org.apache.flink.api.common.operators.GenericDataSourceBase.SplitDataProperties;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Visitor;
+
+/**
+ * The optimizer's internal representation of a data source.
+ */
+public class DataSourceNode extends OptimizerNode {
+       
+       private final boolean sequentialInput;
+
+       private final boolean replicatedInput;
+
+       private GlobalProperties gprops;
+
+       private LocalProperties lprops;
+
+       /**
+        * Creates a new DataSourceNode for the given contract.
+        * 
+        * @param pactContract
+        *        The data source contract object.
+        */
+       public DataSourceNode(GenericDataSourceBase<?, ?> pactContract) {
+               super(pactContract);
+               
+               if (pactContract.getUserCodeWrapper().getUserCodeClass() == 
null) {
+                       throw new IllegalArgumentException("Input format has 
not been set.");
+               }
+               
+               if 
(NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass()))
 {
+                       setDegreeOfParallelism(1);
+                       this.sequentialInput = true;
+               } else {
+                       this.sequentialInput = false;
+               }
+
+               this.replicatedInput = 
ReplicatingInputFormat.class.isAssignableFrom(
+                                                                               
                                
pactContract.getUserCodeWrapper().getUserCodeClass());
+
+               this.gprops = new GlobalProperties();
+               this.lprops = new LocalProperties();
+
+               SplitDataProperties<?> splitProps = 
pactContract.getSplitDataProperties();
+
+               if(replicatedInput) {
+                       this.gprops.setFullyReplicated();
+                       this.lprops = new LocalProperties();
+               } else if (splitProps != null) {
+                       // configure data properties of data source using split 
properties
+                       setDataPropertiesFromSplitProperties(splitProps);
+               }
+
+       }
+
+       /**
+        * Gets the contract object for this data source node.
+        * 
+        * @return The contract.
+        */
+       @Override
+       public GenericDataSourceBase<?, ?> getOperator() {
+               return (GenericDataSourceBase<?, ?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Data Source";
+       }
+
+       @Override
+       public void setDegreeOfParallelism(int degreeOfParallelism) {
+               // if unsplittable, parallelism remains at 1
+               if (!this.sequentialInput) {
+                       super.setDegreeOfParallelism(degreeOfParallelism);
+               }
+       }
+
+       @Override
+       public List<DagConnection> getIncomingConnections() {
+               return Collections.<DagConnection>emptyList();
+       }
+
+       @Override
+       public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, 
ExecutionMode defaultDataExchangeMode) {}
+
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               // see, if we have a statistics object that can tell us a bit 
about the file
+               if (statistics != null) {
+                       // instantiate the input format, as this is needed by 
the statistics 
+                       InputFormat<?, ?> format;
+                       String inFormatDescription = "<unknown>";
+                       
+                       try {
+                               format = 
getOperator().getFormatWrapper().getUserCodeObject();
+                               Configuration config = 
getOperator().getParameters();
+                               format.configure(config);
+                       }
+                       catch (Throwable t) {
+                               if (Optimizer.LOG.isWarnEnabled()) {
+                                       Optimizer.LOG.warn("Could not 
instantiate InputFormat to obtain statistics."
+                                               + " Limited statistics will be 
available.", t);
+                               }
+                               return;
+                       }
+                       try {
+                               inFormatDescription = format.toString();
+                       }
+                       catch (Throwable t) {
+                               // we can ignore this error, as it only 
prevents us to use a cosmetic string
+                       }
+                       
+                       // first of all, get the statistics from the cache
+                       final String statisticsKey = 
getOperator().getStatisticsKey();
+                       final BaseStatistics cachedStatistics = 
statistics.getBaseStatistics(statisticsKey);
+                       
+                       BaseStatistics bs = null;
+                       try {
+                               bs = format.getStatistics(cachedStatistics);
+                       }
+                       catch (Throwable t) {
+                               if (Optimizer.LOG.isWarnEnabled()) {
+                                       Optimizer.LOG.warn("Error obtaining 
statistics from input format: " + t.getMessage(), t);
+                               }
+                       }
+                       
+                       if (bs != null) {
+                               final long len = bs.getTotalInputSize();
+                               if (len == BaseStatistics.SIZE_UNKNOWN) {
+                                       if (Optimizer.LOG.isInfoEnabled()) {
+                                               Optimizer.LOG.info("Compiler 
could not determine the size of input '" + inFormatDescription + "'. Using 
default estimates.");
+                                       }
+                               }
+                               else if (len >= 0) {
+                                       this.estimatedOutputSize = len;
+                               }
+                               
+                               final long card = bs.getNumberOfRecords();
+                               if (card != BaseStatistics.NUM_RECORDS_UNKNOWN) 
{
+                                       this.estimatedNumRecords = card;
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void computeInterestingPropertiesForInputs(CostEstimator 
estimator) {
+               // no children, so nothing to compute
+       }
+
+       @Override
+       public void computeUnclosedBranchStack() {
+               // because there are no inputs, there are no unclosed branches.
+               this.openBranches = Collections.emptyList();
+       }
+
+       @Override
+       public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
+               if (this.cachedPlans != null) {
+                       return this.cachedPlans;
+               }
+
+               SourcePlanNode candidate = new SourcePlanNode(this, "DataSource 
("+this.getOperator().getName()+")",
+                               this.gprops, this.lprops);
+
+               if(!replicatedInput) {
+                       
candidate.updatePropertiesWithUniqueSets(getUniqueFields());
+
+                       final Costs costs = new Costs();
+                       if 
(FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass())
 &&
+                                       this.estimatedOutputSize >= 0) {
+                               
estimator.addFileInputCost(this.estimatedOutputSize, costs);
+                       }
+                       candidate.setCosts(costs);
+               } else {
+                       // replicated input
+                       final Costs costs = new Costs();
+                       InputFormat<?,?> inputFormat =
+                                       ((ReplicatingInputFormat<?,?>) 
getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
+                       if 
(FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
+                                       this.estimatedOutputSize >= 0) {
+                               
estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), 
costs);
+                       }
+                       candidate.setCosts(costs);
+               }
+
+               // since there is only a single plan for the data-source, 
return a list with that element only
+               List<PlanNode> plans = new ArrayList<PlanNode>(1);
+               plans.add(candidate);
+
+               this.cachedPlans = plans;
+               return plans;
+       }
+
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return new EmptySemanticProperties();
+       }
+       
+       @Override
+       public void accept(Visitor<OptimizerNode> visitor) {
+               if (visitor.preVisit(this)) {
+                       visitor.postVisit(this);
+               }
+       }
+
+       private void setDataPropertiesFromSplitProperties(SplitDataProperties 
splitProps) {
+
+               // set global properties
+               int[] partitionKeys = splitProps.getSplitPartitionKeys();
+               Partitioner<?> partitioner = splitProps.getSplitPartitioner();
+
+               if(partitionKeys != null && partitioner != null) {
+                       this.gprops.setCustomPartitioned(new 
FieldList(partitionKeys), partitioner);
+               }
+               else if(partitionKeys != null) {
+                       this.gprops.setAnyPartitioning(new 
FieldList(partitionKeys));
+               }
+               // set local properties
+               int[] groupingKeys = splitProps.getSplitGroupKeys();
+               Ordering ordering = splitProps.getSplitOrder();
+
+               // more than one split per source tasks possible.
+               // adapt split grouping and sorting
+               if(ordering != null) {
+
+                       // sorting falls back to grouping because a source can 
read multiple,
+                       // randomly assigned splits
+                       groupingKeys = ordering.getFieldPositions();
+               }
+
+               if(groupingKeys != null && partitionKeys != null) {
+                       // check if grouping is also valid across splits, i.e., 
whether grouping keys are
+                       // valid superset of partition keys
+                       boolean allFieldsIncluded = true;
+                       for(int i : partitionKeys) {
+                               boolean fieldIncluded = false;
+                               for(int j : groupingKeys) {
+                                       if(i == j) {
+                                               fieldIncluded = true;
+                                               break;
+                                       }
+                               }
+                               if(!fieldIncluded) {
+                                       allFieldsIncluded = false;
+                                       break;
+                               }
+                       }
+                       if (allFieldsIncluded) {
+                               this.lprops = LocalProperties.forGrouping(new 
FieldList(groupingKeys));
+                       } else {
+                               this.lprops = new LocalProperties();
+                       }
+
+               } else {
+                       this.lprops = new LocalProperties();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
new file mode 100644
index 0000000..482951b
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
/**
 * Methods for operators / connections that provide estimates about data size and
 * characteristics.
 */
public interface EstimateProvider {

	/**
	 * Gets the estimated output size from this node.
	 *
	 * <p>NOTE(review): presumably measured in bytes, with a negative value meaning that no
	 * estimate is available — confirm against the implementations.
	 *
	 * @return The estimated output size.
	 */
	long getEstimatedOutputSize();

	/**
	 * Gets the estimated number of records in the output of this node.
	 *
	 * <p>NOTE(review): a negative value presumably means that no estimate is available — confirm
	 * against the implementations.
	 *
	 * @return The estimated number of records.
	 */
	long getEstimatedNumRecords();

	/**
	 * Gets the estimated number of bytes per record.
	 *
	 * @return The estimated number of bytes per record.
	 */
	float getEstimatedAvgWidthPerOutputRecord();
}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
new file mode 100644
index 0000000..118ddc8
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.FilterOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.FilterDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>FlatMap</i> operator node.
+ */
+public class FilterNode extends SingleInputNode {
+       
+       private final List<OperatorDescriptorSingle> possibleProperties;
+       
+       public FilterNode(FilterOperatorBase<?, ?> operator) {
+               super(operator);
+               this.possibleProperties = 
Collections.<OperatorDescriptorSingle>singletonList(new FilterDescriptor());
+       }
+
+       @Override
+       public FilterOperatorBase<?, ?> getOperator() {
+               return (FilterOperatorBase<?, ?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Filter";
+       }
+
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+
+       /**
+        * Computes the estimates for the Filter operator. Since it applies a 
filter on the data we assume a cardinality
+        * decrease. To give the system a hint at data decrease, we use a 
default magic number to indicate a 0.5 decrease. 
+        */
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               this.estimatedNumRecords = (long) 
(getPredecessorNode().getEstimatedNumRecords() * 0.5);
+               this.estimatedOutputSize = (long) 
(getPredecessorNode().getEstimatedOutputSize() * 0.5);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
new file mode 100644
index 0000000..f713d56
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.FlatMapDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>FlatMap</i> operator node.
+ */
+public class FlatMapNode extends SingleInputNode {
+       
+       private final List<OperatorDescriptorSingle> possibleProperties;
+       
+       public FlatMapNode(FlatMapOperatorBase<?, ?, ?> operator) {
+               super(operator);
+               
+               this.possibleProperties = 
Collections.<OperatorDescriptorSingle>singletonList(new FlatMapDescriptor());
+       }
+
+       @Override
+       public FlatMapOperatorBase<?, ?, ?> getOperator() {
+               return (FlatMapOperatorBase<?, ?, ?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "FlatMap";
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+
+       /**
+        * Computes the estimates for the FlatMap operator. Since it un-nests, 
we assume a cardinality
+        * increase. To give the system a hint at data increase, we take a 
default magic number of a 5 times increase. 
+        */
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               this.estimatedNumRecords = 
getPredecessorNode().getEstimatedNumRecords() * 5;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
new file mode 100644
index 0000000..564c0d3
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.AllGroupCombineProperties;
+import org.apache.flink.optimizer.operators.GroupCombineProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The optimizer representation of a <i>GroupCombineNode</i> operation.
+ */
+public class GroupCombineNode extends SingleInputNode {
+
+       private final List<OperatorDescriptorSingle> possibleProperties;
+
+       /**
+        * Creates a new optimizer node for the given operator.
+        *
+        * @param operator The reduce operation.
+        */
+       public GroupCombineNode(GroupCombineOperatorBase<?, ?, ?> operator) {
+               super(operator);
+
+               if (this.keys == null) {
+                       // case of a key-less reducer. force a parallelism of 1
+                       setDegreeOfParallelism(1);
+               }
+
+               this.possibleProperties = initPossibleProperties();
+       }
+
+       private List<OperatorDescriptorSingle> initPossibleProperties() {
+
+               // check if we can work with a grouping (simple reducer), or if 
we need ordering because of a group order
+               Ordering groupOrder = getOperator().getGroupOrder();
+               if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
+                       groupOrder = null;
+               }
+
+               OperatorDescriptorSingle props = (this.keys == null ?
+                               new AllGroupCombineProperties() :
+                               new GroupCombineProperties(this.keys, 
groupOrder));
+
+               return Collections.singletonList(props);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the operator represented by this optimizer node.
+        *
+        * @return The operator represented by this optimizer node.
+        */
+       @Override
+       public GroupCombineOperatorBase<?, ?, ?> getOperator() {
+               return (GroupCombineOperatorBase<?, ?, ?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "GroupCombine";
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Estimates
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               // no real estimates possible for a reducer.
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
new file mode 100644
index 0000000..77acae5
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.operators.AllGroupReduceProperties;
+import 
org.apache.flink.optimizer.operators.AllGroupWithPartialPreGroupProperties;
+import org.apache.flink.optimizer.operators.GroupReduceProperties;
+import org.apache.flink.optimizer.operators.GroupReduceWithCombineProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * The optimizer representation of a <i>GroupReduce</i> operation.
+ * <p>
+ * A node created from an operator carries exactly one operator descriptor (see
+ * {@link #getPossibleProperties()}). The copy constructor creates a utility node without any
+ * descriptors, which {@link #getCombinerUtilityNode()} uses to represent the combiner of this reduce.
+ */
+public class GroupReduceNode extends SingleInputNode {
+       
+       /** Singleton list with the chosen descriptor, or an empty list for a combiner utility node. */
+       private final List<OperatorDescriptorSingle> possibleProperties;
+       
+       /** Lazily created in {@link #getCombinerUtilityNode()}. */
+       private GroupReduceNode combinerUtilityNode;
+       
+       /**
+        * Creates a new optimizer node for the given operator.
+        * <p>
+        * If the node has no grouping keys (a key-less reduce), its parallelism is forced to 1.
+        *
+        * @param operator The reduce operation.
+        */
+       public GroupReduceNode(GroupReduceOperatorBase<?, ?, ?> operator) {
+               super(operator);
+               
+               if (this.keys == null) {
+                       // case of a key-less reducer. force a parallelism of 1
+                       setDegreeOfParallelism(1);
+               }
+               
+               this.possibleProperties = initPossibleProperties(operator.getCustomPartitioner());
+       }
+       
+       /**
+        * Creates a utility node from the given reducer (via the superclass copy constructor)
+        * that is used to represent that reducer's combiner. The new node has no possible properties.
+        *
+        * @param reducerToCopyForCombiner The reduce node whose combiner this node represents.
+        */
+       public GroupReduceNode(GroupReduceNode reducerToCopyForCombiner) {
+               super(reducerToCopyForCombiner);
+               
+               this.possibleProperties = Collections.emptyList();
+       }
+       
+       /**
+        * Chooses the operator descriptor (execution strategy) for this reduce.
+        * <p>
+        * A local-strategy hint in the operator parameters ({@link Optimizer#HINT_LOCAL_STRATEGY})
+        * takes precedence: {@link Optimizer#HINT_LOCAL_STRATEGY_SORT} disables the combiner and
+        * {@link Optimizer#HINT_LOCAL_STRATEGY_COMBINING_SORT} enables it, logging a warning if the
+        * function is not marked combinable. Without a hint, the combiner is used exactly when the
+        * function is combinable.
+        *
+        * @param customPartitioner The operator's custom partitioner, passed on to the keyed descriptors.
+        * @return A singleton list containing the chosen descriptor.
+        * @throws CompilerException If the local-strategy hint has an unrecognized value.
+        */
+       private List<OperatorDescriptorSingle> initPossibleProperties(Partitioner<?> customPartitioner) {
+               final GroupReduceOperatorBase<?, ?, ?> operator = getOperator();
+               
+               // see if an internal hint dictates the strategy to use
+               final Configuration conf = operator.getParameters();
+               final String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
+
+               final boolean useCombiner;
+               if (localStrategy != null) {
+                       if (Optimizer.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) {
+                               useCombiner = false;
+                       }
+                       else if (Optimizer.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) {
+                               if (!isCombineable()) {
+                                       // the hint is still honored; only the mismatch is reported
+                                       Optimizer.LOG.warn("Strategy hint for GroupReduce '" + operator.getName() +
+                                               "' requires combinable reduce, but user function is not marked combinable.");
+                               }
+                               useCombiner = true;
+                       } else {
+                               throw new CompilerException("Invalid local strategy hint for GroupReduce '" +
+                                       operator.getName() + "': " + localStrategy);
+                       }
+               } else {
+                       useCombiner = isCombineable();
+               }
+               
+               // Check whether a plain grouping suffices (simple reducer), or whether an ordering is
+               // needed because of a group order. An empty group order is treated as no group order.
+               Ordering groupOrder = operator.getGroupOrder();
+               if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
+                       groupOrder = null;
+               }
+               
+               final OperatorDescriptorSingle props;
+               if (this.keys == null) {
+                       // key-less reduce over the whole input
+                       props = useCombiner
+                                       ? new AllGroupWithPartialPreGroupProperties()
+                                       : new AllGroupReduceProperties();
+               } else {
+                       props = useCombiner
+                                       ? new GroupReduceWithCombineProperties(this.keys, groupOrder, customPartitioner)
+                                       : new GroupReduceProperties(this.keys, groupOrder, customPartitioner);
+               }
+
+               return Collections.singletonList(props);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets the operator represented by this optimizer node.
+        * 
+        * @return The operator represented by this optimizer node.
+        */
+       @Override
+       public GroupReduceOperatorBase<?, ?, ?> getOperator() {
+               return (GroupReduceOperatorBase<?, ?, ?>) super.getOperator();
+       }
+
+       /**
+        * Checks whether the operator of this node reports its reduce function as combinable.
+        * 
+        * @return The value of the operator's {@code isCombinable()}.
+        */
+       public boolean isCombineable() {
+               return getOperator().isCombinable();
+       }
+
+       @Override
+       public String getName() {
+               return "GroupReduce";
+       }
+       
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       //  Estimates
+       // --------------------------------------------------------------------------------------------
+       
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               // no real estimates possible for a reducer.
+       }
+       
+       /**
+        * Returns the utility node that represents the combiner of this reduce, creating it on first use.
+        * <p>
+        * The combiner's estimated output size and record count are copied from this node's
+        * predecessor. In other words, the combiner is conservatively assumed not to shrink the data.
+        *
+        * @return The (cached) combiner utility node.
+        */
+       public GroupReduceNode getCombinerUtilityNode() {
+               if (this.combinerUtilityNode == null) {
+                       this.combinerUtilityNode = new GroupReduceNode(this);
+                       
+                       // we conservatively assume the combiner returns the same data size as it consumes 
+                       this.combinerUtilityNode.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
+                       this.combinerUtilityNode.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+               }
+               return this.combinerUtilityNode;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
new file mode 100644
index 0000000..1fdae51
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.util.Visitor;
+
+/**
+ * Visitor that clears the interesting properties of the optimizer nodes it reaches.
+ * <p>
+ * {@link #preVisit} returns {@code false} for nodes whose interesting properties are already
+ * {@code null}. Per the {@code Visitor} contract this presumably stops descent below such
+ * nodes (confirm against {@code Visitor}). The class holds no state; use {@link #INSTANCE}.
+ */
+final class InterestingPropertiesClearer implements Visitor<OptimizerNode> {
+       
+       /** Shared instance; the visitor has no per-traversal state. */
+       static final InterestingPropertiesClearer INSTANCE = new InterestingPropertiesClearer();
+
+       @Override
+       public boolean preVisit(OptimizerNode visitable) {
+               final boolean hasProperties = visitable.getInterestingProperties() != null;
+               if (hasProperties) {
+                       visitable.clearInterestingProperties();
+               }
+               // report whether anything was cleared at this node
+               return hasProperties;
+       }
+
+       @Override
+       public void postVisit(OptimizerNode visitable) {
+               // nothing to do after a node has been visited
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
new file mode 100644
index 0000000..5d28043
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.util.Visitor;
+
+/**
+ * Interface for optimizer nodes that represent an iteration.
+ * <p>
+ * NOTE(review): presumably implemented by the bulk and workset iteration nodes; confirm
+ * against the implementing classes.
+ */
+public interface IterationNode {
+       
+       /**
+        * Applies the given visitor to the step function of this iteration
+        * (presumably the nodes of the iteration body; verify against implementations).
+        *
+        * @param visitor The visitor to apply.
+        */
+       void acceptForStepFunction(Visitor<OptimizerNode> visitor);
+
+}

Reply via email to