Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.FuncSpec;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.builtin.GFCross;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class ParallelismSetter extends SparkOpPlanVisitor {
+    private JobConf jobConf;
+
+    /**
+     * Creates a visitor that walks the given plan in dependency order.
+     *
+     * @param plan    the Spark operator plan to visit
+     * @param jobConf the configuration that receives the cross parallelism settings
+     */
+    public ParallelismSetter(SparkOperPlan plan, JobConf jobConf) {
+        super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
+        this.jobConf = jobConf;
+    }
+
+    /**
+     * Writes one {@code PIG_CROSS_PARALLELISM.<key>} entry into the job configuration
+     * for every cross key of the visited operator.
+     *
+     * @param sparkOp the operator being visited
+     * @throws VisitorException declared by the visitor contract
+     */
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        // Native Spark operators and operators without cross keys need no setting.
+        if (sparkOp instanceof NativeSparkOperator || sparkOp.getCrossKeys() == null) {
+            return;
+        }
+
+        // TODO: Estimate parallelism. For now we are hard-coding GFCross.DEFAULT_PARALLELISM
+        final String crossParallelism = Integer.toString(96);
+        final String keyPrefix = PigImplConstants.PIG_CROSS_PARALLELISM + ".";
+        for (String crossKey : sparkOp.getCrossKeys()) {
+            jobConf.set(keyPrefix + crossKey, crossParallelism);
+        }
+    }
+}
\ No newline at end of file

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,217 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+
+import 
org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Secondary key sort optimization for spark mode
+ */
+public class SecondaryKeyOptimizerSpark extends SparkOpPlanVisitor
+        implements SecondaryKeyOptimizer {
+    private static final Log LOG = LogFactory
+            .getLog(SecondaryKeyOptimizerSpark.class);
+
+    // Counters accumulated over every spark operator visited by this instance.
+    private int numSortRemoved = 0;
+    private int numDistinctChanged = 0;
+    private int numUseSecondaryKey = 0;
+
+    /**
+     * Creates an optimizer that walks the given plan depth first.
+     *
+     * @param plan the Spark operator plan to optimize
+     */
+    public SecondaryKeyOptimizerSpark(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+    }
+
+    /**
+     * Secondary key sort optimization is enabled in the group + nested foreach
+     * situation, like TestAccumulator#testAccumWithSort.
+     * After calling SecondaryKeyOptimizerUtil.applySecondaryKeySort, the POSort in the
+     * POForeach will be deleted in the spark plan. Sorting is then provided by the
+     * secondary key sort even though POSort is deleted in the spark plan.
+     *
+     * @param sparkOperator the operator whose physical plan is inspected
+     * @throws VisitorException if a map or reduce sub-plan cannot be built, or if
+     *                          applying the optimization fails
+     */
+    @Override
+    public void visitSparkOp(SparkOperator sparkOperator) throws VisitorException {
+        List<POLocalRearrange> rearranges =
+                PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLocalRearrange.class);
+        if (rearranges.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No POLocalRearranges found in the spark operator "
+                        + sparkOperator.getOperatorKey() + ". Skipping secondary key optimization.");
+            }
+            return;
+        }
+
+        /**
+         * Whenever a POLocalRearrange is encountered in sparkOperator.physicalPlan,
+         * the sub-plan between the previous LR (or root) and the current LR is
+         * considered the mapPlan (as in mapreduce), and the sub-plan between the
+         * POGlobalRearrange (the successor of the current LR) and the next LR (or leaf)
+         * is considered the reducePlan (as in mapreduce). Once both are available,
+         * SecondaryKeyOptimizerUtil.applySecondaryKeySort(mapPlan, reducePlan) is used
+         * to enable secondary key optimization. It removes POSort in the foreach of the
+         * reducePlan or changes PODistinct to POSortedDistinct in that foreach.
+         */
+        for (POLocalRearrange currentLR : rearranges) {
+            PhysicalPlan mapPlan;
+            PhysicalPlan reducePlan;
+            try {
+                mapPlan = getMapPlan(sparkOperator.physicalPlan, currentLR);
+                reducePlan = getReducePlan(sparkOperator.physicalPlan, currentLR);
+            } catch (PlanException e) {
+                throw new VisitorException(e);
+            }
+
+            // getReducePlan yields an empty plan when currentLR has no successor (or more
+            // than one); there is then no reduce side to optimize, and indexing the roots
+            // below would throw IndexOutOfBoundsException.
+            List<PhysicalOperator> rootsOfReducePlan = reducePlan.getRoots();
+            if (rootsOfReducePlan == null || rootsOfReducePlan.isEmpty()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Reduce plan of " + currentLR.getOperatorKey()
+                            + " is empty. Skipping secondary key optimization.");
+                }
+                continue;
+            }
+
+            // Current code does not enable secondary key optimization when a join case
+            // is encountered (a global rearrange fed by two or more predecessors).
+            PhysicalOperator reduceRoot = rootsOfReducePlan.get(0);
+            if (reduceRoot instanceof POGlobalRearrangeSpark) {
+                List<PhysicalOperator> predecessors =
+                        sparkOperator.physicalPlan.getPredecessors(reduceRoot);
+                if (predecessors != null && predecessors.size() >= 2) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Current code does not enable secondarykey optimization when  join case is encounted");
+                    }
+                    // NOTE(review): this stops processing of all remaining LRs of the
+                    // operator, not only the current one - confirm that is intended.
+                    return;
+                }
+            }
+
+            if (mapPlan.getOperator(currentLR.getOperatorKey()) == null) {
+                // The POLocalRearrange is sub-plan of a POSplit
+                mapPlan = PlanHelper.getLocalRearrangePlanFromSplit(mapPlan,
+                        currentLR.getOperatorKey());
+            }
+            SparkSecondaryKeyOptimizerUtil sparkSecondaryKeyOptUtil =
+                    new SparkSecondaryKeyOptimizerUtil();
+            SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info =
+                    sparkSecondaryKeyOptUtil.applySecondaryKeySort(mapPlan, reducePlan);
+            if (info != null) {
+                numSortRemoved += info.getNumSortRemoved();
+                numDistinctChanged += info.getNumDistinctChanged();
+                numUseSecondaryKey += info.getNumUseSecondaryKey();
+            }
+        }
+    }
+
+    /**
+     * Builds the map plan of the physicalPlan that contains currentLR.
+     * Searches backward through the operators that precede currentLR until the
+     * previous POLocalRearrange or the root of physicalPlan is found. The search also
+     * stops at any operator that does not have exactly one predecessor.
+     *
+     * @param physicalPlan the plan that contains currentLR
+     * @param currentLR    the local rearrange that ends the map plan
+     * @return a new plan holding currentLR and the chain of operators leading to it
+     * @throws VisitorException declared for callers; not thrown by the visible code
+     * @throws PlanException    if an operator cannot be added to the new plan
+     */
+    private PhysicalPlan getMapPlan(PhysicalPlan physicalPlan,
+            POLocalRearrange currentLR) throws VisitorException, PlanException {
+        PhysicalPlan mapPlan = new PhysicalPlan();
+        mapPlan.addAsRoot(currentLR);
+        List<PhysicalOperator> preList = physicalPlan.getPredecessors(currentLR);
+        while (true) {
+            if (preList == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("there is nothing to backward search");
+                }
+                break;
+            }
+            if (preList.size() != 1) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("the size of predecessor of currentLR should be 1 but now it is not 1,physicalPlan:" + physicalPlan);
+                }
+                break;
+            }
+            PhysicalOperator pre = preList.get(0);
+            if (pre instanceof POLocalRearrange) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Finishing to find the mapPlan between preLR and currentLR.");
+                }
+                break;
+            }
+            // Each predecessor becomes the new root, so the chain keeps its order.
+            mapPlan.addAsRoot(pre);
+            preList = physicalPlan.getPredecessors(pre);
+        }
+        return mapPlan;
+    }
+
+    /**
+     * Builds the reduce plan of the physicalPlan that follows currentLR.
+     * Searches forward through the operators that succeed currentLR until the next
+     * POLocalRearrange or a leaf of physicalPlan is found. The search also stops at
+     * any operator that does not have exactly one successor.
+     *
+     * @param physicalPlan the plan that contains currentLR
+     * @param currentLR    the local rearrange that precedes the reduce plan
+     * @return a new plan holding the chain of successors; empty if there is none
+     * @throws PlanException if an operator cannot be added to the new plan
+     */
+    private PhysicalPlan getReducePlan(PhysicalPlan physicalPlan,
+            POLocalRearrange currentLR) throws PlanException {
+        PhysicalPlan reducePlan = new PhysicalPlan();
+        List<PhysicalOperator> succList = physicalPlan.getSuccessors(currentLR);
+        while (true) {
+            if (succList == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("there is nothing to forward search");
+                }
+                break;
+            }
+            if (succList.size() != 1) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("the size of successors of currentLR should be 1 but now it is not 1,physicalPlan:" + physicalPlan);
+                }
+                break;
+            }
+            PhysicalOperator succ = succList.get(0);
+            if (succ instanceof POLocalRearrange) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Finishing to find the ReducePlan between currentLR and netxtLR.");
+                }
+                break;
+            }
+            // Each successor becomes the new leaf, so the chain keeps its order.
+            reducePlan.addAsLeaf(succ);
+            succList = physicalPlan.getSuccessors(succ);
+        }
+        return reducePlan;
+    }
+
+    /** @return the number of sorts removed, summed over all visited operators */
+    @Override
+    public int getNumSortRemoved() {
+        return numSortRemoved;
+    }
+
+    /** @return the number of distincts changed, summed over all visited operators */
+    @Override
+    public int getNumDistinctChanged() {
+        return numDistinctChanged;
+    }
+
+    /** @return the number of secondary key uses, summed over all visited operators */
+    @Override
+    public int getNumUseSecondaryKey() {
+        return numUseSecondaryKey;
+    }
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import 
org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+
+import java.util.List;
+
+public class SparkSecondaryKeyOptimizerUtil extends SecondaryKeyOptimizerUtil{
+    private static final Log log =
+            LogFactory.getLog(SparkSecondaryKeyOptimizerUtil.class);
+
+    /**
+     * Returns the single successor of the global rearrange at the root of the
+     * reduce plan.
+     *
+     * @param root       the root operator of the reduce plan
+     * @param reducePlan the reduce plan that contains {@code root}
+     * @return the only successor of {@code root}, or {@code null} when {@code root}
+     *         is not a POGlobalRearrange or does not have exactly one successor
+     */
+    @Override
+    protected PhysicalOperator getCurrentNode(PhysicalOperator root,
+            PhysicalPlan reducePlan) {
+        if (!(root instanceof POGlobalRearrange)) {
+            log.debug("Expected reduce root to be a POGlobalRearrange, skip secondary key optimizing");
+            return null;
+        }
+
+        List<PhysicalOperator> globalRearrangeSuccs = reducePlan.getSuccessors(root);
+        // A missing (null) successor list is handled like any other count that is not
+        // exactly one, instead of failing with a NullPointerException.
+        if (globalRearrangeSuccs == null || globalRearrangeSuccs.size() != 1) {
+            log.debug("Expected successor of a POGlobalRearrange is POPackage, skip secondary key optimizing");
+            return null;
+        }
+        return globalRearrangeSuccs.get(0);
+    }
+}

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java?rev=1796639&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
 Mon May 29 15:00:39 2017
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.PrintStream;
+import java.util.LinkedList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DotPlanDumper;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.DotPOPrinter;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanVisitor;
+
+/**
+ * This class can print Spark plan in the DOT format. It uses
+ * clusters to illustrate nesting. If "verbose" is off, it will skip
+ * any nesting in the associated physical plans.
+ */
+public class DotSparkPrinter extends DotPlanDumper<SparkOperator, SparkOperPlan,
+        DotSparkPrinter.InnerOperator,
+        DotSparkPrinter.InnerPlan> {
+
+    // Source of the per-instance hash codes handed to InnerOperator.
+    private static int counter = 0;
+    // Forwarded to the physical plan printers created by InnerPrinter.
+    private boolean isVerboseNesting = true;
+
+    /**
+     * Creates a top-level (non-subgraph) printer with fresh subgraph bookkeeping sets.
+     *
+     * @param plan the Spark plan to print
+     * @param ps   the stream that receives the DOT output
+     */
+    public DotSparkPrinter(SparkOperPlan plan, PrintStream ps) {
+        this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>(),
+                new HashSet<Operator>());
+    }
+
+    private DotSparkPrinter(SparkOperPlan plan, PrintStream ps,
+                         boolean isSubGraph,
+                         Set<Operator> subgraphs,
+                         Set<Operator> multiInputSubgraphs,
+                         Set<Operator> multiOutputSubgraphs) {
+        super(plan, ps, isSubGraph, subgraphs,
+                multiInputSubgraphs, multiOutputSubgraphs);
+    }
+
+    @Override
+    public void setVerbose(boolean verbose) {
+        // Only the nested flag is recorded here; the parent's own setting is not touched.
+        isVerboseNesting = verbose;
+    }
+
+    @Override
+    protected DotPlanDumper makeDumper(InnerPlan plan, PrintStream ps) {
+        return new InnerPrinter(plan, ps, mSubgraphs, mMultiInputSubgraphs,
+                mMultiOutputSubgraphs);
+    }
+
+    /**
+     * Returns the operator name with the scope part cut off, i.e. everything
+     * before the first " - " delimiter.
+     */
+    @Override
+    protected String getName(SparkOperator op) {
+        return op.name().split(" - ")[0];
+    }
+
+    /** Wraps the operator's physical plan in a single InnerPlan. */
+    @Override
+    protected Collection<InnerPlan> getNestedPlans(SparkOperator op) {
+        Collection<InnerPlan> nested = new LinkedList<InnerPlan>();
+        nested.add(new InnerPlan(op.physicalPlan));
+        return nested;
+    }
+
+    @Override
+    protected String[] getAttributes(SparkOperator op) {
+        return new String[] {
+            "label=\"" + getName(op) + "\"",
+            "style=\"filled\"",
+            "fillcolor=\"#EEEEEE\""
+        };
+    }
+
+
+    /**
+     * Helper class to represent the relationship of inner operators
+     */
+    public static class InnerOperator extends Operator<PlanVisitor> {
+
+        private static final long serialVersionUID = 1L;
+        String name;
+        PhysicalPlan plan;
+        int code;
+
+        public InnerOperator(PhysicalPlan plan, String name) {
+            super(new OperatorKey());
+            this.name = name;
+            this.plan = plan;
+            // Every instance takes the next value of the shared counter as its hash code.
+            this.code = counter++;
+        }
+
+        @Override
+        public void visit(PlanVisitor v) {
+        }
+
+        @Override
+        public boolean supportsMultipleInputs() {
+            return false;
+        }
+
+        @Override
+        public boolean supportsMultipleOutputs() {
+            return false;
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        public PhysicalPlan getPlan() {
+            return plan;
+        }
+
+        @Override
+        public int hashCode() {
+            return code;
+        }
+    }
+
+    /**
+     * Each spark operator will have an inner plan of inner
+     * operators. The inner operators contain the physical plan of the
+     * execution phase.
+     */
+    public static class InnerPlan extends OperatorPlan<InnerOperator> {
+
+        private static final long serialVersionUID = 1L;
+
+        public InnerPlan(PhysicalPlan plan) {
+            this.add(new InnerOperator(plan, "spark"));
+        }
+    }
+
+    /** Dumper for an InnerPlan; its nested plans are the wrapped physical plans. */
+    private class InnerPrinter extends DotPlanDumper<InnerOperator, InnerPlan,
+            PhysicalOperator, PhysicalPlan> {
+
+        public InnerPrinter(InnerPlan plan, PrintStream ps,
+                            Set<Operator> subgraphs,
+                            Set<Operator> multiInputSubgraphs,
+                            Set<Operator> multiOutputSubgraphs) {
+            super(plan, ps, true, subgraphs, multiInputSubgraphs,
+                    multiOutputSubgraphs);
+        }
+
+        @Override
+        protected String[] getAttributes(InnerOperator op) {
+            return new String[] {
+                "label=\"" + super.getName(op) + "\"",
+                "style=\"filled\"",
+                "fillcolor=\"white\""
+            };
+        }
+
+        @Override
+        protected Collection<PhysicalPlan> getNestedPlans(InnerOperator op) {
+            Collection<PhysicalPlan> nested = new LinkedList<PhysicalPlan>();
+            nested.add(op.getPlan());
+            return nested;
+        }
+
+        @Override
+        protected DotPOPrinter makeDumper(PhysicalPlan plan, PrintStream ps) {
+            DotPOPrinter physicalPrinter = new DotPOPrinter(plan, ps, true,
+                    mSubgraphs,
+                    mMultiInputSubgraphs,
+                    mMultiOutputSubgraphs);
+            // The nested physical plan printer follows the verbosity chosen on the outer printer.
+            physicalPrinter.setVerbose(isVerboseNesting);
+            return physicalPrinter;
+        }
+    }
+}


Reply via email to