Author: rding
Date: Mon Sep 27 19:56:28 2010
New Revision: 1001893

URL: http://svn.apache.org/viewvc?rev=1001893&view=rev
Log:
PIG-1641: Incorrect counters in local mode

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=1001893&r1=1001892&r2=1001893&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Sep 27 19:56:28 2010
@@ -207,6 +207,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1641: Incorrect counters in local mode (rding)
+
 PIG-1647: Logical simplifier throws a NPE (yanz)
 
 PIG-1642: Order by doesn't use estimation to determine the parallelism (rding)

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java?rev=1001893&r1=1001892&r2=1001893&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java Mon Sep 
27 19:56:28 2010
@@ -84,9 +84,9 @@ public final class InputStats {
         return type;
     }
     
-    String getDisplayString() {
+    String getDisplayString(boolean local) {
         StringBuilder sb = new StringBuilder();
-        if (success) {
+        if (success) {            
             sb.append("Successfully ");
             if (type == INPUT_TYPE.sampler) {
                 sb.append("sampled ");
@@ -95,7 +95,12 @@ public final class InputStats {
             } else {
                 sb.append("read ");
             }
-            sb.append(records).append(" records ");
+            
+            if (!local) {
+                sb.append(records).append(" records ");
+            } else {
+                sb.append("records ");
+            }
             if (bytes > 0) {
                 sb.append("(").append(bytes).append(" bytes) ");
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1001893&r1=1001892&r2=1001893&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Mon Sep 27 
19:56:28 2010
@@ -64,7 +64,16 @@ public final class JobStats extends Oper
         
     public static final String ALIAS = "JobStatistics:alias";
     public static final String FEATURE = "JobStatistics:feature";
+    
+    public static final String SUCCESS_HEADER = "JobId\tMaps\tReduces\t" +
+               "MaxMapTime\tMinMapTIme\tAvgMapTime\tMaxReduceTime\t" +
+               "MinReduceTime\tAvgReduceTime\tAlias\tFeature\tOutputs";
    
+    public static final String FAILURE_HEADER = 
"JobId\tAlias\tFeature\tMessage\tOutputs";
+    
+    // currently counters are not working in local mode - see PIG-1286
+    public static final String SUCCESS_HEADER_LOCAL = 
"JobId\tAlias\tFeature\tOutputs";
+    
     private static final Log LOG = LogFactory.getLog(JobStats.class);
     
     public static enum JobState { UNKNOWN, SUCCESS, FAILED; }
@@ -280,14 +289,16 @@ public final class JobStats extends Oper
         avgReduceTime = avg;       
     }  
     
-    String getDisplayString() {
+    String getDisplayString(boolean local) {
         StringBuilder sb = new StringBuilder();
         String id = (jobId == null) ? "N/A" : jobId.toString();
-        if (state == JobState.FAILED) {           
+        if (state == JobState.FAILED || local) {           
             sb.append(id).append("\t")
                 .append(getAlias()).append("\t")
-                .append(getFeature()).append("\t")
-                .append("Message: ").append(getErrorMessage()).append("\t");
+                .append(getFeature()).append("\t");
+            if (state == JobState.FAILED) {
+                sb.append("Message: ").append(getErrorMessage()).append("\t");
+            }
         } else if (state == JobState.SUCCESS) {
             sb.append(id).append("\t")
                 .append(numberMaps).append("\t")

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1001893&r1=1001892&r2=1001893&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java Mon Sep 
27 19:56:28 2010
@@ -92,11 +92,15 @@ public final class OutputStats {
         return conf;
     }
     
-    String getDisplayString() {
+    String getDisplayString(boolean local) {
         StringBuilder sb = new StringBuilder();
         if (success) {
-            sb.append("Successfully stored ").append(records).append(
-                    " records ");
+            sb.append("Successfully stored ");
+            if (!local) {
+                sb.append(records).append(" records ");
+            } else {
+                sb.append("records ");
+            }
             if (bytes > 0) {
                 sb.append("(").append(bytes).append(" bytes) ");
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1001893&r1=1001892&r2=1001893&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Mon Sep 27 
19:56:28 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigRunner.ReturnCode;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
@@ -559,6 +560,17 @@ public final class PigStats {
             LOG.warn("unknown return code, can't display the results");
             return;
         }
+        if (pigContext == null) {
+            LOG.warn("unknown exec type, don't display the results");
+            return;
+        }
+ 
+        // currently counters are not working in local mode - see PIG-1286
+        ExecType execType = pigContext.getExecType();
+        if (execType == ExecType.LOCAL) {
+            LOG.info("Detected Local mode. Stats reported below may be 
incomplete");
+        }
+        
         SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
         StringBuilder sb = new StringBuilder();
         
sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
@@ -576,47 +588,52 @@ public final class PigStats {
             sb.append("Failed!\n");
         }
         sb.append("\n");
+                
         if (returnCode == ReturnCode.SUCCESS 
-                || returnCode == ReturnCode.PARTIAL_FAILURE) {
+                || returnCode == ReturnCode.PARTIAL_FAILURE) {            
             sb.append("Job Stats (time in seconds):\n");
-            sb.append("JobId\tMaps\tReduces\tMaxMapTime\tMinMapTIme\t" +
-                    
"AvgMapTime\tMaxReduceTime\tMinReduceTime\tAvgReduceTime\t" +
-                    "Alias\tFeature\tOutputs\n");
+            if (execType == ExecType.LOCAL) {
+                sb.append(JobStats.SUCCESS_HEADER_LOCAL).append("\n");
+            } else {
+                sb.append(JobStats.SUCCESS_HEADER).append("\n");
+            }
             List<JobStats> arr = jobPlan.getSuccessfulJobs();
             for (JobStats js : arr) {                
-                sb.append(js.getDisplayString());
+                sb.append(js.getDisplayString(execType == ExecType.LOCAL));
             }
             sb.append("\n");
         }
         if (returnCode == ReturnCode.FAILURE
                 || returnCode == ReturnCode.PARTIAL_FAILURE) {
             sb.append("Failed Jobs:\n");
-            sb.append("JobId\tAlias\tFeature\tMessage\tOutputs\n");
+            sb.append(JobStats.FAILURE_HEADER).append("\n");
             List<JobStats> arr = jobPlan.getFailedJobs();
             for (JobStats js : arr) {   
-                sb.append(js.getDisplayString());
+                sb.append(js.getDisplayString(execType == ExecType.LOCAL));
             }
             sb.append("\n");
         }
         sb.append("Input(s):\n");
         for (InputStats is : getInputStats()) {
-            sb.append(is.getDisplayString());
+            sb.append(is.getDisplayString(execType == ExecType.LOCAL));
         }
         sb.append("\n");
         sb.append("Output(s):\n");
         for (OutputStats ds : getOutputStats()) {
-            sb.append(ds.getDisplayString());
+            sb.append(ds.getDisplayString(execType == ExecType.LOCAL));
         }
         
-        sb.append("\nCounters:\n");
-        sb.append("Total records written : " + 
getRecordWritten()).append("\n");
-        sb.append("Total bytes written : " + getBytesWritten()).append("\n");
-        sb.append("Spillable Memory Manager spill count : "
-                + getSMMSpillCount()).append("\n");
-        sb.append("Total bags proactively spilled: " 
-                + getProactiveSpillCountObjects()).append("\n");
-        sb.append("Total records proactively spilled: " 
-                + getProactiveSpillCountRecords()).append("\n");
+        if (execType != ExecType.LOCAL) {
+            sb.append("\nCounters:\n");
+            sb.append("Total records written : " + 
getRecordWritten()).append("\n");
+            sb.append("Total bytes written : " + 
getBytesWritten()).append("\n");
+            sb.append("Spillable Memory Manager spill count : "
+                    + getSMMSpillCount()).append("\n");
+            sb.append("Total bags proactively spilled: " 
+                    + getProactiveSpillCountObjects()).append("\n");
+            sb.append("Total records proactively spilled: " 
+                    + getProactiveSpillCountRecords()).append("\n");
+        }
         
         sb.append("\nJob DAG:\n").append(jobPlan.toString());
         


Reply via email to