Author: dvryaboy Date: Thu Jun 17 17:24:00 2010 New Revision: 955682 URL: http://svn.apache.org/viewvc?rev=955682&view=rev Log: PIG-1428: Make a StatusReporter singleton available for incrementing counters (dvryaboy)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=955682&r1=955681&r2=955682&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Jun 17 17:24:00 2010 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-1428: Make a StatusReporter singleton available for incrementing counters (dvryaboy) + PIG-972: Make describe work with nested foreach (aniket486 via daijy) PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=955682&r1=955681&r2=955682&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original) +++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Thu Jun 17 17:24:00 2010 @@ -51,8 +51,8 @@ public class MapReducePOStoreImpl extend // configuration below (like setting the output location) do // not affect the caller's copy Configuration outputConf = new Configuration(context.getConfiguration()); - - reporter = new PigStatusReporter(context); + PigStatusReporter.setContext(context); + reporter = PigStatusReporter.getInstance(); // make a copy of the Context to use here - since in the same // task (map or reduce) we could have multiple stores, we should Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=955682&r1=955681&r2=955682&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Thu Jun 17 17:24:00 2010 @@ -135,7 +135,8 @@ public class PigCombiner { PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); pigHadoopLogger.setAggregate(aggregateWarning); - pigHadoopLogger.setReporter(new PigStatusReporter(context)); + PigStatusReporter.setContext(context); + pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); PhysicalOperator.setPigLogger(pigHadoopLogger); } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=955682&r1=955681&r2=955682&view=diff ============================================================================== --- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Thu Jun 17 17:24:00 2010 @@ -19,8 +19,6 @@ package org.apache.pig.backend.hadoop.ex import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger; import org.apache.pig.tools.pigstats.PigStatusReporter; Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=955682&r1=955681&r2=955682&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Thu Jun 17 17:24:00 2010 @@ -212,7 +212,8 @@ public abstract class PigMapBase extends PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); pigHadoopLogger.setAggregate(aggregateWarning); - pigHadoopLogger.setReporter(new PigStatusReporter(context)); + PigStatusReporter.setContext(context); + pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); PhysicalOperator.setPigLogger(pigHadoopLogger); } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=955682&r1=955681&r2=955682&view=diff ============================================================================== --- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Jun 17 17:24:00 2010 @@ -349,7 +349,8 @@ public class PigMapReduce { PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); pigHadoopLogger.setAggregate(aggregateWarning); - pigHadoopLogger.setReporter(new PigStatusReporter(context)); + PigStatusReporter.setContext(context); + pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); PhysicalOperator.setPigLogger(pigHadoopLogger); @@ -561,7 +562,8 @@ public class PigMapReduce { PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); pigHadoopLogger.setAggregate(aggregateWarning); - pigHadoopLogger.setReporter(new PigStatusReporter(context)); + PigStatusReporter.setContext(context); + pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); PhysicalOperator.setPigLogger(pigHadoopLogger); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java?rev=955682&r1=955681&r2=955682&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java Thu Jun 17 17:24:00 2010 @@ -17,10 +17,15 @@ */ package org.apache.pig.backend.hadoop.executionengine.physicalLayer; +import org.apache.pig.classification.InterfaceAudience; +import org.apache.pig.classification.InterfaceStability; + /** * * An interface to allow aggregation of messages */ +@InterfaceAudience.Private +@InterfaceStability.Evolving public interface PigLogger { /** Modified: 
hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=955682&r1=955681&r2=955682&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Thu Jun 17 17:24:00 2010 @@ -37,6 +37,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger; import org.apache.pig.impl.util.BagFormat; import org.apache.pig.impl.util.Spillable; +import org.apache.pig.tools.pigstats.PigStatusReporter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -373,10 +374,12 @@ public abstract class DefaultAbstractBag } protected void incSpillCount(Enum counter) { - // Increment the spill count - // warn is a misnomer. The function updates the counter. 
If the update - // fails, it dumps a warning - PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter); + PigStatusReporter reporter = PigStatusReporter.getInstance(); + if (reporter != null) { + reporter.getCounter(counter).increment(1); + } else { + PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter); + } } public static abstract class BagDelimiterTuple extends DefaultTuple{} Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=955682&r1=955681&r2=955682&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Thu Jun 17 17:24:00 2010 @@ -22,13 +22,32 @@ import org.apache.hadoop.mapreduce.Count import org.apache.hadoop.mapreduce.StatusReporter; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.util.Progressable; +import org.apache.pig.classification.InterfaceAudience; +import org.apache.pig.classification.InterfaceStability; @SuppressWarnings("unchecked") +@InterfaceAudience.Public +@InterfaceStability.Evolving public class PigStatusReporter extends StatusReporter implements Progressable { private TaskInputOutputContext context; + private static PigStatusReporter reporter = null; + /** + * Get singleton instance of the context + * @param context + */ + public static PigStatusReporter getInstance() { + if (reporter == null) { + reporter = new PigStatusReporter(null); + } + return reporter; + } + + public static void setContext(TaskInputOutputContext context) { + reporter = new PigStatusReporter(context); + } - public PigStatusReporter(TaskInputOutputContext context) { + private PigStatusReporter(TaskInputOutputContext context) { 
this.context = context; }