Author: zly
Date: Thu Feb 16 21:26:32 2017
New Revision: 1783304

URL: http://svn.apache.org/viewvc?rev=1783304&view=rev
Log:
PIG-4899: The number of records of input file is calculated wrongly in spark 
mode in multiquery case (Adam via Liyun)

Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1783304&r1=1783303&r2=1783304&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
 Thu Feb 16 21:26:32 2017
@@ -162,13 +162,8 @@ public class LoadConverter implements RD
         private SparkEngineConf sparkEngineConf;
         private boolean initialized;
 
-        //LoadConverter#ToTupleFunction is executed more than once in 
multiquery case causing
-        //invalid number of input records, 'skip' flag below indicates first 
load is finished.
-        private boolean skip;
-
         public ToTupleFunction(SparkEngineConf sparkEngineConf){
                this.sparkEngineConf = sparkEngineConf;
-
         }
 
         @Override
@@ -177,14 +172,9 @@ public class LoadConverter implements RD
                 long partitionId = TaskContext.get().partitionId();
                 
PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, 
Long.toString(partitionId));
 
-                //We're in POSplit and already counted all input records,
-                //in a multiquery case skip will be set to true after the 
first load is finished:
-                if (sparkCounters != null && 
SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName, 
counterName).getValue() > 0) {
-                    skip=true;
-                }
                 initialized = true;
             }
-            if (sparkCounters != null && disableCounter == false && skip == 
false) {
+            if (sparkCounters != null && disableCounter == false) {
                 sparkCounters.increment(counterGroupName, counterName, 1L);
             }
             return v1._2();

Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1783304&r1=1783303&r2=1783304&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java 
Thu Feb 16 21:26:32 2017
@@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -133,6 +134,9 @@ public class SparkPigStats extends PigSt
                 Map.Entry pairs = (Map.Entry)statIt.next();
                 LOG.info("\t" + pairs.getKey() + " : " + pairs.getValue());
             }
+            for (InputStats inputStat : js.getInputs()){
+                LOG.info("\t"+inputStat.getDisplayString());
+            }
         }
     }
 

Modified: 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1783304&r1=1783303&r2=1783304&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java 
Thu Feb 16 21:26:32 2017
@@ -18,8 +18,12 @@
 
 package org.apache.pig.tools.pigstats.spark;
 
-import org.apache.hadoop.mapred.JobConf;
+import java.util.List;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
@@ -97,7 +101,21 @@ public class SparkStatsUtil {
 
     public static long getLoadSparkCounterValue(POLoad load) {
         SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
-        return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, 
getLoadSparkCounterName(load));
+        int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan());
+        return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, 
getLoadSparkCounterName(load))/loadersCount;
+    }
+
+    private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan 
pp){
+        List<PhysicalOperator> successors = pp.getSuccessors(op);
+        if (successors == null || successors.size()==0) return 1;
+        for (PhysicalOperator successor : successors){
+            if (successor instanceof POSplit){
+                return ((POSplit)successor).getPlans().size();
+            }else{
+                return countCoLoadsIfInSplit(successor,pp);
+            }
+        }
+        return 1;
     }
 
     public static boolean isJobSuccess(int jobID,


Reply via email to