Author: xuefu Date: Wed Apr 29 00:45:31 2015 New Revision: 1676651 URL: http://svn.apache.org/r1676651 Log: PIG-4518: SparkOperator should correspond to complete Spark job (Mohit via Xuefu)
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1676651&r1=1676650&r2=1676651&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Apr 29 00:45:31 2015 @@ -17,6 +17,8 @@ */ package org.apache.pig.backend.hadoop.executionengine.spark; +import com.google.common.collect.Lists; + import java.io.File; import java.io.IOException; import java.io.PrintStream; @@ -32,10 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import com.google.common.collect.Lists; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -101,7 +100,6 @@ import org.apache.pig.impl.plan.VisitorE import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.spark.SparkPigStats; import org.apache.pig.tools.pigstats.spark.SparkStatsUtil; - import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; import org.apache.spark.scheduler.JobLogger; @@ -125,13 +123,16 @@ public class SparkLauncher extends Launc @Override public PigStats launchPig(PhysicalPlan physicalPlan, String grpName, PigContext pigContext) throws Exception { - LOG.debug(physicalPlan); + if (LOG.isDebugEnabled()) + LOG.debug(physicalPlan); JobConf jobConf = SparkUtil.newJobConf(pigContext); jobConf.set(PigConstants.LOCAL_CODE_DIR, System.getProperty("java.io.tmpdir")); SchemaTupleBackend.initialize(jobConf, pigContext); SparkOperPlan sparkplan = compile(physicalPlan, pigContext); + if (LOG.isDebugEnabled()) + explain(sparkplan, System.out, "text", true); SparkPigStats sparkStats = (SparkPigStats) pigContext .getExecutionEngine().instantiatePigStats(); PigStats.start(sparkStats); @@ -185,7 +186,7 @@ public class SparkLauncher extends Launc cleanUpSparkJob(pigContext, currentDirectoryPath); sparkStats.finish(); - return sparkStats; + return sparkStats; } private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException { @@ -443,6 +444,8 @@ public class SparkLauncher extends Launc List<SparkOperator> leaves = sparkPlan.getLeaves(); Collections.sort(leaves); Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap(); + if (LOG.isDebugEnabled()) + LOG.debug("Converting " + leaves.size() + " Spark Operators"); for (SparkOperator leaf : leaves) { Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap(); sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds, @@ -494,7 +497,7 @@ public class SparkLauncher extends Launc predecessorRDDs, convertMap); sparkOpRdds.put(sparkOperator.getOperatorKey(), physicalOpRdds.get(leafPO.getOperatorKey())); - }catch(Exception e) { + } catch(Exception e) { if( e instanceof SparkException) { LOG.info("throw SparkException, error founds when running " + "rdds in spark"); @@ -507,7 +510,7 @@ public class SparkLauncher extends Launc List<POStore> poStores = PlanHelper.getPhysicalOperators( sparkOperator.physicalPlan, POStore.class); if (poStores != null && poStores.size() == 1) { - POStore poStore = poStores.get(0); + POStore poStore = poStores.get(0); if (!isFail) { for (int jobID : getJobIDs(seenJobIDs)) { SparkStatsUtil.waitForJobAddStats(jobID, poStore, @@ -519,10 +522,10 @@ public class SparkLauncher extends Launc conf, exception); } } else { - LOG.info(String - .format(String.format("sparkOperator:{} does not have POStore or" + - " sparkOperator has more than 1 POStore, the size of POStore"), - sparkOperator.name(), poStores.size())); + LOG.info(String + .format(String.format("sparkOperator:{} does not have POStore or" + + " sparkOperator has more than 1 POStore. {} is the size of POStore."), + sparkOperator.name(), poStores.size())); } } @@ -542,6 +545,7 @@ public class SparkLauncher extends Launc convertMap); predecessorRdds.add(rdds.get(predecessor.getOperatorKey())); } + } else { if (rddsFromPredeSparkOper != null && rddsFromPredeSparkOper.size() > 0) { @@ -552,7 +556,7 @@ public class SparkLauncher extends Launc POConverter converter = convertMap.get(physicalOperator.getClass()); if (converter == null) { throw new IllegalArgumentException( - "Spork unsupported PhysicalOperator: " + physicalOperator); + "Pig on Spark does not support Physical Operator: " + physicalOperator); } LOG.info("Converting operator " @@ -573,9 +577,12 @@ public class SparkLauncher extends Launc public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps, String format, boolean verbose) throws IOException { SparkOperPlan sparkPlan = compile(pp, pc); - ps.println("#-----------------------------------------------------#"); - ps.println("#The Spark node relations are:"); - ps.println("#-----------------------------------------------------#"); + explain(sparkPlan, ps, format, verbose); + } + + private void explain(SparkOperPlan sparkPlan, PrintStream ps, + String format, boolean verbose) + throws IOException { Map<OperatorKey, SparkOperator> allOperKeys = sparkPlan.getKeys(); List<OperatorKey> operKeyList = new ArrayList(allOperKeys.keySet()); Collections.sort(operKeyList); @@ -591,13 +598,14 @@ public class SparkLauncher extends Launc } ps.println(); } + if (format.equals("text")) { SparkPrinter printer = new SparkPrinter(ps, sparkPlan); printer.setVerbose(verbose); printer.visit(); } else { // TODO: add support for other file format throw new IOException( - "Non-text output of explain is not supported."); + "Non-text output of explain is not supported."); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1676651&r1=1676650&r2=1676651&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java Wed Apr 29 00:45:31 2015 @@ -113,6 +113,10 @@ public class PackageConverter implements (Tuple) next.get(2)); nullableTuple.setIndex(((Number) next.get(0)) .byteValue()); + if (LOG.isDebugEnabled()) + LOG.debug("Setting index to " + next.get(0) + + " for tuple " + (Tuple)next.get(2) + " with key " + + next.get(1)); return nullableTuple; } catch (ExecException e) { throw new RuntimeException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1676651&r1=1676650&r2=1676651&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Wed Apr 29 00:45:31 2015 @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileOutputFormat; @@ -33,7 +35,9 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; +import org.apache.pig.builtin.LOG; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.util.ObjectSerializer; @@ -54,7 +58,8 @@ import com.google.common.collect.Lists; public class StoreConverter implements POConverter<Tuple, Tuple2<Text, Tuple>, POStore> { - private static final FromTupleFunction FROM_TUPLE_FUNCTION = new FromTupleFunction(); + private static final Log LOG = LogFactory.getLog(StoreConverter.class); + private static final FromTupleFunction FROM_TUPLE_FUNCTION = new FromTupleFunction(); private PigContext pigContext; @@ -94,7 +99,10 @@ public class StoreConverter implements .getFileName(), Text.class, Tuple.class, PigOutputFormat.class, storeJobConf); } - return rddPairs.rdd(); + RDD<Tuple2<Text, Tuple>> retRdd = rddPairs.rdd(); + if (LOG.isDebugEnabled()) + LOG.debug("RDD lineage: " + retRdd.toDebugString()); + return retRdd; } private static POStore configureStorer(JobConf jobConf, Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1676651&r1=1676650&r2=1676651&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Wed Apr 29 00:45:31 2015 @@ -308,7 +308,7 @@ public class SparkCompiler extends PhyPl public void visitDistinct(PODistinct op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); } catch (Exception e) { int errCode = 2034; String msg = "Error compiling operator " @@ -343,7 +343,7 @@ public class SparkCompiler extends PhyPl @Override public void visitLoad(POLoad op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); phyToSparkOpMap.put(op, curSparkOp); } catch (Exception e) { int errCode = 2034; @@ -379,7 +379,7 @@ public class SparkCompiler extends PhyPl @Override public void visitStore(POStore op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); phyToSparkOpMap.put(op, curSparkOp); if (op.getSFile() != null && op.getSFile().getFuncSpec() != null) curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString()); @@ -394,7 +394,7 @@ public class SparkCompiler extends PhyPl @Override public void visitFilter(POFilter op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); processUDFs(op.getPlan()); phyToSparkOpMap.put(op, curSparkOp); } catch (Exception e) { @@ -408,7 +408,7 @@ public class SparkCompiler extends PhyPl @Override public void visitCross(POCross op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); phyToSparkOpMap.put(op, curSparkOp); } catch (Exception e) { int errCode = 2034; @@ -422,7 +422,7 @@ public class SparkCompiler extends PhyPl public void visitStream(POStream op) throws VisitorException { try { POStreamSpark poStreamSpark = new POStreamSpark(op); - nonBlocking(poStreamSpark); + addToPlan(poStreamSpark); phyToSparkOpMap.put(op, curSparkOp); } catch (Exception e) { int errCode = 2034; @@ -435,7 +435,7 @@ public class SparkCompiler extends PhyPl @Override public void visitSort(POSort op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); POSort sort = op; long limit = sort.getLimit(); if (limit!=-1) { @@ -455,7 +455,7 @@ public class SparkCompiler extends PhyPl @Override public void visitLimit(POLimit op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); } catch (Exception e) { int errCode = 2034; String msg = "Error compiling operator " @@ -468,7 +468,7 @@ public class SparkCompiler extends PhyPl public void visitLocalRearrange(POLocalRearrange op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); List<PhysicalPlan> plans = op.getPlans(); if (plans != null) for (PhysicalPlan ep : plans) @@ -520,7 +520,7 @@ public class SparkCompiler extends PhyPl } try { - nonBlocking(op); + addToPlan(op); phyToSparkOpMap.put(op, curSparkOp); } catch (Exception e) { int errCode = 2034; @@ -533,7 +533,7 @@ public class SparkCompiler extends PhyPl @Override public void visitPOForEach(POForEach op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); List<PhysicalPlan> plans = op.getInputPlans(); if (plans != null) { for (PhysicalPlan ep : plans) { @@ -553,7 +553,7 @@ public class SparkCompiler extends PhyPl public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException { try { - blocking(op); + addToPlan(op); curSparkOp.customPartitioner = op.getCustomPartitioner(); phyToSparkOpMap.put(op, curSparkOp); } catch (Exception e) { @@ -567,7 +567,7 @@ public class SparkCompiler extends PhyPl @Override public void visitPackage(POPackage op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); phyToSparkOpMap.put(op, curSparkOp); if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) { curSparkOp.markRegularJoin(); @@ -590,7 +590,7 @@ public class SparkCompiler extends PhyPl @Override public void visitUnion(POUnion op) throws VisitorException { try { - nonBlocking(op); + addToPlan(op); phyToSparkOpMap.put(op, curSparkOp); } catch (Exception e) { int errCode = 2034; @@ -629,7 +629,7 @@ public class SparkCompiler extends PhyPl } } - private void nonBlocking(PhysicalOperator op) throws PlanException, + private void addToPlan(PhysicalOperator op) throws PlanException, IOException { SparkOperator sparkOp = null; if (compiledInputs.length == 1) { @@ -639,17 +639,6 @@ public class SparkCompiler extends PhyPl } sparkOp.physicalPlan.addAsLeaf(op); curSparkOp = sparkOp; - } - - private void blocking(PhysicalOperator op) throws PlanException, - IOException { - SparkOperator sparkOp = getSparkOp(); - sparkPlan.add(sparkOp); - for (SparkOperator compileInput : compiledInputs) { - sparkPlan.connect(compileInput, sparkOp); - } - sparkOp.physicalPlan.addAsLeaf(op); - curSparkOp = sparkOp; } private SparkOperator merge(SparkOperator[] compiledInputs) Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1676651&r1=1676650&r2=1676651&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Wed Apr 29 00:45:31 2015 @@ -17,15 +17,19 @@ */ package org.apache.pig.backend.hadoop.executionengine.spark.plan; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.pig.PigException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.impl.plan.DepthFirstWalker; @@ -41,6 +45,8 @@ import org.apache.pig.impl.util.Pair; * stitched in to the "value" */ public class SparkPOPackageAnnotator extends SparkOpPlanVisitor { + private static final Log LOG = LogFactory.getLog(SparkPOPackageAnnotator.class); + public SparkPOPackageAnnotator(SparkOperPlan plan) { super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); } @@ -51,86 +57,56 @@ public class SparkPOPackageAnnotator ext PackageDiscoverer pkgDiscoverer = new PackageDiscoverer( sparkOp.physicalPlan); pkgDiscoverer.visit(); - POPackage pkg = pkgDiscoverer.getPkg(); - if (pkg != null) { - handlePackage(sparkOp, pkg); - } - } - } - - private void handlePackage(SparkOperator pkgSparkOp, POPackage pkg) - throws VisitorException { - int lrFound = 0; - List<SparkOperator> predecessors = this.mPlan - .getPredecessors(pkgSparkOp); - if (predecessors != null && predecessors.size() > 0) { - for (SparkOperator pred : predecessors) { - lrFound += patchPackage(pred, pkgSparkOp, pkg); - if (lrFound == pkg.getNumInps()) { - break; - } - } - } - if (lrFound != pkg.getNumInps()) { - int errCode = 2086; - String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators."; - throw new OptimizerException(msg, errCode, PigException.BUG); } } - private int patchPackage(SparkOperator pred, SparkOperator pkgSparkOp, - POPackage pkg) throws VisitorException { - LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer( - pred.physicalPlan, pkg); - lrDiscoverer.visit(); - // let our caller know if we managed to patch - // the package - return lrDiscoverer.getLoRearrangeFound(); - } - static class PackageDiscoverer extends PhyPlanVisitor { - private POPackage pkg; + private PhysicalPlan plan; public PackageDiscoverer(PhysicalPlan plan) { super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( plan)); + this.plan = plan; } @Override public void visitPackage(POPackage pkg) throws VisitorException { this.pkg = pkg; - }; - - /** - * @return the pkg - */ - public POPackage getPkg() { - return pkg; - } - - } - static class LoRearrangeDiscoverer extends PhyPlanVisitor { - - private int loRearrangeFound = 0; - private POPackage pkg; + // Find POLocalRearrange(s) corresponding to this POPackage + PhysicalOperator graOp = plan.getPredecessors(pkg).get(0); + if (! (graOp instanceof POGlobalRearrange)) { + throw new OptimizerException("Package operator is not preceded by " + + "GlobalRearrange operator in Spark Plan", 2087, PigException.BUG); + } - public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) { - super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( - plan)); - this.pkg = pkg; - } + List<PhysicalOperator> lraOps = plan.getPredecessors(graOp); + if (pkg.getNumInps() != lraOps.size()) { + throw new OptimizerException("Unexpected problem during optimization. " + + "Could not find all LocalRearrange operators. Expected " + pkg.getNumInps() + + ". Got " + lraOps.size() + ".", 2086, PigException.BUG); + } + Collections.sort(lraOps); + for (PhysicalOperator op : lraOps) { + if (! (op instanceof POLocalRearrange)) { + throw new OptimizerException("GlobalRearrange operator can only be preceded by " + + "LocalRearrange operator(s) in Spark Plan", 2087, PigException.BUG); + } + annotatePkgWithLRA((POLocalRearrange)op); + } + }; - @Override - public void visitLocalRearrange(POLocalRearrange lrearrange) + private void annotatePkgWithLRA(POLocalRearrange lrearrange) throws VisitorException { - loRearrangeFound++; + Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo; + if (LOG.isDebugEnabled()) + LOG.debug("Annotating package " + pkg + " with localrearrange operator " + + lrearrange + " with index " + lrearrange.getIndex()); if (pkg.getPkgr() instanceof LitePackager) { if (lrearrange.getIndex() != 0) { - // Throw some exception here throw new RuntimeException( "POLocalRearrange for POPackageLite cannot have index other than 0, but has index - " + lrearrange.getIndex()); @@ -158,17 +134,12 @@ public class SparkPOPackageAnnotator ext Integer.valueOf(lrearrange.getIndex()), new Pair<Boolean, Map<Integer, Integer>>(lrearrange .isProjectStar(), lrearrange.getProjectedColsMap())); + if (LOG.isDebugEnabled()) + LOG.debug("KeyInfo for packager for package operator " + pkg + " is " + + keyInfo ); pkg.getPkgr().setKeyInfo(keyInfo); pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); } - - /** - * @return the loRearrangeFound - */ - public int getLoRearrangeFound() { - return loRearrangeFound; - } - } -} +} \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1676651&r1=1676650&r2=1676651&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Wed Apr 29 00:45:31 2015 @@ -291,19 +291,25 @@ public class AccumulatorOptimizerUtil { } public static void addAccumulatorSpark(PhysicalPlan plan) throws - VisitorException { + VisitorException { List<PhysicalOperator> pos = plan.getRoots(); if (pos == null || pos.size() == 0) { return; } - // See if this is a POGlobalRearrange - PhysicalOperator po_globalRearrange = pos.get(0); - if (!po_globalRearrange.getClass().equals(POGlobalRearrange.class)) { - return; + List<POGlobalRearrange> gras = PlanHelper.getPhysicalOperators(plan, + POGlobalRearrange.class); + + for (POGlobalRearrange gra : gras) { + addAccumulatorSparkForGRASubDAG(plan, gra); } + } + + + private static void addAccumulatorSparkForGRASubDAG(PhysicalPlan plan, + POGlobalRearrange gra) throws VisitorException { - List<PhysicalOperator> poPackages = plan.getSuccessors(po_globalRearrange); + List<PhysicalOperator> poPackages = plan.getSuccessors(gra); if (poPackages == null || poPackages.size() == 0) { return; @@ -370,4 +376,4 @@ public class AccumulatorOptimizerUtil { po_foreach.setAccumulative(); } } -} +} \ No newline at end of file