Author: gates
Date: Fri Jun  5 16:26:15 2009
New Revision: 782057

URL: http://svn.apache.org/viewvc?rev=782057&view=rev
Log:
PIG-831: Turned off reporting of records and bytes written for multi-store
queries as the returned results are confusing and wrong.


Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=782057&r1=782056&r2=782057&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jun  5 16:26:15 2009
@@ -26,6 +26,9 @@
 
 IMPROVEMENTS
 
+PIG-831: Turned off reporting of records and bytes written for multi-store
+queries as the returned results are confusing and wrong. (gates)
+
 PIG-813: documentation updates (chandec via olgan)
 
 PIG-825: PIG_HADOOP_VERSION should be set to 18 (dvryaboy via gates).
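
For context, a "multi-store" query is a single Pig script with more than one
store. A minimal sketch of one, driven through the Java PigServer API (file
paths and data are made up, and the batch calls assume the multi-query API
present on this trunk):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class MultiStoreExample {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            // One script, two final stores: the kind of plan whose per-store
            // record and byte counts Hadoop's job counters cannot separate.
            pig.setBatchOn();
            pig.registerQuery("A = load 'input.txt' as (name:chararray, cnt:int);");
            pig.registerQuery("B = filter A by cnt > 0;");
            pig.registerQuery("store A into 'out_all';");
            pig.registerQuery("store B into 'out_filtered';");
            pig.executeBatch();
        }
    }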

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=782057&r1=782056&r2=782057&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Jun  5 16:26:15 2009
@@ -192,6 +192,7 @@
         log.info( "100% complete");
 
         boolean failed = false;
+        int finalStores = 0;
         // Look to see if any jobs failed.  If so, we need to report that.
         if (failedJobs != null && failedJobs.size() > 0) {
             log.error(failedJobs.size()+" map reduce job(s) failed!");
@@ -210,6 +211,7 @@
                     if (!st.isTmpStore()) {
                         failedStores.add(st.getSFile());
                         failureMap.put(st.getSFile(), backendException);
+                        finalStores++;
                     }
 
                     FileLocalizer.registerDeleteOnFail(st.getSFile().getFileName(), pc);
@@ -227,6 +229,7 @@
                 for (POStore st: sts) {
                     if (!st.isTmpStore()) {
                         succeededStores.add(st.getSFile());
+                        finalStores++;
                     }
                     log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
                 }
@@ -241,9 +244,10 @@
             CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
         }
         
-        if(stats.getPigStats().get(stats.getLastJobID()) == null)
-            log.warn("Jobs not found in the JobClient. Please try to use Local, Hadoop Distributed or Hadoop MiniCluster modes instead of Hadoop LocalExecution");
-        else {
+        // Report records and bytes written.  Only do this in the single store case.  Multi-store
+        // scripts mess up the stats reporting from hadoop.
+        List<String> rji = stats.getRootJobIDs();
+        if (rji != null && rji.size() == 1 && finalStores == 1) {
             log.info("Records written : " + stats.getRecordsWritten());
             log.info("Bytes written : " + stats.getBytesWritten());
         }
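
For illustration, a self-contained sketch of the guard above; the class and
the report() helper are hypothetical stand-ins, and only the condition
mirrors the patch:

    import java.util.Arrays;
    import java.util.List;

    public class SingleStoreGuardSketch {

        public static void main(String[] args) {
            // Two root jobs feeding two final stores: the report is suppressed.
            report(Arrays.asList("job_0001", "job_0002"), 2, 500L, 4096L);
            // One root job, one final (non-temporary) store: the report prints.
            report(Arrays.asList("job_0003"), 1, 500L, 4096L);
        }

        static void report(List<String> rootJobIDs, int finalStores,
                           long recordsWritten, long bytesWritten) {
            // Same condition as the patch: Hadoop's per-job output counters
            // cannot be attributed to individual stores of a multi-store
            // script, so only report when both counts are exactly one.
            if (rootJobIDs != null && rootJobIDs.size() == 1 && finalStores == 1) {
                System.out.println("Records written : " + recordsWritten);
                System.out.println("Bytes written : " + bytesWritten);
            }
        }
    }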

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=782057&r1=782056&r2=782057&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Fri Jun  5 16:26:15 2009
@@ -21,7 +21,6 @@
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,8 +33,6 @@
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.Counters.Group;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.ExecType;
@@ -45,7 +42,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigStats {
@@ -54,7 +50,8 @@
     JobControl jc;
     JobClient jobClient;
     Map<String, Map<String, String>> stats = new HashMap<String, Map<String,String>>();
-    String lastJobID;
+    // String lastJobID;
+    ArrayList<String> rootJobIDs = new ArrayList<String>();
     ExecType mode;
     
     public void setMROperatorPlan(MROperPlan mrp) {
@@ -112,8 +109,6 @@
     
     private Map<String, Map<String, String>> accumulateMRStats() throws ExecException {
         
-        Job lastJob = getLastJob(jc.getSuccessfulJobs());
-        
         for(Job job : jc.getSuccessfulJobs()) {
             
             
@@ -170,12 +165,14 @@
             
         }
         
-        if (lastJob != null) lastJobID = lastJob.getAssignedJobID().toString();
+        getLastJobIDs(jc.getSuccessfulJobs());
+        
         return stats;
     }
     
 
-    private Job getLastJob(List<Job> jobs) {
+    private void getLastJobIDs(List<Job> jobs) {
+        rootJobIDs.clear();
          Set<Job> temp = new HashSet<Job>();
          for(Job job : jobs) {
             if(job.getDependingJobs() != null && job.getDependingJobs().size() > 0)
@@ -185,17 +182,13 @@
          //difference between temp and jobs would be the set of leaves
          //we can safely assume there would be only one leaf
          for(Job job : jobs) {
-             if(temp.contains(job))
-                 continue;
-             else
-                 //this means a leaf
-                 return job;
+             if(temp.contains(job)) continue;
+             else rootJobIDs.add(job.getAssignedJobID().toString());
          }
-         return null;
     }
     
-    public String getLastJobID() {
-        return lastJobID;
+    public List<String> getRootJobIDs() {
+        return rootJobIDs;
     }
     
     public Map<String, Map<String, String>> getPigStats() {
@@ -226,14 +219,18 @@
      * @return
      */
     private long getRecordsCountMR() {
-        String reducePlan = stats.get(lastJobID).get("PIG_STATS_REDUCE_PLAN");
-        String records = null;
-        if(reducePlan == null) {
-            records = stats.get(lastJobID).get("PIG_STATS_MAP_OUTPUT_RECORDS");
-        } else {
-            records = stats.get(lastJobID).get("PIG_STATS_REDUCE_OUTPUT_RECORDS");
+        long records = 0;
+        for (String jid : rootJobIDs) {
+            Map<String, String> jobStats = stats.get(jid);
+            if (jobStats == null) continue;
+            String reducePlan = jobStats.get("PIG_STATS_REDUCE_PLAN");
+            if (reducePlan == null) {
+                records += Long.parseLong(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
+            } else {
+                records += Long.parseLong(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
+            }
         }
-        return Long.parseLong(records);
+        return records;
     }
     
     public long getBytesWritten() {
@@ -254,7 +251,13 @@
     }
     
     private long getMapReduceBytesWritten() {
-        return Long.parseLong(stats.get(lastJobID).get("PIG_STATS_BYTES_WRITTEN"));
+        long bytesWritten = 0;
+        for (String jid : rootJobIDs) {
+            Map<String, String> jobStats = stats.get(jid);
+            if (jobStats == null) continue;
+            bytesWritten += Long.parseLong(jobStats.get("PIG_STATS_BYTES_WRITTEN"));
+        }
+        return bytesWritten;
     }
     
 }
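
For illustration, a self-contained sketch of the leaf-collection and
aggregation logic above, using a plain map of job -> prerequisite jobs in
place of Hadoop's jobcontrol.Job (all names here are illustrative, not Pig
API):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class RootJobIDsSketch {

        public static void main(String[] args) {
            // job3 depends on job1 and job2; job4 is independent.
            Map<String, List<String>> prerequisites =
                new HashMap<String, List<String>>();
            prerequisites.put("job1", Collections.<String>emptyList());
            prerequisites.put("job2", Collections.<String>emptyList());
            prerequisites.put("job3", Arrays.asList("job1", "job2"));
            prerequisites.put("job4", Collections.<String>emptyList());

            // Prints the leaves, job3 and job4: the jobs whose output
            // actually reaches the script's stores.
            System.out.println(leafJobIDs(prerequisites));
        }

        // Mirrors getLastJobIDs(): collect every job that some other job
        // depends on, then keep the complement. A job nothing depends on is
        // a leaf of the MapReduce plan.
        static List<String> leafJobIDs(Map<String, List<String>> prerequisites) {
            Set<String> dependedOn = new HashSet<String>();
            for (List<String> prereqs : prerequisites.values()) {
                dependedOn.addAll(prereqs);
            }
            List<String> leaves = new ArrayList<String>();
            for (String job : prerequisites.keySet()) {
                if (!dependedOn.contains(job)) {
                    leaves.add(job);
                }
            }
            return leaves;
        }

        // Mirrors getMapReduceBytesWritten(): sum the counter across all
        // leaf jobs, skipping jobs with no recorded stats.
        static long bytesWritten(List<String> leaves,
                                 Map<String, Map<String, String>> stats) {
            long total = 0;
            for (String jid : leaves) {
                Map<String, String> jobStats = stats.get(jid);
                if (jobStats == null) continue;
                total += Long.parseLong(jobStats.get("PIG_STATS_BYTES_WRITTEN"));
            }
            return total;
        }
    }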

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=782057&r1=782056&r2=782057&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Fri Jun  5 16:26:15 2009
@@ -28,14 +28,12 @@
 
 import junit.framework.TestCase;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.PigStats;
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestCounters extends TestCase {
@@ -499,7 +497,7 @@
             System.out.println("============================================");
         }
 
-        Map<String, String> jobStats = stats.get(pigStats.getLastJobID());
+        Map<String, String> jobStats = stats.get(pigStats.getRootJobIDs().get(0));
 
         System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
         assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
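
For illustration, the lookup pattern the updated test uses, as a stand-alone
snippet (the job ID and counter value are made up; the stats map shape
follows the test above):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class StatsLookupSketch {

        public static void main(String[] args) {
            Map<String, Map<String, String>> stats =
                new HashMap<String, Map<String, String>>();
            Map<String, String> jobStats = new HashMap<String, String>();
            jobStats.put("PIG_STATS_MAP_INPUT_RECORDS", "100");
            stats.put("job_200906051626_0001", jobStats);

            // getRootJobIDs().get(0) replaces the removed getLastJobID();
            // taking the first element is valid because a single-store
            // script has exactly one root job.
            List<String> roots = Arrays.asList("job_200906051626_0001");
            Map<String, String> first = stats.get(roots.get(0));
            System.out.println("Map input records : "
                + first.get("PIG_STATS_MAP_INPUT_RECORDS"));
        }
    }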

