Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.backend.hadoop.executionengine.spark.optimizer; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.pig.FuncSpec; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; +import org.apache.pig.impl.builtin.GFCross; +import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.VisitorException; + +public class ParallelismSetter extends SparkOpPlanVisitor { + private JobConf jobConf; + + public ParallelismSetter(SparkOperPlan plan, JobConf jobConf) { + super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan)); + this.jobConf = jobConf; + } + + @Override + public void visitSparkOp(SparkOperator sparkOp) throws VisitorException { + if (sparkOp instanceof NativeSparkOperator) { + return; + } + + if (sparkOp.getCrossKeys() != null) { + for (String key : sparkOp.getCrossKeys()) { + jobConf.set(PigImplConstants.PIG_CROSS_PARALLELISM + "." + key, + // TODO: Estimate parallelism. For now we are hard-coding GFCross.DEFAULT_PARALLELISM + Integer.toString(96)); + } + } + } +} \ No newline at end of file
// Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
// URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java?rev=1796639&view=auto
// --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java (added)
// +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java Mon May 29 15:00:39 2017
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.List;

import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;

/**
 * Secondary key sort optimization for spark mode.
 *
 * <p>The visitor walks the Spark operator plan depth-first and accumulates,
 * across all visited operators, how many sorts were removed, how many
 * distincts were changed and how many times a secondary key is used.
 */
public class SecondaryKeyOptimizerSpark extends SparkOpPlanVisitor implements SecondaryKeyOptimizer {
    private static final Log LOG = LogFactory
            .getLog(SecondaryKeyOptimizerSpark.class);

    private int numSortRemoved = 0;
    private int numDistinctChanged = 0;
    private int numUseSecondaryKey = 0;

    /**
     * @param plan the Spark operator plan to optimize
     */
    public SecondaryKeyOptimizerSpark(SparkOperPlan plan) {
        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
    }

    /**
     * Secondary key sort optimization is enabled in group + foreach nested situation, like TestAccumlator#testAccumWithSort
     * After calling SecondaryKeyOptimizerUtil.applySecondaryKeySort, the POSort in the POForeach will be deleted in the spark plan.
     * Sort function can be implemented in secondary key sort even though POSort is deleted in the spark plan.
     *
     * @param sparkOperator the Spark operator whose physical plan is inspected
     * @throws VisitorException if building the map or reduce sub-plan fails, or if the
     *                          secondary key utility reports a failure
     */
    @Override
    public void visitSparkOp(SparkOperator sparkOperator) throws VisitorException {
        List<POLocalRearrange> rearranges = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLocalRearrange.class);
        if (rearranges.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                // Trailing space keeps the operator key from running into the preceding word.
                LOG.debug("No POLocalRearranges found in the spark operator " + sparkOperator.getOperatorKey() + ". Skipping secondary key optimization.");
            }
            return;
        }

        /**
         * Whenever a POLocalRearrange is encountered in the sparkOperator.physicalPlan,
         * the sub-physicalplan between the previousLR (or root) and currentLR is considered as mapPlan (like what
         * we call in mapreduce) and the sub-physicalplan between the POGlobalRearrange (the successor of currentLR) and
         * nextLR (or leaf) is considered as reducePlan (like what we call in mapreduce). After mapPlan and reducePlan are got,
         * use SecondaryKeyOptimizerUtil.applySecondaryKeySort(mapPlan, reducePlan) to enable secondary key optimization.
         * SecondaryKeyOptimizerUtil.applySecondaryKeySort will remove POSort in the foreach in the reducePlan or
         * change PODistinct to POSortedDistinct in the foreach in the reducePlan.
         */
        for (POLocalRearrange currentLR : rearranges) {
            PhysicalPlan mapPlan;
            PhysicalPlan reducePlan;
            try {
                mapPlan = getMapPlan(sparkOperator.physicalPlan, currentLR);
                reducePlan = getReducePlan(sparkOperator.physicalPlan, currentLR);
            } catch (PlanException e) {
                throw new VisitorException(e);
            }

            // getReducePlan() yields an empty plan when currentLR has no successor, fans out,
            // or is directly followed by another POLocalRearrange. There is then no reduce
            // side to optimize, and indexing the (empty) root list would throw.
            List<PhysicalOperator> rootsOfReducePlan = reducePlan.getRoots();
            if (rootsOfReducePlan == null || rootsOfReducePlan.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("No reduce plan found after " + currentLR.getOperatorKey() + ". Skipping secondary key optimization.");
                }
                continue;
            }

            // Current code does not enable secondary key optimization when a join case is encountered
            PhysicalOperator firstReduceRoot = rootsOfReducePlan.get(0);
            if (firstReduceRoot instanceof POGlobalRearrangeSpark) {
                List<PhysicalOperator> predecessors = sparkOperator.physicalPlan.getPredecessors(firstReduceRoot);
                if (predecessors != null && predecessors.size() >= 2) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Current code does not enable secondarykey optimization when join case is encounted");
                    }
                    return;
                }
            }

            if (mapPlan.getOperator(currentLR.getOperatorKey()) == null) {
                // The POLocalRearrange is sub-plan of a POSplit
                mapPlan = PlanHelper.getLocalRearrangePlanFromSplit(mapPlan, currentLR.getOperatorKey());
            }
            SparkSecondaryKeyOptimizerUtil sparkSecondaryKeyOptUtil = new SparkSecondaryKeyOptimizerUtil();
            SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info = sparkSecondaryKeyOptUtil.applySecondaryKeySort(mapPlan, reducePlan);
            if (info != null) {
                numSortRemoved += info.getNumSortRemoved();
                numDistinctChanged += info.getNumDistinctChanged();
                numUseSecondaryKey += info.getNumUseSecondaryKey();
            }
        }
    }

    /**
     * Find the map plan of the physicalPlan which contains currentLR.
     * Backward search all the physicalOperators which precede currentLR until the previous POLocalRearrange
     * is found or the root of physicalPlan is found. The search also stops at the first
     * operator that does not have exactly one predecessor.
     *
     * @param physicalPlan the plan to search
     * @param currentLR    the local rearrange the map plan ends with
     * @return a new plan holding currentLR and the chain of operators found in front of it
     * @throws VisitorException declared for callers; not raised by the visible code
     * @throws PlanException    if an operator cannot be added to the new plan
     */
    private PhysicalPlan getMapPlan(PhysicalPlan physicalPlan, POLocalRearrange currentLR) throws VisitorException, PlanException {
        PhysicalPlan mapPlan = new PhysicalPlan();
        mapPlan.addAsRoot(currentLR);
        List<PhysicalOperator> preList = physicalPlan.getPredecessors(currentLR);
        while (true) {
            if (preList == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("there is nothing to backward search");
                }
                break;
            }
            if (preList.size() != 1) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("the size of predecessor of currentLR should be 1 but now it is not 1,physicalPlan:" + physicalPlan);
                }
                break;
            }
            PhysicalOperator pre = preList.get(0);
            if (pre instanceof POLocalRearrange) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finishing to find the mapPlan between preLR and currentLR.");
                }
                break;
            }
            // Each newly found predecessor becomes the new root of the map plan.
            mapPlan.addAsRoot(pre);
            preList = physicalPlan.getPredecessors(pre);
        }
        return mapPlan;
    }

    /**
     * Find the reduce plan of the physicalPlan which contains currentLR.
     * Forward search all the physicalOperators which succeed currentLR until the next POLocalRearrange
     * is found or the leaf of physicalPlan is found. The search also stops at the first
     * operator that does not have exactly one successor.
     *
     * @param physicalPlan the plan to search
     * @param currentLR    the local rearrange the reduce plan follows
     * @return a new plan holding the chain of operators found after currentLR; empty when
     *         nothing qualifies
     * @throws PlanException if an operator cannot be added to the new plan
     */
    private PhysicalPlan getReducePlan(PhysicalPlan physicalPlan, POLocalRearrange currentLR) throws PlanException {
        PhysicalPlan reducePlan = new PhysicalPlan();
        List<PhysicalOperator> succList = physicalPlan.getSuccessors(currentLR);
        while (true) {
            if (succList == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("there is nothing to forward search");
                }
                break;
            }
            if (succList.size() != 1) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("the size of successors of currentLR should be 1 but now it is not 1,physicalPlan:" + physicalPlan);
                }
                break;
            }
            PhysicalOperator succ = succList.get(0);
            if (succ instanceof POLocalRearrange) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finishing to find the ReducePlan between currentLR and netxtLR.");
                }
                break;
            }
            // Each newly found successor becomes the new leaf of the reduce plan.
            reducePlan.addAsLeaf(succ);
            succList = physicalPlan.getSuccessors(succ);
        }
        return reducePlan;
    }

    /** @return the accumulated number of sorts removed by this visitor */
    @Override
    public int getNumSortRemoved() {
        return numSortRemoved;
    }

    /** @return the accumulated number of distincts changed by this visitor */
    @Override
    public int getNumDistinctChanged() {
        return numDistinctChanged;
    }

    /** @return the accumulated number of secondary key uses reported to this visitor */
    @Override
    public int getNumUseSecondaryKey() {
        return numUseSecondaryKey;
    }
}

// Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java
// URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java?rev=1796639&view=auto
// --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java (added)
// +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java Mon May 29 15:00:39 2017
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;

import java.util.List;

/**
 * Spark flavour of {@link SecondaryKeyOptimizerUtil}: it only overrides how the
 * first node to examine in the reduce plan is chosen.
 */
public class SparkSecondaryKeyOptimizerUtil extends SecondaryKeyOptimizerUtil {
    private static final Log log = LogFactory.getLog(SparkSecondaryKeyOptimizerUtil.class);

    /**
     * Picks the operator that directly follows the global rearrange at the root
     * of the reduce plan.
     *
     * @param root       the root of the reduce plan
     * @param reducePlan the reduce plan that contains {@code root}
     * @return the single successor of {@code root}, or {@code null} when {@code root}
     *         is not a POGlobalRearrange or does not have exactly one successor
     */
    @Override
    protected PhysicalOperator getCurrentNode(PhysicalOperator root, PhysicalPlan reducePlan) {
        if (!(root instanceof POGlobalRearrange)) {
            log.debug("Expected reduce root to be a POGlobalRearrange, skip secondary key optimizing");
            return null;
        }

        // getSuccessors() is treated as nullable elsewhere in this package (a node
        // without successors yields null), so guard before asking for the size.
        List<PhysicalOperator> globalRearrangeSuccs = reducePlan.getSuccessors(root);
        if (globalRearrangeSuccs == null || globalRearrangeSuccs.size() != 1) {
            log.debug("Expected successor of a POGlobalRearrange is POPackage, skip secondary key optimizing");
            return null;
        }
        return globalRearrangeSuccs.get(0);
    }
}

// Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
// URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java?rev=1796639&view=auto
// --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java (added)
// +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java Mon May 29 15:00:39 2017
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.spark.plan;

import java.io.PrintStream;
import java.util.LinkedList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.pig.impl.plan.DotPlanDumper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.DotPOPrinter;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanVisitor;

/**
 * This class can print Spark plan in the DOT format. It uses
 * clusters to illustrate nesting. If "verbose" is off, it will skip
 * any nesting in the associated physical plans.
 */
public class DotSparkPrinter extends DotPlanDumper<SparkOperator, SparkOperPlan,
        DotSparkPrinter.InnerOperator,
        DotSparkPrinter.InnerPlan> {

    /** Source of the per-instance hash codes handed to {@link InnerOperator}. */
    private static int counter = 0;

    /** Verbosity forwarded to the printers created for nested physical plans. */
    private boolean isVerboseNesting = true;

    /**
     * Creates a top-level printer.
     *
     * @param plan the Spark operator plan to print
     * @param ps   the stream the DOT output is written to
     */
    public DotSparkPrinter(SparkOperPlan plan, PrintStream ps) {
        this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>(),
                new HashSet<Operator>());
    }

    private DotSparkPrinter(SparkOperPlan plan, PrintStream ps, boolean isSubGraph,
                            Set<Operator> subgraphs,
                            Set<Operator> multiInputSubgraphs,
                            Set<Operator> multiOutputSubgraphs) {
        super(plan, ps, isSubGraph, subgraphs,
                multiInputSubgraphs, multiOutputSubgraphs);
    }

    @Override
    public void setVerbose(boolean verbose) {
        // leave the parents verbose set to true
        isVerboseNesting = verbose;
    }

    @Override
    protected DotPlanDumper makeDumper(InnerPlan plan, PrintStream ps) {
        return new InnerPrinter(plan, ps, mSubgraphs, mMultiInputSubgraphs,
                mMultiOutputSubgraphs);
    }

    /**
     * @param op the Spark operator to label
     * @return the operator name up to (not including) the first {@code " - "}
     */
    @Override
    protected String getName(SparkOperator op) {
        // Cut off the part of the name specifying scope.
        String delimiter = " - ";
        String[] nameParts = op.name().split(delimiter);
        return nameParts[0];
    }

    /**
     * @param op the Spark operator whose physical plan is wrapped
     * @return a collection holding one {@link InnerPlan} built from {@code op.physicalPlan}
     */
    @Override
    protected Collection<InnerPlan> getNestedPlans(SparkOperator op) {
        Collection<InnerPlan> plans = new LinkedList<InnerPlan>();
        plans.add(new InnerPlan(op.physicalPlan));
        return plans;
    }

    @Override
    protected String[] getAttributes(SparkOperator op) {
        return new String[] {
            "label=\"" + getName(op) + "\"",
            "style=\"filled\"",
            "fillcolor=\"#EEEEEE\""
        };
    }


    /**
     * Helper class to represent the relationship of inner operators
     */
    public static class InnerOperator extends Operator<PlanVisitor> {

        private static final long serialVersionUID = 1L;
        String name;
        PhysicalPlan plan;
        int code;

        /**
         * @param plan the physical plan this operator stands for
         * @param name the name reported by {@link #name()}
         */
        public InnerOperator(PhysicalPlan plan, String name) {
            super(new OperatorKey());
            this.name = name;
            this.plan = plan;
            // Every instance takes the next counter value as its hash code.
            this.code = counter++;
        }

        @Override public void visit(PlanVisitor v) {}
        @Override public boolean supportsMultipleInputs() {return false;}
        @Override public boolean supportsMultipleOutputs() {return false;}
        @Override public String name() {return name;}
        public PhysicalPlan getPlan() {return plan;}
        @Override public int hashCode() {return code;}
    }

    /**
     * Each spark operator will have an inner plan of inner
     * operators. The inner operators contain the physical plan of the
     * execution phase.
     */
    public static class InnerPlan extends OperatorPlan<InnerOperator> {

        private static final long serialVersionUID = 1L;

        /**
         * @param plan the physical plan wrapped in a single inner operator named "spark"
         */
        public InnerPlan(PhysicalPlan plan) {
            InnerOperator sparkInnerOp = new InnerOperator(plan, "spark");
            this.add(sparkInnerOp);
        }
    }

    /**
     * Printer for an {@link InnerPlan}; always constructed as a sub-graph and
     * delegates the nested physical plan to a {@link DotPOPrinter}.
     */
    private class InnerPrinter extends DotPlanDumper<InnerOperator, InnerPlan,
            PhysicalOperator, PhysicalPlan> {

        public InnerPrinter(InnerPlan plan, PrintStream ps,
                            Set<Operator> subgraphs,
                            Set<Operator> multiInputSubgraphs,
                            Set<Operator> multiOutputSubgraphs) {
            super(plan, ps, true, subgraphs, multiInputSubgraphs,
                    multiOutputSubgraphs);
        }

        @Override
        protected String[] getAttributes(InnerOperator op) {
            return new String[] {
                "label=\"" + super.getName(op) + "\"",
                "style=\"filled\"",
                "fillcolor=\"white\""
            };
        }

        @Override
        protected Collection<PhysicalPlan> getNestedPlans(InnerOperator op) {
            Collection<PhysicalPlan> nested = new LinkedList<PhysicalPlan>();
            nested.add(op.getPlan());
            return nested;
        }

        @Override
        protected DotPOPrinter makeDumper(PhysicalPlan plan, PrintStream ps) {
            DotPOPrinter printer = new DotPOPrinter(plan, ps, true,
                    mSubgraphs,
                    mMultiInputSubgraphs,
                    mMultiOutputSubgraphs);
            // Nested physical plans follow the verbosity set on the enclosing printer.
            printer.setVerbose(isVerboseNesting);
            return printer;
        }
    }
}