Author: daijy
Date: Tue Sep  8 18:01:39 2009
New Revision: 812599

URL: http://svn.apache.org/viewvc?rev=812599&view=rev
Log:
PIG-943: Pig crash when it cannot get counter from hadoop

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigWarning.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    
hadoop/pig/trunk/src/org/apache/pig/impl/plan/CompilationMessageCollector.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=812599&r1=812598&r2=812599&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Sep  8 18:01:39 2009
@@ -69,6 +69,8 @@
 
 BUG FIXES
 
+    PIG-943: Pig crash when it cannot get counter from hadoop (daijy)
+
     PIG-935: Skewed join throws an exception when used with map keys(sriranjan
     via pradeepkth)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/PigWarning.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigWarning.java?rev=812599&r1=812598&r2=812599&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigWarning.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigWarning.java Tue Sep  8 18:01:39 2009
@@ -58,5 +58,6 @@
     UNABLE_TO_SPILL,
     UNABLE_TO_CLOSE_SPILL_FILE,
     UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED,
-    USING_OVERLOADED_FUNCTION;
+    USING_OVERLOADED_FUNCTION,
+    NULL_COUNTER_COUNT;
 }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=812599&r1=812598&r2=812599&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Tue Sep  8 18:01:39 2009
@@ -253,8 +253,16 @@
         // scripts mess up the stats reporting from hadoop.
         List<String> rji = stats.getRootJobIDs();
         if (rji != null && rji.size() == 1 && finalStores == 1) {
-            log.info("Records written : " + stats.getRecordsWritten());
-            log.info("Bytes written : " + stats.getBytesWritten());
+            if(stats.getRecordsWritten()==-1) {
+                log.info("Records written : Unable to determine number of 
records written");
+            } else {
+                log.info("Records written : " + stats.getRecordsWritten());
+            }
+            if(stats.getBytesWritten()==-1) {
+                log.info("Bytes written : Unable to determine number of bytes 
written");
+            } else {
+                log.info("Bytes written : " + stats.getBytesWritten());
+            }
         }
 
         if (!failed) {
@@ -394,13 +402,29 @@
        try {
                runningJob = jobClient.getJob(mapRedJobID);
                if(runningJob != null) {
-               Counters counters = runningJob.getCounters();
-                       for(Enum e : PigWarning.values()) {
-                               Long currentCount = aggMap.get(e);
-                               currentCount = (currentCount == null? 0 : 
currentCount);
-                               currentCount += counters.getCounter(e);
-                               aggMap.put(e, currentCount);
-                       }
+                       Counters counters = runningJob.getCounters();
+                if (counters==null)
+                {
+                    long nullCounterCount = 
aggMap.get(PigWarning.NULL_COUNTER_COUNT)==null?0 : 
aggMap.get(PigWarning.NULL_COUNTER_COUNT);
+                    nullCounterCount++;
+                    aggMap.put(PigWarning.NULL_COUNTER_COUNT, 
nullCounterCount);
+                }
+                for (Enum e : PigWarning.values()) {
+                    if (e != PigWarning.NULL_COUNTER_COUNT) {
+                        Long currentCount = aggMap.get(e);
+                        currentCount = (currentCount == null ? 0 : 
currentCount);
+                        // This code checks if the counters is null, if it is,
+                        // we need to report to the user that the number
+                        // of warning aggregations may not be correct. In fact,
+                        // Counters should not be null, it is
+                        // a hadoop bug, once this bug is fixed in hadoop, the
+                        // null handling code should never be hit.
+                        // See PIG-943
+                        if (counters != null)
+                            currentCount += counters.getCounter(e);
+                        aggMap.put(e, currentCount);
+                    }
+                }
                }
        } catch (IOException ioe) {
                String msg = "Unable to retrieve job to compute warning 
aggregation.";

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/plan/CompilationMessageCollector.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/CompilationMessageCollector.java?rev=812599&r1=812598&r2=812599&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/plan/CompilationMessageCollector.java 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/plan/CompilationMessageCollector.java 
Tue Sep  8 18:01:39 2009
@@ -24,6 +24,7 @@
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
+import org.apache.pig.PigWarning;
 /***
  * This class is used for collecting all messages (error + warning) in 
  * compilation process. These messages are reported back to users 
@@ -131,12 +132,19 @@
     }
     
     public static void logAggregate(Map<Enum, Long> aggMap, MessageType 
messageType, Log log) {
-       for(Enum e: aggMap.keySet()) {
-               Long count = aggMap.get(e);
-               if(count != null && count > 0) {
-                       String message = "Encountered " + messageType + " " + 
e.toString() + " " + count + " time(s).";
-                       logMessage(message, messageType, log);
-               }
+        long nullCounterCount = 
aggMap.get(PigWarning.NULL_COUNTER_COUNT)==null?0 : 
aggMap.get(PigWarning.NULL_COUNTER_COUNT);
+        if (nullCounterCount!=0 && aggMap.size()>1) // 
PigWarning.NULL_COUNTER_COUNT is definitely in aggMap
+            logMessage("Unable to retrieve hadoop counter for " + 
nullCounterCount + 
+                    " jobs, the number following warnings may not be correct", 
messageType, log);
+        for(Enum e: aggMap.keySet()) {
+            if (e!=PigWarning.NULL_COUNTER_COUNT)
+            {
+                Long count = aggMap.get(e);
+                if(count != null && count > 0) {
+                    String message = "Encountered " + messageType + " " + 
e.toString() + " " + count + " time(s).";
+                    logMessage(message, messageType, log);
+                }
+            }
        }       
     }
     

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=812599&r1=812598&r2=812599&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Tue Sep  8 
18:01:39 2009
@@ -147,22 +147,34 @@
                 Counters counters = null;
                 try {
                     counters = rj.getCounters();
-                    Counters.Group taskgroup = 
counters.getGroup("org.apache.hadoop.mapred.Task$Counter");
-                    Counters.Group hdfsgroup = 
counters.getGroup("org.apache.hadoop.mapred.Task$FileSystemCounter");
-
-                    jobStats.put("PIG_STATS_MAP_INPUT_RECORDS", 
(Long.valueOf(taskgroup.getCounterForName("MAP_INPUT_RECORDS").getCounter())).toString());
-                    jobStats.put("PIG_STATS_MAP_OUTPUT_RECORDS", 
(Long.valueOf(taskgroup.getCounterForName("MAP_OUTPUT_RECORDS").getCounter())).toString());
-                    jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", 
(Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString());
-                    jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", 
(Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString());
-                    jobStats.put("PIG_STATS_BYTES_WRITTEN", 
(Long.valueOf(hdfsgroup.getCounterForName("HDFS_WRITE").getCounter())).toString());
+                    // This code checks if the counters is null, if it is, 
then all the stats are unknown.
+                    // We use -1 to indicate unknown counter. In fact, 
Counters should not be null, it is
+                    // a hadoop bug, once this bug is fixed in hadoop, the 
null handling code should never be hit.
+                    // See Pig-943
+                    if (counters!=null)
+                    {
+                        Counters.Group taskgroup = 
counters.getGroup("org.apache.hadoop.mapred.Task$Counter");
+                        Counters.Group hdfsgroup = 
counters.getGroup("org.apache.hadoop.mapred.Task$FileSystemCounter");
+                        jobStats.put("PIG_STATS_MAP_INPUT_RECORDS", 
(Long.valueOf(taskgroup.getCounterForName("MAP_INPUT_RECORDS").getCounter())).toString());
+                        jobStats.put("PIG_STATS_MAP_OUTPUT_RECORDS", 
(Long.valueOf(taskgroup.getCounterForName("MAP_OUTPUT_RECORDS").getCounter())).toString());
+                        jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", 
(Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString());
+                        jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", 
(Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString());
+                        jobStats.put("PIG_STATS_BYTES_WRITTEN", 
(Long.valueOf(hdfsgroup.getCounterForName("HDFS_WRITE").getCounter())).toString());
+                    }
+                    else
+                    {
+                        jobStats.put("PIG_STATS_MAP_INPUT_RECORDS", "-1");
+                        jobStats.put("PIG_STATS_MAP_OUTPUT_RECORDS", "-1");
+                        jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", "-1");
+                        jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", "-1");
+                        jobStats.put("PIG_STATS_BYTES_WRITTEN", "-1");
+                    }
+                    
                 } catch (IOException e) {
                     // TODO Auto-generated catch block
                     String error = "Unable to get the counters.";
                     throw new ExecException(error, e);
                 }
-                
-            
-            
         }
         
         getLastJobIDs(jc.getSuccessfulJobs());
@@ -225,9 +237,21 @@
             if (jobStats == null) continue;
             String reducePlan = jobStats.get("PIG_STATS_REDUCE_PLAN");
                if(reducePlan == null) {
-               records += 
Long.parseLong(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
+                   if 
(Long.parseLong(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"))==-1L)
+                {
+                       records = -1;
+                    break;
+                }
+                   else
+                       records += 
Long.parseLong(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
                } else {
-               records += 
Long.parseLong(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
+                   if 
(Long.parseLong(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"))==-1L)
+                {
+                    records = -1;
+                    break;
+                }
+                else
+                    records += 
Long.parseLong(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
                }
         }
        return records;
@@ -255,6 +279,11 @@
         for (String jid : rootJobIDs) {
             Map<String, String> jobStats = stats.get(jid);
             if (jobStats == null) continue;
+            if (Long.parseLong(jobStats.get("PIG_STATS_BYTES_WRITTEN"))==-1L)
+            {
+                bytesWritten = -1L;
+                break;
+            }
             bytesWritten += 
Long.parseLong(jobStats.get("PIG_STATS_BYTES_WRITTEN"));
         }
         return bytesWritten;


Reply via email to