Author: olga Date: Thu Dec 24 22:20:28 2009 New Revision: 893815 URL: http://svn.apache.org/viewvc?rev=893815&view=rev Log: PIG-1102: Collect number of spills per job (sriranjan via olgan)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=893815&r1=893814&r2=893815&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Dec 24 22:20:28 2009 @@ -24,6 +24,8 @@ IMPROVEMENTS +PIG-1102: Collect number of spills per job (sriranjan via olgan) + PIG-1149: Allow instantiation of SampleLoaders with parametrized LoadFuncs (dvryaboy via olgan) Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=893815&r1=893814&r2=893815&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Thu Dec 24 22:20:28 2009 @@ -28,9 +28,11 @@ import java.util.Iterator; import java.util.ArrayList; +import org.apache.pig.PigCounters; import org.apache.pig.PigException; import org.apache.pig.PigWarning; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger; import org.apache.pig.impl.util.BagFormat; @@ -371,7 +373,14 @@ log.warn(msg, e); } } - + + protected void incSpillCount(Enum counter) { + // Increment the spill count + // warn is a misnomer. The function updates the counter. If the update + // fails, it dumps a warning + PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter); + } + public static abstract class BagDelimiterTuple extends DefaultTuple{} public static class StartBag extends BagDelimiterTuple{ private static final long serialVersionUID = 1L;} Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=893815&r1=893814&r2=893815&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Thu Dec 24 22:20:28 2009 @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigCounters; import org.apache.pig.PigWarning; @@ -122,6 +123,8 @@ } mContents.clear(); } + // Increment the spill count + incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT); return spilled; } Modified: hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=893815&r1=893814&r2=893815&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Thu Dec 24 22:20:28 2009 @@ -36,7 +36,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigCounters; import org.apache.pig.PigWarning; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; @@ -182,6 +184,8 @@ } mContents.clear(); } + // Increment the spill count + incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT); return spilled; } Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=893815&r1=893814&r2=893815&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Thu Dec 24 22:20:28 2009 @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigCounters; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; @@ -99,8 +100,14 @@ log.debug("Memory can hold "+ mContents.size() + " records, put the rest in spill file."); } out = getSpillFile(); + } t.write(out); + + if (cacheLimit!= 0 && mContents.size() % cacheLimit == 0) { + /* Increment the spill count*/ + incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT); + } } catch(IOException e) { throw new RuntimeException(e); Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=893815&r1=893814&r2=893815&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Thu Dec 24 22:20:28 2009 @@ -37,6 +37,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigCounters; import org.apache.pig.PigWarning; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; @@ -228,7 +229,9 @@ mContents.clear(); mMemSizeChanged = true; memUsage = 0; - + + // Increment the spill count + incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT); return spilled; } Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=893815&r1=893814&r2=893815&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Thu Dec 24 22:20:28 2009 @@ -38,6 +38,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigCounters; import org.apache.pig.PigWarning; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; @@ -238,6 +239,8 @@ mMemSizeChanged = true; memUsage = 0; + // Increment the spill count + incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT); return spilled; } Modified: hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=893815&r1=893814&r2=893815&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Thu Dec 24 22:20:28 2009 @@ -35,6 +35,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigCounters; import org.apache.pig.PigWarning; @@ -155,6 +156,8 @@ } mContents.clear(); } + // Increment the spill count + incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT); return spilled; } Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=893815&r1=893814&r2=893815&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Thu Dec 24 22:20:28 2009 @@ -39,11 +39,13 @@ import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.pig.ExecType; +import org.apache.pig.PigCounters; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.data.BagFactory; import org.apache.pig.impl.util.ObjectSerializer; public class PigStats { @@ -186,6 +188,9 @@ jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString()); jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString()); jobStats.put("PIG_STATS_BYTES_WRITTEN", (Long.valueOf(hdfsgroup.getCounterForName("HDFS_BYTES_WRITTEN").getCounter())).toString()); + jobStats.put("PIG_STATS_SMM_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT).getCounter())).toString() ); + jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.PROACTIVE_SPILL_COUNT).getCounter())).toString() ); + } else { @@ -194,6 +199,8 @@ jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", "-1"); jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", "-1"); jobStats.put("PIG_STATS_BYTES_WRITTEN", "-1"); + jobStats.put("PIG_STATS_SMM_SPILL_COUNT", "-1"); + jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", "-1"); } } catch (IOException e) { @@ -294,6 +301,21 @@ } + public long getSMMSpillCount() { + long spillCount = 0; + for (String jid : rootJobIDs) { + Map<String, String> jobStats = stats.get(jid); + if (jobStats == null) continue; + if (Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT"))==-1L) + { + spillCount = -1L; + break; + } + spillCount += Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT")); + } + return spillCount; + } + private long getLocalBytesWritten() { for(PhysicalOperator op : php.getLeaves()) return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_BYTES_WRITTEN"));