Author: dvryaboy
Date: Wed Jul 14 06:14:33 2010
New Revision: 963952

URL: http://svn.apache.org/viewvc?rev=963952&view=rev
Log:
PIG-1428: Make a StatusReporter singleton available for incrementing counters 
(dvryaboy)

Added:
    
hadoop/pig/branches/branch-0.7/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
Modified:
    hadoop/pig/branches/branch-0.7/CHANGES.txt
    
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
    
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    
hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultAbstractBag.java

Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=963952&r1=963951&r2=963952&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Wed Jul 14 06:14:33 2010
@@ -22,9 +22,6 @@ Release 0.7.0 - 2010-05-03
 
 INCOMPATIBLE CHANGES
 
-PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs
-(rding)
-
 PIG-1292: Interface Refinements (hashutosh)
 
 PIG-1259: ResourceFieldSchema.setSchema should not allow a bag field without a
@@ -71,6 +68,11 @@ manner (rding via pradeepkth)
 
 IMPROVEMENTS
 
+PIG-1428: Make a StatusReporter singleton available for incrementing counters 
(dvryaboy)
+
+PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs
+(rding)
+
 PIG-1309: Map-side Cogroup (hashutosh)
 
 PIG-1441: new test targets (olgan)

Modified: 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=963952&r1=963951&r2=963952&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
 Wed Jul 14 06:14:33 2010
