Author: xuefu
Date: Wed Apr 29 00:45:31 2015
New Revision: 1676651

URL: http://svn.apache.org/r1676651
Log:
PIG-4518: SparkOperator should correspond to complete Spark job (Mohit via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Apr 29 00:45:31 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
+import com.google.common.collect.Lists;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -32,10 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -101,7 +100,6 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
 import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
-
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.scheduler.JobLogger;
@@ -125,13 +123,16 @@ public class SparkLauncher extends Launc
        @Override
        public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
                        PigContext pigContext) throws Exception {
-               LOG.debug(physicalPlan);
+               if (LOG.isDebugEnabled())
+                   LOG.debug(physicalPlan);
                JobConf jobConf = SparkUtil.newJobConf(pigContext);
                jobConf.set(PigConstants.LOCAL_CODE_DIR,
                                System.getProperty("java.io.tmpdir"));
 
                SchemaTupleBackend.initialize(jobConf, pigContext);
                SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
+               if (LOG.isDebugEnabled())
+                   explain(sparkplan, System.out, "text", true);
                SparkPigStats sparkStats = (SparkPigStats) pigContext
                                .getExecutionEngine().instantiatePigStats();
                PigStats.start(sparkStats);
@@ -185,7 +186,7 @@ public class SparkLauncher extends Launc
                cleanUpSparkJob(pigContext, currentDirectoryPath);
                sparkStats.finish();
 
-        return sparkStats;
+               return sparkStats;
        }
 
        private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException {
@@ -443,6 +444,8 @@ public class SparkLauncher extends Launc
                        List<SparkOperator> leaves = sparkPlan.getLeaves();
                        Collections.sort(leaves);
                        Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
+                       if (LOG.isDebugEnabled())
+                           LOG.debug("Converting " + leaves.size() + " Spark Operators");
                        for (SparkOperator leaf : leaves) {
                                Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
                                sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
@@ -494,7 +497,7 @@ public class SparkLauncher extends Launc
                                                predecessorRDDs, convertMap);
                                sparkOpRdds.put(sparkOperator.getOperatorKey(),
                                                physicalOpRdds.get(leafPO.getOperatorKey()));
-                       }catch(Exception e) {
+                       } catch(Exception e) {
                                if( e instanceof  SparkException) {
                                        LOG.info("throw SparkException, error 
founds when running " +
                                                        "rdds in spark");
@@ -507,7 +510,7 @@ public class SparkLauncher extends Launc
                List<POStore> poStores = PlanHelper.getPhysicalOperators(
                                sparkOperator.physicalPlan, POStore.class);
                if (poStores != null && poStores.size() == 1) {
-                       POStore poStore = poStores.get(0);
+                         POStore poStore = poStores.get(0);
             if (!isFail) {
                 for (int jobID : getJobIDs(seenJobIDs)) {
                     SparkStatsUtil.waitForJobAddStats(jobID, poStore,
@@ -519,10 +522,10 @@ public class SparkLauncher extends Launc
                         conf, exception);
             }
         } else {
-                       LOG.info(String
-                                       .format(String.format("sparkOperator:{} does not have POStore or" +
-                                    " sparkOperator has more than 1 POStore, the size of POStore"),
-                                                       sparkOperator.name(), poStores.size()));
+                       LOG.info(String.format(
+                                       "sparkOperator:%s does not have POStore or" +
+                                       " sparkOperator has more than 1 POStore. %d is the size of POStore.",
+                                       sparkOperator.name(), poStores.size()));
                }
        }
 
@@ -542,6 +545,7 @@ public class SparkLauncher extends Launc
                                                convertMap);
                                predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
                        }
+
                } else {
                        if (rddsFromPredeSparkOper != null
                                        && rddsFromPredeSparkOper.size() > 0) {
@@ -552,7 +556,7 @@ public class SparkLauncher extends Launc
                POConverter converter = convertMap.get(physicalOperator.getClass());
                if (converter == null) {
                        throw new IllegalArgumentException(
-                                       "Spork unsupported PhysicalOperator: " + physicalOperator);
+                                       "Pig on Spark does not support Physical Operator: " + physicalOperator);
                }
 
                LOG.info("Converting operator "
@@ -573,9 +577,12 @@ public class SparkLauncher extends Launc
        public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
                        String format, boolean verbose) throws IOException {
                SparkOperPlan sparkPlan = compile(pp, pc);
-               ps.println("#-----------------------------------------------------#");
-               ps.println("#The Spark node relations are:");
-               ps.println("#-----------------------------------------------------#");
+               explain(sparkPlan, ps, format, verbose);
+       }
+
+       private void explain(SparkOperPlan sparkPlan, PrintStream ps,
+           String format, boolean verbose)
+                 throws IOException {
                Map<OperatorKey, SparkOperator> allOperKeys = sparkPlan.getKeys();
                List<OperatorKey> operKeyList = new ArrayList(allOperKeys.keySet());
                Collections.sort(operKeyList);
@@ -591,13 +598,14 @@ public class SparkLauncher extends Launc
                        }
                        ps.println();
                }
+
                if (format.equals("text")) {
                        SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
                        printer.setVerbose(verbose);
                        printer.visit();
                } else { // TODO: add support for other file format
                        throw new IOException(
-                                       "Non-text    output of explain is not supported.");
+                                       "Non-text output of explain is not supported.");
                }
        }
 

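For context, physicalToRDD (touched above) dispatches on the operator's concrete
class: convertMap maps each PhysicalOperator subclass to a POConverter, and an
unmapped class fails fast with the message fixed in this change. A minimal,
self-contained sketch of that dispatch pattern, with simplified stand-in types
rather than Pig's real signatures:

    import java.util.HashMap;
    import java.util.Map;

    class ConverterDispatch {
        interface Converter {
            Object convert(Object physicalOperator);
        }

        // Class-keyed converter table, as in SparkLauncher's convertMap.
        private final Map<Class<?>, Converter> convertMap = new HashMap<Class<?>, Converter>();

        void register(Class<?> opClass, Converter converter) {
            convertMap.put(opClass, converter);
        }

        Object convert(Object op) {
            Converter converter = convertMap.get(op.getClass());
            if (converter == null) {
                // Mirrors the improved error message above.
                throw new IllegalArgumentException(
                        "Pig on Spark does not support Physical Operator: " + op);
            }
            return converter.convert(op);
        }
    }
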
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java Wed Apr 29 00:45:31 2015
@@ -113,6 +113,10 @@ public class PackageConverter implements
                                     (Tuple) next.get(2));
                             nullableTuple.setIndex(((Number) next.get(0))
                                     .byteValue());
+                            if (LOG.isDebugEnabled())
+                                LOG.debug("Setting index to " + next.get(0) +
+                                    " for tuple " + (Tuple) next.get(2) + " with key " +
+                                    next.get(1));
                             return nullableTuple;
                         } catch (ExecException e) {
                             throw new RuntimeException(e);

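The tuple reaching this point is the (index, key, value) triple emitted
upstream by POLocalRearrange, which is what the new debug line records. A
hedged sketch of the rewrap happening around it (constructor and setter usage
taken from the surrounding calls):

    // next = (index, key, value), as produced by POLocalRearrange
    NullableTuple nullableTuple = new NullableTuple((Tuple) next.get(2)); // wrap the value
    nullableTuple.setIndex(((Number) next.get(0)).byteValue());           // record the input index
    // next.get(1) is the group key; the packager later combines the index with
    // the keyInfo set by SparkPOPackageAnnotator to stitch the key back into the value.
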
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Wed Apr 29 00:45:31 2015
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -33,7 +35,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -54,7 +58,8 @@ import com.google.common.collect.Lists;
 public class StoreConverter implements
         POConverter<Tuple, Tuple2<Text, Tuple>, POStore> {
 
-    private static final FromTupleFunction FROM_TUPLE_FUNCTION = new FromTupleFunction();
+    private static final Log LOG = LogFactory.getLog(StoreConverter.class);
+    private static final FromTupleFunction FROM_TUPLE_FUNCTION = new FromTupleFunction();
 
     private PigContext pigContext;
 
@@ -94,7 +99,10 @@ public class StoreConverter implements
                     .getFileName(), Text.class, Tuple.class,
                     PigOutputFormat.class, storeJobConf);
         }
-        return rddPairs.rdd();
+        RDD<Tuple2<Text, Tuple>> retRdd = rddPairs.rdd();
+        if (LOG.isDebugEnabled())
+            LOG.debug("RDD lineage: " + retRdd.toDebugString());
+        return retRdd;
     }
 
     private static POStore configureStorer(JobConf jobConf,

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Wed Apr 29 00:45:31 2015
@@ -308,7 +308,7 @@ public class SparkCompiler extends PhyPl
 
        public void visitDistinct(PODistinct op) throws VisitorException {
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                } catch (Exception e) {
                        int errCode = 2034;
                        String msg = "Error compiling operator "
@@ -343,7 +343,7 @@ public class SparkCompiler extends PhyPl
        @Override
        public void visitLoad(POLoad op) throws VisitorException {
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                        phyToSparkOpMap.put(op, curSparkOp);
                } catch (Exception e) {
                        int errCode = 2034;
@@ -379,7 +379,7 @@ public class SparkCompiler extends PhyPl
        @Override
        public void visitStore(POStore op) throws VisitorException {
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                        phyToSparkOpMap.put(op, curSparkOp);
                        if (op.getSFile() != null && op.getSFile().getFuncSpec() != null)
                                curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString());
@@ -394,7 +394,7 @@ public class SparkCompiler extends PhyPl
        @Override
        public void visitFilter(POFilter op) throws VisitorException {
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                        processUDFs(op.getPlan());
                        phyToSparkOpMap.put(op, curSparkOp);
                } catch (Exception e) {
@@ -408,7 +408,7 @@ public class SparkCompiler extends PhyPl
        @Override
        public void visitCross(POCross op) throws VisitorException {
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                        phyToSparkOpMap.put(op, curSparkOp);
                } catch (Exception e) {
                        int errCode = 2034;
@@ -422,7 +422,7 @@ public class SparkCompiler extends PhyPl
        public void visitStream(POStream op) throws VisitorException {
                try {
                        POStreamSpark poStreamSpark = new POStreamSpark(op);
-                       nonBlocking(poStreamSpark);
+                       addToPlan(poStreamSpark);
                        phyToSparkOpMap.put(op, curSparkOp);
                } catch (Exception e) {
                        int errCode = 2034;
@@ -435,7 +435,7 @@ public class SparkCompiler extends PhyPl
        @Override
        public void visitSort(POSort op) throws VisitorException {
                try {
-            nonBlocking(op);
+            addToPlan(op);
             POSort sort = op;
             long limit = sort.getLimit();
             if (limit!=-1) {
@@ -455,7 +455,7 @@ public class SparkCompiler extends PhyPl
        @Override
        public void visitLimit(POLimit op) throws VisitorException {
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                } catch (Exception e) {
                        int errCode = 2034;
                        String msg = "Error compiling operator "
@@ -468,7 +468,7 @@ public class SparkCompiler extends PhyPl
        public void visitLocalRearrange(POLocalRearrange op)
                        throws VisitorException {
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                        List<PhysicalPlan> plans = op.getPlans();
                        if (plans != null)
                                for (PhysicalPlan ep : plans)
@@ -520,7 +520,7 @@ public class SparkCompiler extends PhyPl
                }
 
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                        phyToSparkOpMap.put(op, curSparkOp);
                } catch (Exception e) {
                        int errCode = 2034;
@@ -533,7 +533,7 @@ public class SparkCompiler extends PhyPl
        @Override
        public void visitPOForEach(POForEach op) throws VisitorException {
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                        List<PhysicalPlan> plans = op.getInputPlans();
                        if (plans != null) {
                                for (PhysicalPlan ep : plans) {
@@ -553,7 +553,7 @@ public class SparkCompiler extends PhyPl
        public void visitGlobalRearrange(POGlobalRearrange op)
                        throws VisitorException {
                try {
-                       blocking(op);
+                       addToPlan(op);
                        curSparkOp.customPartitioner = op.getCustomPartitioner();
                        phyToSparkOpMap.put(op, curSparkOp);
                } catch (Exception e) {
@@ -567,7 +567,7 @@ public class SparkCompiler extends PhyPl
        @Override
        public void visitPackage(POPackage op) throws VisitorException {
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                        phyToSparkOpMap.put(op, curSparkOp);
                        if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) {
                                curSparkOp.markRegularJoin();
@@ -590,7 +590,7 @@ public class SparkCompiler extends PhyPl
        @Override
        public void visitUnion(POUnion op) throws VisitorException {
                try {
-                       nonBlocking(op);
+                       addToPlan(op);
                        phyToSparkOpMap.put(op, curSparkOp);
                } catch (Exception e) {
                        int errCode = 2034;
@@ -629,7 +629,7 @@ public class SparkCompiler extends PhyPl
                }
        }
 
-       private void nonBlocking(PhysicalOperator op) throws PlanException,
+       private void addToPlan(PhysicalOperator op) throws PlanException,
                        IOException {
                SparkOperator sparkOp = null;
                if (compiledInputs.length == 1) {
@@ -639,17 +639,6 @@ public class SparkCompiler extends PhyPl
                }
                sparkOp.physicalPlan.addAsLeaf(op);
                curSparkOp = sparkOp;
-       }
-
-       private void blocking(PhysicalOperator op) throws PlanException,
-                       IOException {
-               SparkOperator sparkOp = getSparkOp();
-               sparkPlan.add(sparkOp);
-               for (SparkOperator compileInput : compiledInputs) {
-                       sparkPlan.connect(compileInput, sparkOp);
-               }
-               sparkOp.physicalPlan.addAsLeaf(op);
-               curSparkOp = sparkOp;
        }
 
        private SparkOperator merge(SparkOperator[] compiledInputs)

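The net effect of the changes above: every visitor funnels through addToPlan(),
and the blocking() path that used to start a new SparkOperator at each
POGlobalRearrange is gone, so a SparkOperator now spans a complete Spark job. A
hedged sketch of the consolidated helper (the single-input branch falls between
the hunks above, so reusing compiledInputs[0] there is an assumption; merge()
is the class's existing helper):

    private void addToPlan(PhysicalOperator op) throws PlanException, IOException {
        SparkOperator sparkOp = null;
        if (compiledInputs.length == 1) {
            sparkOp = compiledInputs[0];     // assumed: stay inside the lone predecessor
        } else {
            sparkOp = merge(compiledInputs); // fold all predecessors into one operator
        }
        // Either way the operator is appended to an existing SparkOperator
        // rather than opening a new one.
        sparkOp.physicalPlan.addAsLeaf(op);
        curSparkOp = sparkOp;
    }
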
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Wed Apr 29 00:45:31 2015
@@ -17,15 +17,19 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.plan;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -41,6 +45,8 @@ import org.apache.pig.impl.util.Pair;
  * stitched in to the "value"
  */
 public class SparkPOPackageAnnotator extends SparkOpPlanVisitor {
+       private static final Log LOG = LogFactory.getLog(SparkPOPackageAnnotator.class);
+
        public SparkPOPackageAnnotator(SparkOperPlan plan) {
                super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
        }
@@ -51,86 +57,56 @@ public class SparkPOPackageAnnotator ext
                        PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(
                                        sparkOp.physicalPlan);
                        pkgDiscoverer.visit();
-                       POPackage pkg = pkgDiscoverer.getPkg();
-                       if (pkg != null) {
-                               handlePackage(sparkOp, pkg);
-                       }
-               }
-       }
-
-       private void handlePackage(SparkOperator pkgSparkOp, POPackage pkg)
-                       throws VisitorException {
-               int lrFound = 0;
-               List<SparkOperator> predecessors = this.mPlan
-                               .getPredecessors(pkgSparkOp);
-               if (predecessors != null && predecessors.size() > 0) {
-                       for (SparkOperator pred : predecessors) {
-                               lrFound += patchPackage(pred, pkgSparkOp, pkg);
-                               if (lrFound == pkg.getNumInps()) {
-                                       break;
-                               }
-                       }
-               }
-               if (lrFound != pkg.getNumInps()) {
-                       int errCode = 2086;
-                       String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators.";
-                       throw new OptimizerException(msg, errCode, PigException.BUG);
                }
        }
 
-       private int patchPackage(SparkOperator pred, SparkOperator pkgSparkOp,
-                       POPackage pkg) throws VisitorException {
-               LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
-                               pred.physicalPlan, pkg);
-               lrDiscoverer.visit();
-               // let our caller know if we managed to patch
-               // the package
-               return lrDiscoverer.getLoRearrangeFound();
-       }
-
        static class PackageDiscoverer extends PhyPlanVisitor {
-
                private POPackage pkg;
+               private PhysicalPlan plan;
 
                public PackageDiscoverer(PhysicalPlan plan) {
                        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
                                        plan));
+                       this.plan = plan;
                }
 
                @Override
                public void visitPackage(POPackage pkg) throws VisitorException {
                        this.pkg = pkg;
-               };
-
-               /**
-                * @return the pkg
-                */
-               public POPackage getPkg() {
-                       return pkg;
-               }
-
-       }
 
-       static class LoRearrangeDiscoverer extends PhyPlanVisitor {
-
-               private int loRearrangeFound = 0;
-               private POPackage pkg;
+                       // Find POLocalRearrange(s) corresponding to this POPackage
+                       PhysicalOperator graOp = plan.getPredecessors(pkg).get(0);
+                       if (!(graOp instanceof POGlobalRearrange)) {
+                               throw new OptimizerException("Package operator is not preceded by " +
+                                               "GlobalRearrange operator in Spark Plan", 2087, PigException.BUG);
+                       }
 
-               public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
-                       super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
-                                       plan));
-                       this.pkg = pkg;
-               }
+                       List<PhysicalOperator> lraOps = plan.getPredecessors(graOp);
+                       if (pkg.getNumInps() != lraOps.size()) {
+                               throw new OptimizerException("Unexpected problem during optimization. " +
+                                               "Could not find all LocalRearrange operators. Expected " +
+                                               pkg.getNumInps() + ". Got " + lraOps.size() + ".", 2086, PigException.BUG);
+                       }
+                       Collections.sort(lraOps);
+                       for (PhysicalOperator op : lraOps) {
+                               if (!(op instanceof POLocalRearrange)) {
+                                       throw new OptimizerException("GlobalRearrange operator can only be preceded by " +
+                                                       "LocalRearrange operator(s) in Spark Plan", 2087, PigException.BUG);
+                               }
+                               annotatePkgWithLRA((POLocalRearrange)op);
+                       }
+               };
 
-               @Override
-               public void visitLocalRearrange(POLocalRearrange lrearrange)
+               private void annotatePkgWithLRA(POLocalRearrange lrearrange)
                                throws VisitorException {
-                       loRearrangeFound++;
+
                        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+                       if (LOG.isDebugEnabled())
+                           LOG.debug("Annotating package " + pkg + " with localrearrange operator "
+                               + lrearrange + " with index " + lrearrange.getIndex());
 
                        if (pkg.getPkgr() instanceof LitePackager) {
                                if (lrearrange.getIndex() != 0) {
-                                       // Throw some exception here
                                        throw new RuntimeException(
                                                        "POLocalRearrange for 
POPackageLite cannot have index other than 0, but has index - "
                                                                        + 
lrearrange.getIndex());
@@ -158,17 +134,12 @@ public class SparkPOPackageAnnotator ext
                                        Integer.valueOf(lrearrange.getIndex()),
                                        new Pair<Boolean, Map<Integer, Integer>>(lrearrange
                                                        .isProjectStar(), lrearrange.getProjectedColsMap()));
+                       if (LOG.isDebugEnabled())
+                           LOG.debug("KeyInfo for packager for package operator " + pkg + " is "
+                               + keyInfo);
                        pkg.getPkgr().setKeyInfo(keyInfo);
                        pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
                        pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
                }
-
-               /**
-                * @return the loRearrangeFound
-                */
-               public int getLoRearrangeFound() {
-                       return loRearrangeFound;
-               }
-
        }
-}
+}
\ No newline at end of file

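In miniature, the annotator's job is: for each POLocalRearrange feeding the
POGlobalRearrange, record per-input key metadata on the package's packager,
keyed by the rearrange index. An illustrative keyInfo entry for one input,
using Pig's Pair type (the column numbers are hypothetical):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pig.impl.util.Pair;

    Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo =
            new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
    Map<Integer, Integer> projectedCols = new HashMap<Integer, Integer>();
    projectedCols.put(0, 0); // hypothetical: input column 0 is carried at key position 0
    // Index 0 identifies which LocalRearrange (i.e. which cogroup/join input)
    // this entry describes; 'false' means the key is not a project-star key.
    keyInfo.put(0, new Pair<Boolean, Map<Integer, Integer>>(false, projectedCols));
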
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1676651&r1=1676650&r2=1676651&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Wed Apr 29 00:45:31 2015
@@ -291,19 +291,25 @@ public class AccumulatorOptimizerUtil {
     }
 
     public static void addAccumulatorSpark(PhysicalPlan plan) throws
-            VisitorException {
+        VisitorException {
         List<PhysicalOperator> pos = plan.getRoots();
         if (pos == null || pos.size() == 0) {
             return;
         }
 
-        // See if this is a POGlobalRearrange
-        PhysicalOperator po_globalRearrange = pos.get(0);
-        if (!po_globalRearrange.getClass().equals(POGlobalRearrange.class)) {
-            return;
+        List<POGlobalRearrange> gras = PlanHelper.getPhysicalOperators(plan,
+            POGlobalRearrange.class);
+
+        for (POGlobalRearrange gra : gras) {
+            addAccumulatorSparkForGRASubDAG(plan, gra);
         }
+    }
+
+
+    private static void addAccumulatorSparkForGRASubDAG(PhysicalPlan plan,
+        POGlobalRearrange gra) throws VisitorException {
 
-        List<PhysicalOperator> poPackages = plan.getSuccessors(po_globalRearrange);
+        List<PhysicalOperator> poPackages = plan.getSuccessors(gra);
 
         if (poPackages == null || poPackages.size() == 0) {
             return;
@@ -370,4 +376,4 @@ public class AccumulatorOptimizerUtil {
             po_foreach.setAccumulative();
         }
     }
-}
+}
\ No newline at end of file

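With a SparkOperator now covering a complete job, one physical plan can contain
several POGlobalRearrange operators, which is why the optimizer switches from
inspecting only the plan root to scanning the whole plan. The reworked entry
point, restated with comments (names as in the diff):

    // Collect every GlobalRearrange in the plan, not just one at the root...
    List<POGlobalRearrange> gras = PlanHelper.getPhysicalOperators(plan,
            POGlobalRearrange.class);
    // ...and apply the accumulator rewrite to each sub-DAG independently.
    for (POGlobalRearrange gra : gras) {
        addAccumulatorSparkForGRASubDAG(plan, gra);
    }
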
