Author: zly
Date: Thu Feb 16 21:26:32 2017
New Revision: 1783304

URL: http://svn.apache.org/viewvc?rev=1783304&view=rev
Log:
PIG-4899: The number of records of input file is calculated wrongly in spark mode in multiquery case (Adam via Liyun)

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1783304&r1=1783303&r2=1783304&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Thu Feb 16 21:26:32 2017 @@ -162,13 +162,8 @@ public class LoadConverter implements RD private SparkEngineConf sparkEngineConf; private boolean initialized; - //LoadConverter#ToTupleFunction is executed more than once in multiquery case causing - //invalid number of input records, 'skip' flag below indicates first load is finished. 
- private boolean skip; - public ToTupleFunction(SparkEngineConf sparkEngineConf){ this.sparkEngineConf = sparkEngineConf; - } @Override @@ -177,14 +172,9 @@ public class LoadConverter implements RD long partitionId = TaskContext.get().partitionId(); PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId)); - //We're in POSplit and already counted all input records, - //in a multiquery case skip will be set to true after the first load is finished: - if (sparkCounters != null && SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName, counterName).getValue() > 0) { - skip=true; - } initialized = true; } - if (sparkCounters != null && disableCounter == false && skip == false) { + if (sparkCounters != null && disableCounter == false) { sparkCounters.increment(counterGroupName, counterName, 1L); } return v1._2(); Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1783304&r1=1783303&r2=1783304&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Thu Feb 16 21:26:32 2017 @@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.tools.pigstats.InputStats; import org.apache.pig.tools.pigstats.JobStats; import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; @@ -133,6 +134,9 @@ public class SparkPigStats extends PigSt Map.Entry pairs = (Map.Entry)statIt.next(); LOG.info("\t" + pairs.getKey() + " : " + pairs.getValue()); } + for 
(InputStats inputStat : js.getInputs()){ + LOG.info("\t"+inputStat.getDisplayString()); + } } } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1783304&r1=1783303&r2=1783304&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Thu Feb 16 21:26:32 2017 @@ -18,8 +18,12 @@ package org.apache.pig.tools.pigstats.spark; -import org.apache.hadoop.mapred.JobConf; +import java.util.List; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener; import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; @@ -97,7 +101,21 @@ public class SparkStatsUtil { public static long getLoadSparkCounterValue(POLoad load) { SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance(); - return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load)); + int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan()); + return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load))/loadersCount; + } + + private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){ + List<PhysicalOperator> successors = pp.getSuccessors(op); + 
if (successors == null || successors.size()==0) return 1; + for (PhysicalOperator successor : successors){ + if (successor instanceof POSplit){ + return ((POSplit)successor).getPlans().size(); + }else{ + return countCoLoadsIfInSplit(successor,pp); + } + } + return 1; } public static boolean isJobSuccess(int jobID,