@@ -22,9 +22,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.pig.StoreFuncInterface;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
 /**
  * This class is used to have a POStore write to DFS via a output
  * collector/record writer. It sets up a modified job configuration to
@@ -36,21 +39,27 @@ public class MapReducePOStoreImpl extend
     
     private TaskAttemptContext context;
 
+    private PigStatusReporter reporter;
+
     @SuppressWarnings("unchecked")
     private RecordWriter writer;
     
-    public MapReducePOStoreImpl(TaskAttemptContext context) {
+    public MapReducePOStoreImpl(TaskInputOutputContext context) {
         // get a copy of the Configuration so that changes to the
         // configuration below (like setting the output location) do
         // not affect the caller's copy
         Configuration outputConf = new 
Configuration(context.getConfiguration());
 
+        PigStatusReporter.setContext(context);
+                               reporter = PigStatusReporter.getInstance();
+
         // make a copy of the Context to use here - since in the same
         // task (map or reduce) we could have multiple stores, we should
         // make this copy so that the same context does not get over-written
         // by the different stores.
         this.context = new TaskAttemptContext(outputConf, 
                 context.getTaskAttemptID());
+
     }
     
     @SuppressWarnings("unchecked")

Modified: 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=963952&r1=963951&r2=963952&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 Wed Jul 14 06:14:33 2010
@@ -42,6 +42,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public class PigCombiner {
 
@@ -134,7 +135,8 @@ public class PigCombiner {
 
                 PigHadoopLogger pigHadoopLogger = 
PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setTaskIOContext(context);
+                PigStatusReporter.setContext(context);
+                pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
             }
             

Modified: 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=963952&r1=963951&r2=963952&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
 Wed Jul 14 06:14:33 2010
@@ -19,9 +19,9 @@ package org.apache.pig.backend.hadoop.ex
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
 
 /**
  * 
@@ -40,7 +40,7 @@ public final class PigHadoopLogger imple
     } 
 
     private static Log log = LogFactory.getLog(PigHadoopLogger.class);
-    private TaskInputOutputContext<?, ?, ?, ?> taskIOContext = null;
+    private PigStatusReporter reporter = null;
     private boolean aggregate = false;
 
     private PigHadoopLogger() {
@@ -50,10 +50,9 @@ public final class PigHadoopLogger imple
     public void warn(Object o, String msg, Enum warningEnum) {
         String displayMessage = o.getClass().getName() + ": " + msg;
         if(aggregate) {
-            if(taskIOContext != null) {
-                Counter c = taskIOContext.getCounter(warningEnum);
-                c.increment(1);
-            } else {
+            if (reporter != null) {
+                     reporter.getCounter(warningEnum).increment(1);
+                                               } else {
                 //TODO:
                 //in local mode of execution if the PigHadoopLogger is used 
initially,
                 //then aggregation cannot be performed as the reporter will be 
null. 
@@ -69,14 +68,10 @@ public final class PigHadoopLogger imple
         }
     }    
 
-    public TaskInputOutputContext<?, ?, ?, ?> getTaskIOContext() {
-        return taskIOContext;
+    public synchronized void setReporter(PigStatusReporter rep) {
+        this.reporter = rep;
     }
 
-    public synchronized void setTaskIOContext(TaskInputOutputContext<?, ?, ?, 
?> tioc) {
-        this.taskIOContext = tioc;
-    }
-    
     public boolean getAggregate() {
         return aggregate;
     }

Modified: 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=963952&r1=963951&r2=963952&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 Wed Jul 14 06:14:33 2010
@@ -48,6 +48,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public abstract class PigMapBase extends Mapper<Text, Tuple, 
PigNullableWritable, Writable> {
     private static final Tuple DUMMYTUPLE = null;
@@ -211,7 +212,8 @@ public abstract class PigMapBase extends
 
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
             pigHadoopLogger.setAggregate(aggregateWarning);
-            pigHadoopLogger.setTaskIOContext(context);
+            PigStatusReporter.setContext(context);
+            pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
             PhysicalOperator.setPigLogger(pigHadoopLogger);
         }
         

Modified: 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=963952&r1=963951&r2=963952&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 Wed Jul 14 06:14:33 2010
@@ -55,6 +55,8 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
 
 /**
  * This class is the static Mapper &amp; Reducer classes that
@@ -348,8 +350,9 @@ public class PigMapReduce {
 
                 PigHadoopLogger pigHadoopLogger = 
PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setTaskIOContext(context);
-                
+                PigStatusReporter.setContext(context); 
+                pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
 
                 for (POStore store: stores) {
@@ -560,7 +563,8 @@ public class PigMapReduce {
                 
                 PigHadoopLogger pigHadoopLogger = 
PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setTaskIOContext(context);
+                PigStatusReporter.setContext(context);
+                pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
                 
                 for (POStore store: stores) {

Modified: 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultAbstractBag.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultAbstractBag.java?rev=963952&r1=963951&r2=963952&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultAbstractBag.java 
(original)
+++ 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultAbstractBag.java 
Wed Jul 14 06:14:33 2010
@@ -37,6 +37,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.impl.util.BagFormat;
 import org.apache.pig.impl.util.Spillable;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -373,10 +374,12 @@ public abstract class DefaultAbstractBag
     }
     
     protected void incSpillCount(Enum counter) {
-        // Increment the spill count
-        // warn is a misnomer. The function updates the counter. If the update
-        // fails, it dumps a warning
-        PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", 
counter);
+        PigStatusReporter reporter = PigStatusReporter.getInstance();
+        if (reporter != null) {
+            reporter.getCounter(counter).increment(1);
+        } else {
+            PigHadoopLogger.getInstance().warn(this, "Spill counter 
incremented", counter);
+        }
     }
     
     public static abstract class BagDelimiterTuple extends DefaultTuple{}

Added: 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=963952&view=auto
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
 (added)
+++ 
hadoop/pig/branches/branch-0.7/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
 Wed Jul 14 06:14:33 2010
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.Progressable;
+
+...@suppresswarnings("unchecked")
+public class PigStatusReporter extends StatusReporter implements Progressable {
+
+    private TaskInputOutputContext context;
+    private static PigStatusReporter reporter = null;
+    /**
+     * Get singleton instance of the context
+     */
+    public static PigStatusReporter getInstance() {
+        if (reporter == null) {
+            reporter = new PigStatusReporter(null);
+        }
+        return reporter;
+    }
+    
+    public static void setContext(TaskInputOutputContext context) {
+        reporter = new PigStatusReporter(context);
+    }
+    
+    private PigStatusReporter(TaskInputOutputContext context) {
+        this.context = context;
+    }
+    
+    @Override
+    public Counter getCounter(Enum<?> name) {        
+        return (context == null) ? null : context.getCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+        return (context == null) ? null : context.getCounter(group, name);
+    }
+
+    @Override
+    public void progress() {
+        if (context != null) {
+            context.progress();
+        }
+    }
+
+    @Override
+    public void setStatus(String status) {
+        if (context != null) {
+            context.setStatus(status);
+        }
+    }
+
+}


Reply via email to