Author: olga
Date: Thu Dec 24 22:20:28 2009
New Revision: 893815

URL: http://svn.apache.org/viewvc?rev=893815&view=rev
Log:
PIG-1102: Collect number of spills per job (sriranjan via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Dec 24 22:20:28 2009
@@ -24,6 +24,8 @@
 
 IMPROVEMENTS
 
+PIG-1102: Collect number of spills per job (sriranjan via olgan)
+
 PIG-1149: Allow instantiation of SampleLoaders with parametrized LoadFuncs
 (dvryaboy via olgan)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Thu Dec 24 22:20:28 2009
@@ -28,9 +28,11 @@
 import java.util.Iterator;
 import java.util.ArrayList;
 
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.impl.util.BagFormat;
@@ -371,7 +373,14 @@
                log.warn(msg, e);
        }       
     }
-
+    
+    protected void incSpillCount(Enum counter) {
+        // Increment the spill count
+        // warn is a misnomer. The function updates the counter. If the update
+        // fails, it dumps a warning
+        PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter);
+    }
+    
     public static abstract class BagDelimiterTuple extends DefaultTuple{}
     public static class StartBag extends BagDelimiterTuple{
         private static final long serialVersionUID = 1L;}

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Thu Dec 24 22:20:28 2009
@@ -30,6 +30,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
 
 
@@ -122,6 +123,8 @@
             }
             mContents.clear();
         }
+        // Increment the spill count
+        incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
         return spilled;
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Thu Dec 24 22:20:28 2009
@@ -36,7 +36,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 
 
 
@@ -182,6 +184,8 @@
             }
             mContents.clear();
         }
+        // Increment the spill count
+        incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
         return spilled;
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Thu Dec 24 22:20:28 2009
@@ -22,6 +22,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 
 
@@ -99,8 +100,14 @@
                                log.debug("Memory can hold "+ mContents.size() + " records, put the rest in spill file.");
                        }
                     out = getSpillFile();
+
                 }
                 t.write(out);
+                
+                if (cacheLimit!= 0 && mContents.size() % cacheLimit == 0) {
+                    /* Increment the spill count*/
+                    incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
+                }
             }
             catch(IOException e) {
                 throw new RuntimeException(e);

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Thu Dec 24 22:20:28 2009
@@ -37,6 +37,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 
@@ -228,7 +229,9 @@
         mContents.clear();
         mMemSizeChanged = true;
         memUsage = 0;
-                
+
+        // Increment the spill count
+        incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
         return spilled;
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Thu Dec 24 22:20:28 2009
@@ -38,6 +38,7 @@
   
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -238,6 +239,8 @@
         mMemSizeChanged = true;
         memUsage = 0;
         
+        // Increment the spill count
+        incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
         return spilled;
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Thu Dec 24 22:20:28 2009
@@ -35,6 +35,7 @@
   
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
 
 
@@ -155,6 +156,8 @@
             }
             mContents.clear();
         }
+        // Increment the spill count
+        incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
         return spilled;
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Thu Dec 24 22:20:28 2009
@@ -39,11 +39,13 @@
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigCounters;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigStats {
@@ -186,6 +188,9 @@
                         jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString());
                         jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString());
                         jobStats.put("PIG_STATS_BYTES_WRITTEN", (Long.valueOf(hdfsgroup.getCounterForName("HDFS_BYTES_WRITTEN").getCounter())).toString());
+                        jobStats.put("PIG_STATS_SMM_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT).getCounter())).toString() );
+                        jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.PROACTIVE_SPILL_COUNT).getCounter())).toString() );
+
                     }
                     else
                     {
@@ -194,6 +199,8 @@
                         jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", "-1");
                         jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", "-1");
                         jobStats.put("PIG_STATS_BYTES_WRITTEN", "-1");
+                        jobStats.put("PIG_STATS_SMM_SPILL_COUNT", "-1");
+                        jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", "-1");
                     }
                     
                 } catch (IOException e) {
@@ -294,6 +301,21 @@
        
     }
     
+    public long getSMMSpillCount() {
+        long spillCount = 0;
+        for (String jid : rootJobIDs) {
+            Map<String, String> jobStats = stats.get(jid);
+            if (jobStats == null) continue;
+            if (Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT"))==-1L)
+            {
+                spillCount = -1L;
+                break;
+            }
+            spillCount += Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT"));
+        }
+        return spillCount;
+    }
+    
     private long getLocalBytesWritten() {
        for(PhysicalOperator op : php.getLeaves())
                return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_BYTES_WRITTEN"));


Reply via email to