Author: olga
Date: Thu Dec 24 22:20:28 2009
New Revision: 893815
URL: http://svn.apache.org/viewvc?rev=893815&view=rev
Log:
PIG-1102: Collect number of spills per job (sriranjan via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Dec 24 22:20:28 2009
@@ -24,6 +24,8 @@
IMPROVEMENTS
+PIG-1102: Collect number of spills per job (sriranjan via olgan)
+
PIG-1149: Allow instantiation of SampleLoaders with parametrized LoadFuncs
(dvryaboy via olgan)
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Thu Dec 24
22:20:28 2009
@@ -28,9 +28,11 @@
import java.util.Iterator;
import java.util.ArrayList;
+import org.apache.pig.PigCounters;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
import org.apache.pig.impl.util.BagFormat;
@@ -371,7 +373,14 @@
log.warn(msg, e);
}
}
-
+
+ protected void incSpillCount(Enum counter) { // counter: the spill counter to bump; callers in this change pass a PigCounters constant
+ // Bump the given spill counter by delegating to PigHadoopLogger.
+ // Despite its name, warn() is the call that updates the counter; if
+ // that update fails, it logs a warning instead.
+ PigHadoopLogger.getInstance().warn(this, "Spill counter incremented",
counter);
+ }
+
public static abstract class BagDelimiterTuple extends DefaultTuple{}
public static class StartBag extends BagDelimiterTuple{
private static final long serialVersionUID = 1L;}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Thu Dec 24
22:20:28 2009
@@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
import org.apache.pig.PigWarning;
@@ -122,6 +123,8 @@
}
mContents.clear();
}
+ // Increment the spill count
+ incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
return spilled;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Thu Dec 24
22:20:28 2009
@@ -36,7 +36,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
import org.apache.pig.PigWarning;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
@@ -182,6 +184,8 @@
}
mContents.clear();
}
+ // Increment the spill count
+ incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
return spilled;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Thu Dec 24
22:20:28 2009
@@ -22,6 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -99,8 +100,14 @@
log.debug("Memory can hold "+ mContents.size()
+ " records, put the rest in spill file.");
}
out = getSpillFile();
+
}
t.write(out);
+
+ if (cacheLimit!= 0 && mContents.size() % cacheLimit == 0) {
+ /* Increment the spill count*/
+ incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
+ }
}
catch(IOException e) {
throw new RuntimeException(e);
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Thu Dec
24 22:20:28 2009
@@ -37,6 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
import org.apache.pig.PigWarning;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -228,7 +229,9 @@
mContents.clear();
mMemSizeChanged = true;
memUsage = 0;
-
+
+ // Increment the spill count
+ incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
return spilled;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Thu Dec 24
22:20:28 2009
@@ -38,6 +38,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -238,6 +239,8 @@
mMemSizeChanged = true;
memUsage = 0;
+ // Increment the spill count
+ incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
return spilled;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Thu Dec 24
22:20:28 2009
@@ -35,6 +35,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
import org.apache.pig.PigWarning;
@@ -155,6 +156,8 @@
}
mContents.clear();
}
+ // Increment the spill count
+ incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
return spilled;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=893815&r1=893814&r2=893815&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Thu Dec 24
22:20:28 2009
@@ -39,11 +39,13 @@
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.ExecType;
+import org.apache.pig.PigCounters;
import org.apache.pig.backend.executionengine.ExecException;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.impl.util.ObjectSerializer;
public class PigStats {
@@ -186,6 +188,9 @@
jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS",
(Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString());
jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS",
(Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString());
jobStats.put("PIG_STATS_BYTES_WRITTEN",
(Long.valueOf(hdfsgroup.getCounterForName("HDFS_BYTES_WRITTEN").getCounter())).toString());
+ jobStats.put("PIG_STATS_SMM_SPILL_COUNT",
(Long.valueOf(counters.findCounter(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT).getCounter())).toString()
);
+ jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT",
(Long.valueOf(counters.findCounter(PigCounters.PROACTIVE_SPILL_COUNT).getCounter())).toString()
);
+
}
else
{
@@ -194,6 +199,8 @@
jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", "-1");
jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", "-1");
jobStats.put("PIG_STATS_BYTES_WRITTEN", "-1");
+ jobStats.put("PIG_STATS_SMM_SPILL_COUNT", "-1");
+ jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", "-1");
}
} catch (IOException e) {
@@ -294,6 +301,21 @@
}
+ public long getSMMSpillCount() { // Returns the sum of PIG_STATS_SMM_SPILL_COUNT over rootJobIDs, or -1 as soon as any job reports -1.
+ long spillCount = 0;
+ for (String jid : rootJobIDs) {
+ Map<String, String> jobStats = stats.get(jid);
+ if (jobStats == null) continue; // no stats map recorded for this job id: it contributes nothing
+ if (Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT"))==-1L) // "-1" is what the stats-collection code stores in its fallback branch; NOTE(review): an absent key yields null and parseLong then throws NumberFormatException
+ {
+ spillCount = -1L; // one unknown job makes the whole total unknown
+ break;
+ }
+ spillCount += // NOTE(review): same value is parsed a second time here
Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT"));
+ }
+ return spillCount;
+ }
+
private long getLocalBytesWritten() {
for(PhysicalOperator op : php.getLeaves())
return
Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_BYTES_WRITTEN"));