http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java new file mode 100644 index 0000000..55b8785 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; +import org.apache.flink.api.common.operators.base.BulkIterationBase; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor; +import org.apache.flink.optimizer.costs.CostEstimator; +import org.apache.flink.optimizer.dag.WorksetIterationNode.SingleRootJoiner; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.InterestingProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.operators.NoOpDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; +import org.apache.flink.optimizer.plan.BulkIterationPlanNode; +import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.NamedChannel; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.util.Visitor; + +/** + * A node in the optimizer's program representation for a bulk iteration. 
+ */ +public class BulkIterationNode extends SingleInputNode implements IterationNode { + + private BulkPartialSolutionNode partialSolution; + + private OptimizerNode terminationCriterion; + + private OptimizerNode nextPartialSolution; + + private DagConnection rootConnection; // connection out of the next partial solution + + private DagConnection terminationCriterionRootConnection; // connection out of the term. criterion + + private OptimizerNode singleRoot; + + private final int costWeight; + + // -------------------------------------------------------------------------------------------- + + /** + * Creates a new node for the bulk iteration. + * + * @param iteration The bulk iteration the node represents. + */ + public BulkIterationNode(BulkIterationBase<?> iteration) { + super(iteration); + + if (iteration.getMaximumNumberOfIterations() <= 0) { + throw new CompilerException("BulkIteration must have a maximum number of iterations specified."); + } + + int numIters = iteration.getMaximumNumberOfIterations(); + + this.costWeight = (numIters > 0 && numIters < OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT) ? + numIters : OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT; + } + + // -------------------------------------------------------------------------------------------- + + public BulkIterationBase<?> getIterationContract() { + return (BulkIterationBase<?>) getOperator(); + } + + /** + * Gets the partialSolution from this BulkIterationNode. + * + * @return The partialSolution. + */ + public BulkPartialSolutionNode getPartialSolution() { + return partialSolution; + } + + /** + * Sets the partialSolution for this BulkIterationNode. + * + * @param partialSolution The partialSolution to set. + */ + public void setPartialSolution(BulkPartialSolutionNode partialSolution) { + this.partialSolution = partialSolution; + } + + + /** + * Gets the nextPartialSolution from this BulkIterationNode. + * + * @return The nextPartialSolution. + */ + public OptimizerNode getNextPartialSolution() { + return nextPartialSolution; + } + + /** + * Sets the nextPartialSolution for this BulkIterationNode. + * + * @param nextPartialSolution The nextPartialSolution to set. 
+ */ + public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) { + + // check whether the root of the step function has the same parallelism as the iteration, + // and whether the step function contains any operator at all + if (nextPartialSolution.getParallelism() != getParallelism() || + nextPartialSolution == partialSolution || nextPartialSolution instanceof BinaryUnionNode) + { + // add a no-op to the root to express the re-partitioning + NoOpNode noop = new NoOpNode(); + noop.setDegreeOfParallelism(getParallelism()); + + DagConnection noOpConn = new DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED); + noop.setIncomingConnection(noOpConn); + nextPartialSolution.addOutgoingConnection(noOpConn); + + nextPartialSolution = noop; + } + + this.nextPartialSolution = nextPartialSolution; + this.terminationCriterion = terminationCriterion; + + if (terminationCriterion == null) { + this.singleRoot = nextPartialSolution; + this.rootConnection = new DagConnection(nextPartialSolution, ExecutionMode.PIPELINED); + } + else { + // we have a termination criterion + SingleRootJoiner singleRootJoiner = new SingleRootJoiner(); + this.rootConnection = new DagConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED); + this.terminationCriterionRootConnection = new DagConnection(terminationCriterion, singleRootJoiner, + ExecutionMode.PIPELINED); + + singleRootJoiner.setInputs(this.rootConnection, this.terminationCriterionRootConnection); + + this.singleRoot = singleRootJoiner; + + // add connection to terminationCriterion for interesting properties visitor + terminationCriterion.addOutgoingConnection(terminationCriterionRootConnection); + + } + + nextPartialSolution.addOutgoingConnection(rootConnection); + } + + public int getCostWeight() { + return this.costWeight; + } + + public OptimizerNode getSingleRootOfStepFunction() { + return this.singleRoot; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public String getName() { + return "Bulk Iteration"; + } + + @Override + public SemanticProperties getSemanticProperties() { + return new EmptySemanticProperties(); + } + + protected void readStubAnnotations() {} + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize(); + this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + } + + // -------------------------------------------------------------------------------------------- + // Properties and Optimization + // -------------------------------------------------------------------------------------------- + + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return Collections.<OperatorDescriptorSingle>singletonList(new NoOpDescriptor()); + } + + @Override + public void computeInterestingPropertiesForInputs(CostEstimator estimator) { + final InterestingProperties intProps = getInterestingProperties().clone(); + + if (this.terminationCriterion != null) { + // first propagate through the termination criterion;
since it has no successors, it has no + // interesting properties + this.terminationCriterionRootConnection.setInterestingProperties(new InterestingProperties()); + this.terminationCriterion.accept(new InterestingPropertyVisitor(estimator)); + } + + // we need to make two interesting-property passes, because the root of the step function also needs + // the interesting properties as generated by the partial solution + + // give our own interesting properties (as generated by the iteration's successors) to the step function and + // make the first pass + this.rootConnection.setInterestingProperties(intProps); + this.nextPartialSolution.accept(new InterestingPropertyVisitor(estimator)); + + // take the interesting properties of the partial solution and add them to the root interesting properties + InterestingProperties partialSolutionIntProps = this.partialSolution.getInterestingProperties(); + intProps.getGlobalProperties().addAll(partialSolutionIntProps.getGlobalProperties()); + intProps.getLocalProperties().addAll(partialSolutionIntProps.getLocalProperties()); + + // clear all interesting properties to prepare the second traversal + // this clears only the path down from the next partial solution. The paths down + // from the termination criterion (before they meet the paths down from the next partial solution) + // remain unaffected by this step + this.rootConnection.clearInterestingProperties(); + this.nextPartialSolution.accept(InterestingPropertiesClearer.INSTANCE); + + // 2nd pass + this.rootConnection.setInterestingProperties(intProps); + this.nextPartialSolution.accept(new InterestingPropertyVisitor(estimator)); + + // now add the interesting properties of the partial solution to the input + final InterestingProperties inProps = this.partialSolution.getInterestingProperties().clone(); + inProps.addGlobalProperties(new RequestedGlobalProperties()); + inProps.addLocalProperties(new RequestedLocalProperties()); + this.inConn.setInterestingProperties(inProps); + } + + @Override + public void clearInterestingProperties() { + super.clearInterestingProperties(); + + this.singleRoot.accept(InterestingPropertiesClearer.INSTANCE); + this.rootConnection.clearInterestingProperties(); + } + + @Override + public void computeUnclosedBranchStack() { + if (this.openBranches != null) { + return; + } + + // the resulting branches are those of the step function + // because the BulkPartialSolution takes the input's branches + addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes); + List<UnclosedBranchDescriptor> result = getSingleRootOfStepFunction().openBranches; + + this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result; + } + + + @Override + protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels, + List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq) + { + // NOTES ON THE ENUMERATION OF THE STEP FUNCTION PLANS: + // Whenever we instantiate the iteration, we enumerate new candidates for the step function. + // That way, we make sure that for each candidate for the initial partial solution, + // we have a fitting candidate for the step function (often, work is pushed out of the step function). + // Among the candidates of the step function, we keep only those that meet the requested properties of the + // current candidate initial partial solution.
That makes sure these properties exist at the beginning of + // the successive iteration. + + // 1) Because we enumerate multiple times, we may need to clean the cached plans + // before starting another enumeration + this.nextPartialSolution.accept(PlanCacheCleaner.INSTANCE); + if (this.terminationCriterion != null) { + this.terminationCriterion.accept(PlanCacheCleaner.INSTANCE); + } + + // 2) Give the partial solution the properties of the current candidate for the initial partial solution + this.partialSolution.setCandidateProperties(in.getGlobalProperties(), in.getLocalProperties(), in); + final BulkPartialSolutionPlanNode pspn = this.partialSolution.getCurrentPartialSolutionPlanNode(); + + // 3) Get the alternative plans + List<PlanNode> candidates = this.nextPartialSolution.getAlternativePlans(estimator); + + // 4) Make sure that the beginning of the step function does not assume properties that + // are not also produced by the end of the step function. + + { + List<PlanNode> newCandidates = new ArrayList<PlanNode>(); + + for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) { + PlanNode candidate = planDeleter.next(); + + GlobalProperties atEndGlobal = candidate.getGlobalProperties(); + LocalProperties atEndLocal = candidate.getLocalProperties(); + + FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobal, atEndLocal); + if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) { + ; // depends only through broadcast variable on the partial solution + } + else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) { + // attach a no-op node through which we create the properties of the original input + Channel toNoOp = new Channel(candidate); + globPropsReq.parameterizeChannel(toNoOp, false, rootConnection.getDataExchangeMode(), false); + locPropsReq.parameterizeChannel(toNoOp); + + UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST); + rebuildPropertiesNode.setDegreeOfParallelism(candidate.getParallelism()); + + SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP); + rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties()); + estimator.costOperator(rebuildPropertiesPlanNode); + + GlobalProperties atEndGlobalModified = rebuildPropertiesPlanNode.getGlobalProperties(); + LocalProperties atEndLocalModified = rebuildPropertiesPlanNode.getLocalProperties(); + + if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) { + FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobalModified, atEndLocalModified); + + if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) { + newCandidates.add(rebuildPropertiesPlanNode); + } + } + + planDeleter.remove(); + } + } + } + + if (candidates.isEmpty()) { + return; + } + + // 5) Create a candidate for the Iteration Node for every remaining plan of the step function. 
+ if (terminationCriterion == null) { + for (PlanNode candidate : candidates) { + BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate); + GlobalProperties gProps = candidate.getGlobalProperties().clone(); + LocalProperties lProps = candidate.getLocalProperties().clone(); + node.initProperties(gProps, lProps); + target.add(node); + } + } + else if (candidates.size() > 0) { + List<PlanNode> terminationCriterionCandidates = this.terminationCriterion.getAlternativePlans(estimator); + + SingleRootJoiner singleRoot = (SingleRootJoiner) this.singleRoot; + + for (PlanNode candidate : candidates) { + for (PlanNode terminationCandidate : terminationCriterionCandidates) { + if (singleRoot.areBranchCompatible(candidate, terminationCandidate)) { + BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate, terminationCandidate); + GlobalProperties gProps = candidate.getGlobalProperties().clone(); + LocalProperties lProps = candidate.getLocalProperties().clone(); + node.initProperties(gProps, lProps); + target.add(node); + + } + } + } + + } + } + + // -------------------------------------------------------------------------------------------- + // Iteration Specific Traversals + // -------------------------------------------------------------------------------------------- + + public void acceptForStepFunction(Visitor<OptimizerNode> visitor) { + this.singleRoot.accept(visitor); + } +}
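For context, a BulkIterationNode is created from a BulkIterationBase operator, which in turn results from a program written against the DataSet API. The following sketch is not part of this change (class and value names are illustrative); it shows a bulk iteration with a termination criterion, i.e. a program for which setNextPartialSolution() above receives a non-null terminationCriterion and wires in the SingleRootJoiner:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class BulkIterationExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Double> initial = env.fromElements(1.0, 2.0, 3.0);

        // at most 100 iterations; this also caps the cost weight that the
        // optimizer assigns to operators on the dynamic path (see costWeight above)
        IterativeDataSet<Double> iteration = initial.iterate(100);

        // the step function; its root becomes the nextPartialSolution node
        DataSet<Double> next = iteration.map(new MapFunction<Double, Double>() {
            @Override
            public Double map(Double value) {
                return value / 2.0;
            }
        });

        // termination criterion: the iteration stops once this data set is empty
        DataSet<Double> criterion = next.filter(new FilterFunction<Double>() {
            @Override
            public boolean filter(Double value) {
                return value > 0.001;
            }
        });

        DataSet<Double> result = iteration.closeWith(next, criterion);

        result.writeAsText("file:///tmp/bulk-iteration-result");
        env.execute("Bulk iteration example");
    }
}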
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java new file mode 100644 index 0000000..25a7eef --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.PlanNode; + +/** + * The optimizer's internal representation of the partial solution that is input to a bulk iteration. + */ +public class BulkPartialSolutionNode extends AbstractPartialSolutionNode { + + private final BulkIterationNode iterationNode; + + + public BulkPartialSolutionNode(PartialSolutionPlaceHolder<?> psph, BulkIterationNode iterationNode) { + super(psph); + this.iterationNode = iterationNode; + } + + // -------------------------------------------------------------------------------------------- + + public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) { + if (this.cachedPlans != null) { + throw new IllegalStateException(); + } else { + this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, + "PartialSolution ("+this.getOperator().getName()+")", gProps, lProps, initialInput)); + } + } + + public BulkPartialSolutionPlanNode getCurrentPartialSolutionPlanNode() { + if (this.cachedPlans != null) { + return (BulkPartialSolutionPlanNode) this.cachedPlans.get(0); + } else { + throw new IllegalStateException(); + } + } + + public BulkIterationNode getIterationNode() { + return this.iterationNode; + } + + @Override + public void computeOutputEstimates(DataStatistics statistics) { + copyEstimates(this.iterationNode.getPredecessorNode()); + } + + // -------------------------------------------------------------------------------------------- + + /** + * Gets the operator (here the {@link PartialSolutionPlaceHolder}) that is represented by this + * optimizer node. 
+ * + * @return The operator represented by this optimizer node. + */ + @Override + public PartialSolutionPlaceHolder<?> getOperator() { + return (PartialSolutionPlaceHolder<?>) super.getOperator(); + } + + @Override + public String getName() { + return "Bulk Partial Solution"; + } + + @Override + public void computeUnclosedBranchStack() { + if (this.openBranches != null) { + return; + } + + OptimizerNode inputToIteration = this.iterationNode.getPredecessorNode(); + + addClosedBranches(inputToIteration.closedBranchingNodes); + List<UnclosedBranchDescriptor> fromInput = inputToIteration.getBranchesForParent(this.iterationNode.getIncomingConnection()); + this.openBranches = (fromInput == null || fromInput.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java new file mode 100644 index 0000000..92076c3 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.CoGroupDescriptor; +import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetFirstDescriptor; +import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetSecondDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual; + +/** + * The Optimizer representation of a <i>CoGroup</i> operator. + */ +public class CoGroupNode extends TwoInputNode { + + private List<OperatorDescriptorDual> dataProperties; + + public CoGroupNode(CoGroupOperatorBase<?, ?, ?, ?> operator) { + super(operator); + this.dataProperties = initializeDataProperties(operator.getCustomPartitioner()); + } + + // -------------------------------------------------------------------------------------------- + + /** + * Gets the operator for this CoGroup node. + * + * @return The CoGroup operator. 
+ */ + @Override + public CoGroupOperatorBase<?, ?, ?, ?> getOperator() { + return (CoGroupOperatorBase<?, ?, ?, ?>) super.getOperator(); + } + + @Override + public String getName() { + return "CoGroup"; + } + + @Override + protected List<OperatorDescriptorDual> getPossibleProperties() { + return this.dataProperties; + } + + public void makeCoGroupWithSolutionSet(int solutionsetInputIndex) { + OperatorDescriptorDual op; + if (solutionsetInputIndex == 0) { + op = new CoGroupWithSolutionSetFirstDescriptor(keys1, keys2); + } else if (solutionsetInputIndex == 1) { + op = new CoGroupWithSolutionSetSecondDescriptor(keys1, keys2); + } else { + throw new IllegalArgumentException(); + } + this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(op); + } + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // for CoGroup, we currently make no reasonable default estimates + } + + private List<OperatorDescriptorDual> initializeDataProperties(Partitioner<?> customPartitioner) { + Ordering groupOrder1 = null; + Ordering groupOrder2 = null; + + CoGroupOperatorBase<?, ?, ?, ?> cgc = getOperator(); + groupOrder1 = cgc.getGroupOrderForInputOne(); + groupOrder2 = cgc.getGroupOrderForInputTwo(); + + if (groupOrder1 != null && groupOrder1.getNumberOfFields() == 0) { + groupOrder1 = null; + } + if (groupOrder2 != null && groupOrder2.getNumberOfFields() == 0) { + groupOrder2 = null; + } + + CoGroupDescriptor descr = new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2); + if (customPartitioner != null) { + descr.setCustomPartitioner(customPartitioner); + } + + return Collections.<OperatorDescriptorDual>singletonList(descr); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java new file mode 100644 index 0000000..93be1e4 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.SingleInputOperator; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.CollectorMapDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; + +/** + * The optimizer's internal representation of a <i>Map</i> operator node. 
+ */ +public class CollectorMapNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + + public CollectorMapNode(SingleInputOperator<?, ?, ?> operator) { + super(operator); + + this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new CollectorMapDescriptor()); + } + + @Override + public String getName() { + return "Map"; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + /** + * Computes the estimates for the Map operator. Map takes one value and transforms it into another value. + * The cardinality consequently stays the same. + */ + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java new file mode 100644 index 0000000..8de67e8 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.base.CrossOperatorBase; +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.operators.CrossBlockOuterFirstDescriptor; +import org.apache.flink.optimizer.operators.CrossBlockOuterSecondDescriptor; +import org.apache.flink.optimizer.operators.CrossStreamOuterFirstDescriptor; +import org.apache.flink.optimizer.operators.CrossStreamOuterSecondDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual; +import org.apache.flink.configuration.Configuration; + +/** + * The Optimizer representation of a <i>Cross</i> (Cartesian product) operator. + */ +public class CrossNode extends TwoInputNode { + + private final List<OperatorDescriptorDual> dataProperties; + + /** + * Creates a new CrossNode for the given operator. + * + * @param operation The Cross operator object. 
+ */ + public CrossNode(CrossOperatorBase<?, ?, ?, ?> operation) { + super(operation); + + Configuration conf = operation.getParameters(); + String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null); + + CrossHint hint = operation.getCrossHint(); + + if (localStrategy != null) { + + final boolean allowBCfirst = hint != CrossHint.SECOND_IS_SMALL; + final boolean allowBCsecond = hint != CrossHint.FIRST_IS_SMALL; + + final OperatorDescriptorDual fixedDriverStrat; + if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) { + fixedDriverStrat = new CrossBlockOuterFirstDescriptor(allowBCfirst, allowBCsecond); + } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) { + fixedDriverStrat = new CrossBlockOuterSecondDescriptor(allowBCfirst, allowBCsecond); + } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) { + fixedDriverStrat = new CrossStreamOuterFirstDescriptor(allowBCfirst, allowBCsecond); + } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) { + fixedDriverStrat = new CrossStreamOuterSecondDescriptor(allowBCfirst, allowBCsecond); + } else { + throw new CompilerException("Invalid local strategy hint for cross contract: " + localStrategy); + } + + this.dataProperties = Collections.singletonList(fixedDriverStrat); + } + else if (hint == CrossHint.SECOND_IS_SMALL) { + ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>(); + list.add(new CrossBlockOuterSecondDescriptor(false, true)); + list.add(new CrossStreamOuterFirstDescriptor(false, true)); + this.dataProperties = list; + } + else if (hint == CrossHint.FIRST_IS_SMALL) { + ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>(); + list.add(new CrossBlockOuterFirstDescriptor(true, false)); + list.add(new CrossStreamOuterSecondDescriptor(true, false)); + this.dataProperties = list; + } + else { + ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>(); + list.add(new CrossBlockOuterFirstDescriptor()); + list.add(new CrossBlockOuterSecondDescriptor()); + list.add(new CrossStreamOuterFirstDescriptor()); + list.add(new CrossStreamOuterSecondDescriptor()); + this.dataProperties = list; + } + } + + // ------------------------------------------------------------------------ + + @Override + public CrossOperatorBase<?, ?, ?, ?> getOperator() { + return (CrossOperatorBase<?, ?, ?, ?>) super.getOperator(); + } + + @Override + public String getName() { + return "Cross"; + } + + @Override + protected List<OperatorDescriptorDual> getPossibleProperties() { + return this.dataProperties; + } + + /** + * We assume that the cardinality is the product of the input cardinalities + * and that the result width is the sum of the input widths. + * + * @param statistics The statistics object to optionally access. + */ + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + long card1 = getFirstPredecessorNode().getEstimatedNumRecords(); + long card2 = getSecondPredecessorNode().getEstimatedNumRecords(); + this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 * card2; + + if (this.estimatedNumRecords >= 0) { + float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord(); + float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord(); + float width = (width1 <= 0 || width2 <= 0) ? 
-1 : width1 + width2; + + if (width > 0) { + this.estimatedOutputSize = (long) (width * this.estimatedNumRecords); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java new file mode 100644 index 0000000..4e65976 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.optimizer.dataproperties.InterestingProperties; +import org.apache.flink.optimizer.plandump.DumpableConnection; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; + +/** + * A connection between two operators. Represents an intermediate result + * and a data exchange between the two operators. + * + * The data exchange is performed in a certain mode (batch / pipelined). + * + * The data exchange strategy may be set on this connection, in which case + * it is fixed and will not be determined during candidate plan enumeration. + * + * During the enumeration of interesting properties, this connection also holds + * all interesting properties generated by the successor operator. + */ +public class DagConnection implements EstimateProvider, DumpableConnection<OptimizerNode> { + + private final OptimizerNode source; // The source node of the connection + + private final OptimizerNode target; // The target node of the connection. + + private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange + + private InterestingProperties interestingProps; // local properties that succeeding nodes are interested in + + private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined. + + private TempMode materializationMode = TempMode.NONE; // the materialization mode + + private int maxDepth = -1; + + private boolean breakPipeline; // whether this connection should break the pipeline due to potential deadlocks + + /** + * Creates a new Connection between two nodes. The shipping strategy is by default <tt>NONE</tt>. + * The temp mode is by default <tt>NONE</tt>. + * + * @param source + * The source node. + * @param target + * The target node. + */ + public DagConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) { + this(source, target, null, exchangeMode); + } + + /** + * Creates a new Connection between two nodes.
+ * + * @param source + * The source node. + * @param target + * The target node. + * @param shipStrategy + * The shipping strategy. + * @param exchangeMode + * The data exchange mode (pipelined / batch / batch only for shuffles / ... ) + */ + public DagConnection(OptimizerNode source, OptimizerNode target, + ShipStrategyType shipStrategy, ExecutionMode exchangeMode) + { + if (source == null || target == null) { + throw new NullPointerException("Source and target must not be null."); + } + this.source = source; + this.target = target; + this.shipStrategy = shipStrategy; + this.dataExchangeMode = exchangeMode; + } + + /** + * Constructor to create a result from an operator that is not + * consumed by another operator. + * + * @param source + * The source node. + */ + public DagConnection(OptimizerNode source, ExecutionMode exchangeMode) { + if (source == null) { + throw new NullPointerException("Source must not be null."); + } + this.source = source; + this.target = null; + this.shipStrategy = ShipStrategyType.NONE; + this.dataExchangeMode = exchangeMode; + } + + /** + * Gets the source of the connection. + * + * @return The source Node. + */ + public OptimizerNode getSource() { + return this.source; + } + + /** + * Gets the target of the connection. + * + * @return The target node. + */ + public OptimizerNode getTarget() { + return this.target; + } + + /** + * Gets the shipping strategy for this connection. + * + * @return The connection's shipping strategy. + */ + public ShipStrategyType getShipStrategy() { + return this.shipStrategy; + } + + /** + * Sets the shipping strategy for this connection. + * + * @param strategy + * The shipping strategy to be applied to this connection. + */ + public void setShipStrategy(ShipStrategyType strategy) { + this.shipStrategy = strategy; + } + + /** + * Gets the data exchange mode to use for this connection. + * + * @return The data exchange mode to use for this connection. + */ + public ExecutionMode getDataExchangeMode() { + if (dataExchangeMode == null) { + throw new IllegalStateException("This connection does not have the data exchange mode set"); + } + return dataExchangeMode; + } + + /** + * Marks that this connection should do a decoupled data exchange (such as batched) + * rather than pipelining data. Connections are marked as pipeline breakers to avoid + * deadlock situations. + */ + public void markBreaksPipeline() { + this.breakPipeline = true; + } + + /** + * Checks whether this connection is marked to break the pipeline. + * + * @return True, if this connection is marked to break the pipeline, false otherwise. + */ + public boolean isBreakingPipeline() { + return this.breakPipeline; + } + + /** + * Gets the interesting properties object for this connection. + * If the interesting properties for this connection have not yet been set, + * this method returns null. + * + * @return The collection of all interesting properties, or null, if they have not yet been set. + */ + public InterestingProperties getInterestingProperties() { + return this.interestingProps; + } + + /** + * Sets the interesting properties for this connection. + * + * @param props The interesting properties.
+ */ + public void setInterestingProperties(InterestingProperties props) { + if (this.interestingProps == null) { + this.interestingProps = props; + } else { + throw new IllegalStateException("Interesting Properties have already been set."); + } + } + + public void clearInterestingProperties() { + this.interestingProps = null; + } + + public void initMaxDepth() { + + if (this.maxDepth == -1) { + this.maxDepth = this.source.getMaxDepth() + 1; + } else { + throw new IllegalStateException("Maximum path depth has already been initialized."); + } + } + + public int getMaxDepth() { + if (this.maxDepth != -1) { + return this.maxDepth; + } else { + throw new IllegalStateException("Maximum path depth has not been initialized."); + } + } + + // -------------------------------------------------------------------------------------------- + // Estimates + // -------------------------------------------------------------------------------------------- + + @Override + public long getEstimatedOutputSize() { + return this.source.getEstimatedOutputSize(); + } + + @Override + public long getEstimatedNumRecords() { + return this.source.getEstimatedNumRecords(); + } + + @Override + public float getEstimatedAvgWidthPerOutputRecord() { + return this.source.getEstimatedAvgWidthPerOutputRecord(); + } + + // -------------------------------------------------------------------------------------------- + + + public TempMode getMaterializationMode() { + return this.materializationMode; + } + + public void setMaterializationMode(TempMode materializationMode) { + this.materializationMode = materializationMode; + } + + public boolean isOnDynamicPath() { + return this.source.isOnDynamicPath(); + } + + public int getCostWeight() { + return this.source.getCostWeight(); + } + + // -------------------------------------------------------------------------------------------- + + public String toString() { + StringBuilder buf = new StringBuilder(50); + buf.append("Connection: "); + + if (this.source == null) { + buf.append("null"); + } else { + buf.append(this.source.getOperator().getName()); + buf.append('(').append(this.source.getName()).append(')'); + } + + buf.append(" -> "); + + if (this.shipStrategy != null) { + buf.append('['); + buf.append(this.shipStrategy.name()); + buf.append(']').append(' '); + } + + if (this.target == null) { + buf.append("null"); + } else { + buf.append(this.target.getOperator().getName()); + buf.append('(').append(this.target.getName()).append(')'); + } + + return buf.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java new file mode 100644 index 0000000..dbe04f4 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.operators.GenericDataSinkBase; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.costs.CostEstimator; +import org.apache.flink.optimizer.dataproperties.InterestingProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.util.Visitor; + +/** + * The Optimizer representation of a data sink. + */ +public class DataSinkNode extends OptimizerNode { + + protected DagConnection input; // The input edge + + /** + * Creates a new DataSinkNode for the given sink operator. + * + * @param sink The data sink contract object. + */ + public DataSinkNode(GenericDataSinkBase<?> sink) { + super(sink); + } + + // -------------------------------------------------------------------------------------- + + /** + * Gets the input of the sink. + * + * @return The input connection. + */ + public DagConnection getInputConnection() { + return this.input; + } + + /** + * Gets the predecessor of this node. + * + * @return The predecessor, or null, if no predecessor has been set. + */ + public OptimizerNode getPredecessorNode() { + if(this.input != null) { + return input.getSource(); + } else { + return null; + } + } + + /** + * Gets the operator for which this optimizer sink node was created. + * + * @return The node's underlying operator. + */ + @Override + public GenericDataSinkBase<?> getOperator() { + return (GenericDataSinkBase<?>) super.getOperator(); + } + + @Override + public String getName() { + return "Data Sink"; + } + + @Override + public List<DagConnection> getIncomingConnections() { + return Collections.singletonList(this.input); + } + + /** + * Gets all outgoing connections, which is an empty set for the data sink. + * + * @return An empty list. 
+ */ + @Override + public List<DagConnection> getOutgoingConnections() { + return Collections.emptyList(); + } + + @Override + public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode) { + Operator<?> children = getOperator().getInput(); + + final OptimizerNode pred; + final DagConnection conn; + + pred = contractToNode.get(children); + conn = new DagConnection(pred, this, defaultExchangeMode); + + // create the connection and add it + this.input = conn; + pred.addOutgoingConnection(conn); + } + + /** + * Computes the estimated outputs for the data sink. Since the sink does not modify anything, it simply + * copies the output estimates from its direct predecessor. + */ + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize(); + } + + @Override + public void computeInterestingPropertiesForInputs(CostEstimator estimator) { + final InterestingProperties iProps = new InterestingProperties(); + + { + final Ordering partitioning = getOperator().getPartitionOrdering(); + final DataDistribution dataDist = getOperator().getDataDistribution(); + final RequestedGlobalProperties partitioningProps = new RequestedGlobalProperties(); + if (partitioning != null) { + if(dataDist != null) { + partitioningProps.setRangePartitioned(partitioning, dataDist); + } else { + partitioningProps.setRangePartitioned(partitioning); + } + iProps.addGlobalProperties(partitioningProps); + } + iProps.addGlobalProperties(partitioningProps); + } + + { + final Ordering localOrder = getOperator().getLocalOrder(); + final RequestedLocalProperties orderProps = new RequestedLocalProperties(); + if (localOrder != null) { + orderProps.setOrdering(localOrder); + } + iProps.addLocalProperties(orderProps); + } + + this.input.setInterestingProperties(iProps); + } + + // -------------------------------------------------------------------------------------------- + // Branch Handling + // -------------------------------------------------------------------------------------------- + + @Override + public void computeUnclosedBranchStack() { + if (this.openBranches != null) { + return; + } + + // we need to track open branches even in the sinks, because they get "closed" when + // we build a single "root" for the data flow plan + addClosedBranches(getPredecessorNode().closedBranchingNodes); + this.openBranches = getPredecessorNode().getBranchesForParent(this.input); + } + + @Override + protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection parent) { + // return our own stack of open branches, because nothing is added + return this.openBranches; + } + + // -------------------------------------------------------------------------------------------- + // Recursive Optimization + // -------------------------------------------------------------------------------------------- + + @Override + public List<PlanNode> getAlternativePlans(CostEstimator estimator) { + // check if we have a cached version + if (this.cachedPlans != null) { + return this.cachedPlans; + } + + // calculate alternative sub-plans for predecessor + List<? 
extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator); + List<PlanNode> outputPlans = new ArrayList<PlanNode>(); + + final int dop = getParallelism(); + final int inDop = getPredecessorNode().getParallelism(); + + final ExecutionMode executionMode = this.input.getDataExchangeMode(); + final boolean dopChange = dop != inDop; + final boolean breakPipeline = this.input.isBreakingPipeline(); + + InterestingProperties ips = this.input.getInterestingProperties(); + for (PlanNode p : subPlans) { + for (RequestedGlobalProperties gp : ips.getGlobalProperties()) { + for (RequestedLocalProperties lp : ips.getLocalProperties()) { + Channel c = new Channel(p); + gp.parameterizeChannel(c, dopChange, executionMode, breakPipeline); + lp.parameterizeChannel(c); + c.setRequiredLocalProps(lp); + c.setRequiredGlobalProps(gp); + + // no need to check whether the created properties meet what we need in case + // of ordering or global ordering, because the only interesting properties we have + // are what we require + outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c)); + } + } + } + + // cost and prune the plans + for (PlanNode node : outputPlans) { + estimator.costOperator(node); + } + prunePlanAlternatives(outputPlans); + + this.cachedPlans = outputPlans; + return outputPlans; + } + + // -------------------------------------------------------------------------------------------- + // Function Annotation Handling + // -------------------------------------------------------------------------------------------- + + @Override + public SemanticProperties getSemanticProperties() { + return new EmptySemanticProperties(); + } + + // -------------------------------------------------------------------------------------------- + // Miscellaneous + // -------------------------------------------------------------------------------------------- + + @Override + public void accept(Visitor<OptimizerNode> visitor) { + if (visitor.preVisit(this)) { + if (getPredecessorNode() != null) { + getPredecessorNode().accept(visitor); + } else { + throw new CompilerException(); + } + visitor.postVisit(this); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java new file mode 100644 index 0000000..e4b35b7 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.dag; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.ReplicatingInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.common.operators.GenericDataSourceBase; +import org.apache.flink.api.common.operators.GenericDataSourceBase.SplitDataProperties; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.CostEstimator; +import org.apache.flink.optimizer.costs.Costs; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.optimizer.plan.SourcePlanNode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Visitor; + +/** + * The optimizer's internal representation of a data source. + */ +public class DataSourceNode extends OptimizerNode { + + private final boolean sequentialInput; + + private final boolean replicatedInput; + + private GlobalProperties gprops; + + private LocalProperties lprops; + + /** + * Creates a new DataSourceNode for the given contract. + * + * @param pactContract + * The data source contract object. + */ + public DataSourceNode(GenericDataSourceBase<?, ?> pactContract) { + super(pactContract); + + if (pactContract.getUserCodeWrapper().getUserCodeClass() == null) { + throw new IllegalArgumentException("Input format has not been set."); + } + + if (NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass())) { + setDegreeOfParallelism(1); + this.sequentialInput = true; + } else { + this.sequentialInput = false; + } + + this.replicatedInput = ReplicatingInputFormat.class.isAssignableFrom( + pactContract.getUserCodeWrapper().getUserCodeClass()); + + this.gprops = new GlobalProperties(); + this.lprops = new LocalProperties(); + + SplitDataProperties<?> splitProps = pactContract.getSplitDataProperties(); + + if(replicatedInput) { + this.gprops.setFullyReplicated(); + this.lprops = new LocalProperties(); + } else if (splitProps != null) { + // configure data properties of data source using split properties + setDataPropertiesFromSplitProperties(splitProps); + } + + } + + /** + * Gets the contract object for this data source node. + * + * @return The contract. 
+ */ + @Override + public GenericDataSourceBase<?, ?> getOperator() { + return (GenericDataSourceBase<?, ?>) super.getOperator(); + } + + @Override + public String getName() { + return "Data Source"; + } + + @Override + public void setDegreeOfParallelism(int degreeOfParallelism) { + // if unsplittable, parallelism remains at 1 + if (!this.sequentialInput) { + super.setDegreeOfParallelism(degreeOfParallelism); + } + } + + @Override + public List<DagConnection> getIncomingConnections() { + return Collections.<DagConnection>emptyList(); + } + + @Override + public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultDataExchangeMode) {} + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // see, if we have a statistics object that can tell us a bit about the file + if (statistics != null) { + // instantiate the input format, as this is needed by the statistics + InputFormat<?, ?> format; + String inFormatDescription = "<unknown>"; + + try { + format = getOperator().getFormatWrapper().getUserCodeObject(); + Configuration config = getOperator().getParameters(); + format.configure(config); + } + catch (Throwable t) { + if (Optimizer.LOG.isWarnEnabled()) { + Optimizer.LOG.warn("Could not instantiate InputFormat to obtain statistics." + + " Limited statistics will be available.", t); + } + return; + } + try { + inFormatDescription = format.toString(); + } + catch (Throwable t) { + // we can ignore this error, as it only prevents us to use a cosmetic string + } + + // first of all, get the statistics from the cache + final String statisticsKey = getOperator().getStatisticsKey(); + final BaseStatistics cachedStatistics = statistics.getBaseStatistics(statisticsKey); + + BaseStatistics bs = null; + try { + bs = format.getStatistics(cachedStatistics); + } + catch (Throwable t) { + if (Optimizer.LOG.isWarnEnabled()) { + Optimizer.LOG.warn("Error obtaining statistics from input format: " + t.getMessage(), t); + } + } + + if (bs != null) { + final long len = bs.getTotalInputSize(); + if (len == BaseStatistics.SIZE_UNKNOWN) { + if (Optimizer.LOG.isInfoEnabled()) { + Optimizer.LOG.info("Compiler could not determine the size of input '" + inFormatDescription + "'. Using default estimates."); + } + } + else if (len >= 0) { + this.estimatedOutputSize = len; + } + + final long card = bs.getNumberOfRecords(); + if (card != BaseStatistics.NUM_RECORDS_UNKNOWN) { + this.estimatedNumRecords = card; + } + } + } + } + + @Override + public void computeInterestingPropertiesForInputs(CostEstimator estimator) { + // no children, so nothing to compute + } + + @Override + public void computeUnclosedBranchStack() { + // because there are no inputs, there are no unclosed branches. 
+ this.openBranches = Collections.emptyList(); + } + + @Override + public List<PlanNode> getAlternativePlans(CostEstimator estimator) { + if (this.cachedPlans != null) { + return this.cachedPlans; + } + + SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getOperator().getName()+")", + this.gprops, this.lprops); + + if(!replicatedInput) { + candidate.updatePropertiesWithUniqueSets(getUniqueFields()); + + final Costs costs = new Costs(); + if (FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass()) && + this.estimatedOutputSize >= 0) { + estimator.addFileInputCost(this.estimatedOutputSize, costs); + } + candidate.setCosts(costs); + } else { + // replicated input + final Costs costs = new Costs(); + InputFormat<?,?> inputFormat = + ((ReplicatingInputFormat<?,?>) getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat(); + if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) && + this.estimatedOutputSize >= 0) { + estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), costs); + } + candidate.setCosts(costs); + } + + // since there is only a single plan for the data-source, return a list with that element only + List<PlanNode> plans = new ArrayList<PlanNode>(1); + plans.add(candidate); + + this.cachedPlans = plans; + return plans; + } + + @Override + public SemanticProperties getSemanticProperties() { + return new EmptySemanticProperties(); + } + + @Override + public void accept(Visitor<OptimizerNode> visitor) { + if (visitor.preVisit(this)) { + visitor.postVisit(this); + } + } + + private void setDataPropertiesFromSplitProperties(SplitDataProperties splitProps) { + + // set global properties + int[] partitionKeys = splitProps.getSplitPartitionKeys(); + Partitioner<?> partitioner = splitProps.getSplitPartitioner(); + + if(partitionKeys != null && partitioner != null) { + this.gprops.setCustomPartitioned(new FieldList(partitionKeys), partitioner); + } + else if(partitionKeys != null) { + this.gprops.setAnyPartitioning(new FieldList(partitionKeys)); + } + // set local properties + int[] groupingKeys = splitProps.getSplitGroupKeys(); + Ordering ordering = splitProps.getSplitOrder(); + + // more than one split per source tasks possible. 
+ // adapt split grouping and sorting + if(ordering != null) { + + // sorting falls back to grouping because a source can read multiple, + // randomly assigned splits + groupingKeys = ordering.getFieldPositions(); + } + + if(groupingKeys != null && partitionKeys != null) { + // check if grouping is also valid across splits, i.e., whether grouping keys are + // valid superset of partition keys + boolean allFieldsIncluded = true; + for(int i : partitionKeys) { + boolean fieldIncluded = false; + for(int j : groupingKeys) { + if(i == j) { + fieldIncluded = true; + break; + } + } + if(!fieldIncluded) { + allFieldsIncluded = false; + break; + } + } + if (allFieldsIncluded) { + this.lprops = LocalProperties.forGrouping(new FieldList(groupingKeys)); + } else { + this.lprops = new LocalProperties(); + } + + } else { + this.lprops = new LocalProperties(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java new file mode 100644 index 0000000..482951b --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +/** + * Methods for operators / connections that provide estimated about data size and + * characteristics. + */ +public interface EstimateProvider { + + /** + * Gets the estimated output size from this node. + * + * @return The estimated output size. + */ + long getEstimatedOutputSize(); + + /** + * Gets the estimated number of records in the output of this node. + * + * @return The estimated number of records. + */ + long getEstimatedNumRecords(); + + /** + * Gets the estimated number of bytes per record. + * + * @return The estimated number of bytes per record. 
+ */ + float getEstimatedAvgWidthPerOutputRecord(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java new file mode 100644 index 0000000..118ddc8 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.operators.base.FilterOperatorBase; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.FilterDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; + +/** + * The optimizer's internal representation of a <i>FlatMap</i> operator node. + */ +public class FilterNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + public FilterNode(FilterOperatorBase<?, ?> operator) { + super(operator); + this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new FilterDescriptor()); + } + + @Override + public FilterOperatorBase<?, ?> getOperator() { + return (FilterOperatorBase<?, ?>) super.getOperator(); + } + + @Override + public String getName() { + return "Filter"; + } + + @Override + public SemanticProperties getSemanticProperties() { + return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + /** + * Computes the estimates for the Filter operator. Since it applies a filter on the data we assume a cardinality + * decrease. To give the system a hint at data decrease, we use a default magic number to indicate a 0.5 decrease. 
+ */ + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + this.estimatedNumRecords = (long) (getPredecessorNode().getEstimatedNumRecords() * 0.5); + this.estimatedOutputSize = (long) (getPredecessorNode().getEstimatedOutputSize() * 0.5); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java new file mode 100644 index 0000000..f713d56 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.base.FlatMapOperatorBase; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.FlatMapDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; + +/** + * The optimizer's internal representation of a <i>FlatMap</i> operator node. + */ +public class FlatMapNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + public FlatMapNode(FlatMapOperatorBase<?, ?, ?> operator) { + super(operator); + + this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new FlatMapDescriptor()); + } + + @Override + public FlatMapOperatorBase<?, ?, ?> getOperator() { + return (FlatMapOperatorBase<?, ?, ?>) super.getOperator(); + } + + @Override + public String getName() { + return "FlatMap"; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + /** + * Computes the estimates for the FlatMap operator. Since it un-nests, we assume a cardinality + * increase. To give the system a hint at data increase, we take a default magic number of a 5 times increase. 
+ */ + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords() * 5; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java new file mode 100644 index 0000000..564c0d3 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.AllGroupCombineProperties; +import org.apache.flink.optimizer.operators.GroupCombineProperties; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; + +import java.util.Collections; +import java.util.List; + +/** + * The optimizer representation of a <i>GroupCombineNode</i> operation. + */ +public class GroupCombineNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + /** + * Creates a new optimizer node for the given operator. + * + * @param operator The reduce operation. + */ + public GroupCombineNode(GroupCombineOperatorBase<?, ?, ?> operator) { + super(operator); + + if (this.keys == null) { + // case of a key-less reducer. force a parallelism of 1 + setDegreeOfParallelism(1); + } + + this.possibleProperties = initPossibleProperties(); + } + + private List<OperatorDescriptorSingle> initPossibleProperties() { + + // check if we can work with a grouping (simple reducer), or if we need ordering because of a group order + Ordering groupOrder = getOperator().getGroupOrder(); + if (groupOrder != null && groupOrder.getNumberOfFields() == 0) { + groupOrder = null; + } + + OperatorDescriptorSingle props = (this.keys == null ? + new AllGroupCombineProperties() : + new GroupCombineProperties(this.keys, groupOrder)); + + return Collections.singletonList(props); + } + + // ------------------------------------------------------------------------ + + /** + * Gets the operator represented by this optimizer node. + * + * @return The operator represented by this optimizer node. 
+ */ + @Override + public GroupCombineOperatorBase<?, ?, ?> getOperator() { + return (GroupCombineOperatorBase<?, ?, ?>) super.getOperator(); + } + + @Override + public String getName() { + return "GroupCombine"; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + // -------------------------------------------------------------------------------------------- + // Estimates + // -------------------------------------------------------------------------------------------- + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // no real estimates possible for a reducer. + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java new file mode 100644 index 0000000..77acae5 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.operators.AllGroupReduceProperties; +import org.apache.flink.optimizer.operators.AllGroupWithPartialPreGroupProperties; +import org.apache.flink.optimizer.operators.GroupReduceProperties; +import org.apache.flink.optimizer.operators.GroupReduceWithCombineProperties; +import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; +import org.apache.flink.configuration.Configuration; + +/** + * The optimizer representation of a <i>GroupReduce</i> operation. + */ +public class GroupReduceNode extends SingleInputNode { + + private final List<OperatorDescriptorSingle> possibleProperties; + + private GroupReduceNode combinerUtilityNode; + + /** + * Creates a new optimizer node for the given operator. + * + * @param operator The reduce operation. + */ + public GroupReduceNode(GroupReduceOperatorBase<?, ?, ?> operator) { + super(operator); + + if (this.keys == null) { + // case of a key-less reducer. 
force a parallelism of 1 + setDegreeOfParallelism(1); + } + + this.possibleProperties = initPossibleProperties(operator.getCustomPartitioner()); + } + + public GroupReduceNode(GroupReduceNode reducerToCopyForCombiner) { + super(reducerToCopyForCombiner); + + this.possibleProperties = Collections.emptyList(); + } + + private List<OperatorDescriptorSingle> initPossibleProperties(Partitioner<?> customPartitioner) { + // see if an internal hint dictates the strategy to use + final Configuration conf = getOperator().getParameters(); + final String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null); + + final boolean useCombiner; + if (localStrategy != null) { + if (Optimizer.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) { + useCombiner = false; + } + else if (Optimizer.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) { + if (!isCombineable()) { + Optimizer.LOG.warn("Strategy hint for GroupReduce '" + getOperator().getName() + + "' requires combinable reduce, but user function is not marked combinable."); + } + useCombiner = true; + } else { + throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy); + } + } else { + useCombiner = isCombineable(); + } + + // check if we can work with a grouping (simple reducer), or if we need ordering because of a group order + Ordering groupOrder = null; + if (getOperator() instanceof GroupReduceOperatorBase) { + groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) getOperator()).getGroupOrder(); + if (groupOrder != null && groupOrder.getNumberOfFields() == 0) { + groupOrder = null; + } + } + + OperatorDescriptorSingle props = useCombiner ? + (this.keys == null ? new AllGroupWithPartialPreGroupProperties() : new GroupReduceWithCombineProperties(this.keys, groupOrder, customPartitioner)) : + (this.keys == null ? new AllGroupReduceProperties() : new GroupReduceProperties(this.keys, groupOrder, customPartitioner)); + + return Collections.singletonList(props); + } + + // ------------------------------------------------------------------------ + + /** + * Gets the operator represented by this optimizer node. + * + * @return The operator represented by this optimizer node. + */ + @Override + public GroupReduceOperatorBase<?, ?, ?> getOperator() { + return (GroupReduceOperatorBase<?, ?, ?>) super.getOperator(); + } + + /** + * Checks, whether a combiner function has been given for the function encapsulated + * by this reduce contract. + * + * @return True, if a combiner has been given, false otherwise. + */ + public boolean isCombineable() { + return getOperator().isCombinable(); + } + + @Override + public String getName() { + return "GroupReduce"; + } + + @Override + protected List<OperatorDescriptorSingle> getPossibleProperties() { + return this.possibleProperties; + } + + // -------------------------------------------------------------------------------------------- + // Estimates + // -------------------------------------------------------------------------------------------- + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // no real estimates possible for a reducer. 
+ } + + public GroupReduceNode getCombinerUtilityNode() { + if (this.combinerUtilityNode == null) { + this.combinerUtilityNode = new GroupReduceNode(this); + + // we conservatively assume the combiner returns the same data size as it consumes + this.combinerUtilityNode.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize(); + this.combinerUtilityNode.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + } + return this.combinerUtilityNode; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java new file mode 100644 index 0000000..1fdae51 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import org.apache.flink.util.Visitor; + +final class InterestingPropertiesClearer implements Visitor<OptimizerNode> { + + static final InterestingPropertiesClearer INSTANCE = new InterestingPropertiesClearer(); + + @Override + public boolean preVisit(OptimizerNode visitable) { + if (visitable.getInterestingProperties() != null) { + visitable.clearInterestingProperties(); + return true; + } else { + return false; + } + } + + @Override + public void postVisit(OptimizerNode visitable) {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java new file mode 100644 index 0000000..5d28043 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import org.apache.flink.util.Visitor; + +/** + * + */ +public interface IterationNode { + + void acceptForStepFunction(Visitor<OptimizerNode> visitor); + +}