Author: rding
Date: Thu Apr  8 22:54:06 2010
New Revision: 932161

URL: http://svn.apache.org/viewvc?rev=932161&view=rev
Log:
PIG-1299: Implement Pig counter to track number of output rows for each output 
files

Added:
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Apr  8 22:54:06 2010
@@ -39,6 +39,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1299: Implement Pig counter to track number of output rows for each output
+files (rding)
+
 PIG-1366: PigStorage's pushProjection implementation results in NPE under
 certain data conditions (pradeepkth)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Thu Apr  8 22:54:06 2010
@@ -468,6 +468,10 @@ public class JobControlCompiler{
                 tmpLocation = new Path(tmpLocationStr);
 
                 nwJob.setOutputFormatClass(PigOutputFormat.class);
+                
+                for (POStore sto: storeLocations) {
+                    sto.setMultiStore(true);
+                }
  
                 conf.set("pig.streaming.log.dir", 
                             new Path(tmpLocation, LOG_DIR).toString());

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Thu Apr  8 22:54:06 2010
@@ -29,6 +29,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -63,6 +64,7 @@ import org.apache.pig.impl.util.LogUtils
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 
 /**
  * Main class that launches pig for Map Reduce
@@ -249,12 +251,12 @@ public class MapReduceLauncher extends L
             stats.setJobClient(jobClient);
             stats.setJobControl(jc);
             stats.accumulateStats();
-            
+
             jc.stop(); 
         }
 
         log.info( "100% complete");
-
+      
         boolean failed = false;
         int finalStores = 0;
         // Look to see if any jobs failed.  If so, we need to report that.
@@ -286,35 +288,54 @@ public class MapReduceLauncher extends L
 
         Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
                 
-        if(succJobs!=null) {
-            for(Job job : succJobs){
-                List<POStore> sts = jcc.getStores(job);
-                for (POStore st: sts) {
-                    // Currently (as of Feb 3 2010), hadoop's local mode does not
-                    // call cleanupJob on OutputCommitter (see https://issues.apache.org/jira/browse/MAPREDUCE-1447)
-                    // So to workaround that bug, we are calling setStoreSchema on
-                    // StoreFunc's which implement StoreMetadata here
+        if (succJobs != null) {
+            Map<String, String> storeCounters = new HashMap<String, String>();
+            for (Job job : succJobs) {
+                List<POStore> sts = jcc.getStores(job);                
+                for (POStore st : sts) {
+                    // Currently (as of Feb 3 2010), hadoop's local mode does
+                    // not call cleanupJob on OutputCommitter (see
+                    // https://issues.apache.org/jira/browse/MAPREDUCE-1447)
+                    // So to workaround that bug, we are calling setStoreSchema
+                    // on StoreFunc's which implement StoreMetadata here
                     
/**********************************************************/
-                    // NOTE: THE FOLLOWING IF SHOULD BE REMOVED ONCE 
MAPREDUCE-1447
+                    // NOTE: THE FOLLOWING IF SHOULD BE REMOVED ONCE
+                    // MAPREDUCE-1447
                     // IS FIXED - TestStore.testSetStoreSchema() should fail at
                     // that time and removing this code should fix it.
                     
/**********************************************************/
-                    if(pc.getExecType() == ExecType.LOCAL) {
+                    if (pc.getExecType() == ExecType.LOCAL) {
                         storeSchema(job, st);
                     }
                     if (!st.isTmpStore()) {
                         succeededStores.add(st);
                         finalStores++;
-                        log.info("Successfully stored result in: 
\""+st.getSFile().getFileName()+"\"");
+                        if (st.isMultiStore()) {
+                            String counterName = 
PigStatsUtil.getMultiStoreCounterName(st);
+                            long count = PigStatsUtil.getMultiStoreCount(job,
+                                    jobClient, counterName);
+                            log.info("Successfully stored " + count + " records in: \""
+                                    + st.getSFile().getFileName() + "\"");
+                            storeCounters.put(counterName, 
Long.valueOf(count).toString());
+                        } else {
+                            log.info("Successfully stored result in: \""
+                                    + st.getSFile().getFileName() + "\"");
+                        }                       
+                    } else {
+                        log.debug("Successfully stored result in: \""
+                                + st.getSFile().getFileName() + "\"");
                     }
-                    else
-                        log.debug("Successfully stored result in: 
\""+st.getSFile().getFileName()+"\"");
                 }
-                getStats(job,jobClient, false, pc);
-                if(aggregateWarning) {
+                                
+                getStats(job, jobClient, false, pc);
+                if (aggregateWarning) {
                     computeWarningAggregate(job, jobClient, warningAggMap);
                 }
             }
+            if (storeCounters.size() > 0) {
+                stats.addStatsGroup(PigStatsUtil.MULTI_STORE_COUNTER_GROUP, 
+                        storeCounters);
+            }
         }
         
         if(aggregateWarning) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
 Thu Apr  8 22:54:06 2010
@@ -18,13 +18,18 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.pig.StoreFuncInterface;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 /**
  * This class is used to have a POStore write to DFS via a output
  * collector/record writer. It sets up a modified job configuration to
@@ -32,19 +37,23 @@ import org.apache.pig.backend.hadoop.exe
  * directory. This is done so that multiple output directories can be
  * used in the same job. 
  */
+@SuppressWarnings("unchecked")
 public class MapReducePOStoreImpl extends POStoreImpl {
-    
+            
     private TaskAttemptContext context;
+    
+    private PigStatusReporter reporter;
 
-    @SuppressWarnings("unchecked")
     private RecordWriter writer;
-    
-    public MapReducePOStoreImpl(TaskAttemptContext context) {
+           
+    public MapReducePOStoreImpl(TaskInputOutputContext context) {
         // get a copy of the Configuration so that changes to the
         // configuration below (like setting the output location) do
         // not affect the caller's copy
         Configuration outputConf = new 
Configuration(context.getConfiguration());
-
+                
+        reporter = new PigStatusReporter(context);
+       
         // make a copy of the Context to use here - since in the same
         // task (map or reduce) we could have multiple stores, we should
         // make this copy so that the same context does not get over-written
@@ -53,11 +62,10 @@ public class MapReducePOStoreImpl extend
                 context.getTaskAttemptID());
     }
     
-    @SuppressWarnings("unchecked")
     @Override
     public StoreFuncInterface createStoreFunc(POStore store) 
-        throws IOException {
-
+            throws IOException {
+ 
         StoreFuncInterface storeFunc = store.getStoreFunc();
 
         // call the setStoreLocation on the storeFunc giving it the
@@ -67,19 +75,18 @@ public class MapReducePOStoreImpl extend
         // this modified Configuration into the configuration of the
         // Context we have
         PigOutputFormat.setLocation(context, store);
-        OutputFormat outputFormat = null;
-        try {
-            outputFormat = storeFunc.getOutputFormat();
+        OutputFormat outputFormat = storeFunc.getOutputFormat();
 
-            // create a new record writer
+        // create a new record writer
+        try {
             writer = outputFormat.getRecordWriter(context);
-            storeFunc.prepareToWrite(writer);
-            return storeFunc;
-            
-        }catch(Exception e) {
+        } catch (InterruptedException e) {
             throw new IOException(e);
         }
-
+ 
+        storeFunc.prepareToWrite(writer);
+        
+        return storeFunc;
     }
 
     @Override
@@ -105,4 +112,11 @@ public class MapReducePOStoreImpl extend
             writer = null;
         }
     }
+    
+    public Counter createRecordCounter(POStore store) {
+        Counter outputRecordCounter = reporter.getCounter(
+                PigStatsUtil.MULTI_STORE_COUNTER_GROUP, PigStatsUtil
+                        .getMultiStoreCounterName(store));
+        return outputRecordCounter; 
+    }
 }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 Thu Apr  8 22:54:06 2010
@@ -42,6 +42,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public class PigCombiner {
 
@@ -134,7 +135,8 @@ public class PigCombiner {
 
                 PigHadoopLogger pigHadoopLogger = 
PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setTaskIOContext(context);
+                pigHadoopLogger.setReporter(new PigStatusReporter(context));
+
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
             }
             

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
 Thu Apr  8 22:54:06 2010
@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
  * 
@@ -40,7 +41,9 @@ public final class PigHadoopLogger imple
     } 
 
     private static Log log = LogFactory.getLog(PigHadoopLogger.class);
-    private TaskInputOutputContext<?, ?, ?, ?> taskIOContext = null;
+
+    private PigStatusReporter reporter = null;
+
     private boolean aggregate = false;
 
     private PigHadoopLogger() {
@@ -49,10 +52,10 @@ public final class PigHadoopLogger imple
     @SuppressWarnings("unchecked")
     public void warn(Object o, String msg, Enum warningEnum) {
         String displayMessage = o.getClass().getName() + ": " + msg;
-        if(aggregate) {
-            if(taskIOContext != null) {
-                Counter c = taskIOContext.getCounter(warningEnum);
-                c.increment(1);
+        
+        if (aggregate) {
+            if (reporter != null) {
+                reporter.getCounter(warningEnum).increment(1);
             } else {
                 //TODO:
                 //in local mode of execution if the PigHadoopLogger is used 
initially,
@@ -69,12 +72,8 @@ public final class PigHadoopLogger imple
         }
     }    
 
-    public TaskInputOutputContext<?, ?, ?, ?> getTaskIOContext() {
-        return taskIOContext;
-    }
-
-    public synchronized void setTaskIOContext(TaskInputOutputContext<?, ?, ?, 
?> tioc) {
-        this.taskIOContext = tioc;
+    public synchronized void setReporter(PigStatusReporter rep) {
+        this.reporter = rep;
     }
     
     public boolean getAggregate() {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 Thu Apr  8 22:54:06 2010
@@ -48,6 +48,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public abstract class PigMapBase extends Mapper<Text, Tuple, 
PigNullableWritable, Writable> {
     private static final Tuple DUMMYTUPLE = null;
@@ -199,7 +200,7 @@ public abstract class PigMapBase extends
             this.outputCollector = context;
             pigReporter.setRep(context);
             PhysicalOperator.setReporter(pigReporter);
-
+           
             for (POStore store: stores) {
                 MapReducePOStoreImpl impl 
                     = new MapReducePOStoreImpl(context);
@@ -211,7 +212,7 @@ public abstract class PigMapBase extends
 
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
             pigHadoopLogger.setAggregate(aggregateWarning);
-            pigHadoopLogger.setTaskIOContext(context);
+            pigHadoopLogger.setReporter(new PigStatusReporter(context));
             PhysicalOperator.setPigLogger(pigHadoopLogger);
         }
         

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 Thu Apr  8 22:54:06 2010
@@ -54,6 +54,7 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
  * This class is the static Mapper &amp; Reducer classes that
@@ -338,7 +339,7 @@ public class PigMapReduce {
 
                 PigHadoopLogger pigHadoopLogger = 
PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setTaskIOContext(context);
+                pigHadoopLogger.setReporter(new PigStatusReporter(context));
                 
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
 
@@ -550,7 +551,8 @@ public class PigMapReduce {
                 
                 PigHadoopLogger pigHadoopLogger = 
PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setTaskIOContext(context);
+                pigHadoopLogger.setReporter(new PigStatusReporter(context));
+
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
                 
                 for (POStore store: stores) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
 Thu Apr  8 22:54:06 2010
@@ -20,12 +20,12 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.pig.PigException;
 import org.apache.pig.SortInfo;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReducePOStoreImpl;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -50,14 +50,18 @@ public class POStore extends PhysicalOpe
     private static final long serialVersionUID = 1L;
     private static Result empty = new Result(POStatus.STATUS_NULL, null);
     transient private StoreFuncInterface storer;    
-    transient private final Log log = LogFactory.getLog(getClass());
     transient private POStoreImpl impl;
     private FileSpec sFile;
     private Schema schema;
+    
+    transient private Counter outputRecordCounter = null;
 
     // flag to distinguish user stores from MRCompiler stores.
     private boolean isTmpStore;
     
+    // flag to distinguish single store from multiquery store.
+    private boolean isMultiStore;
+    
     // If we know how to reload the store, here's how. The lFile
     // FileSpec is set in PigServer.postProcess. It can be used to
     // reload this store, if the optimizer has the need.
@@ -90,6 +94,10 @@ public class POStore extends PhysicalOpe
         if (impl != null) {
             try{
                 storer = impl.createStoreFunc(this);
+                if (!isTmpStore && impl instanceof MapReducePOStoreImpl) {
+                    outputRecordCounter = 
+                        ((MapReducePOStoreImpl) 
impl).createRecordCounter(this);
+                }
             }catch (IOException ioe) {
                 int errCode = 2081;
                 String msg = "Unable to setup the store function.";            
@@ -126,6 +134,9 @@ public class POStore extends PhysicalOpe
             case POStatus.STATUS_OK:
                 storer.putNext((Tuple)res.result);
                 res = empty;
+                if (outputRecordCounter != null) {
+                    outputRecordCounter.increment(1);
+                }
                 break;
             case POStatus.STATUS_EOP:
                 break;
@@ -228,4 +239,12 @@ public class POStore extends PhysicalOpe
     public void setSignature(String signature) {
         this.signature = signature;
     }
+
+    public void setMultiStore(boolean isMultiStore) {
+        this.isMultiStore = isMultiStore;
+    }
+
+    public boolean isMultiStore() {
+        return isMultiStore;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Thu Apr  8 
22:54:06 2010
@@ -101,6 +101,10 @@ public class PigStats {
             throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
     }
     
+    public void addStatsGroup(String key, Map<String, String> value) {
+        stats.put(key, value);
+    }
+    
     private Map<String, Map<String, String>> accumulateLocalStats() {
         //The counter placed before a store in the local plan should be able 
to get the number of records
         for(PhysicalOperator op : php.getLeaves()) {

Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=932161&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Thu 
Apr  8 22:54:06 2010
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.mortbay.log.Log;
+
+public abstract class PigStatsUtil {
+
+    public static final String MULTI_STORE_RECORD_COUNTER 
+            = "Output records in ";
+    public static final String MULTI_STORE_COUNTER_GROUP 
+            = "MultiStoreCounters";
+    
+    @SuppressWarnings("deprecation")
+    public static long getMultiStoreCount(Job job, JobClient jobClient,
+            String counterName) {
+        long value = 0;
+        try {
+            RunningJob rj = jobClient.getJob(job.getAssignedJobID());
+            if (rj != null) {
+                Counters.Counter counter = rj.getCounters().getGroup(
+                        
MULTI_STORE_COUNTER_GROUP).getCounterForName(counterName);
+                value = counter.getValue();
+            }
+        } catch (IOException e) {
+            Log.warn("Failed to get the counter for " + counterName);
+        }
+        return value;        
+    }
+    
+    public static String getMultiStoreCounterName(POStore store) {
+        return MULTI_STORE_RECORD_COUNTER +
+                new Path(store.getSFile().getFileName()).getName();
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=932161&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java 
(added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java 
Thu Apr  8 22:54:06 2010
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.Progressable;
+
+@SuppressWarnings("unchecked")
+public class PigStatusReporter extends StatusReporter implements Progressable {
+
+    private TaskInputOutputContext context;
+    
+    public PigStatusReporter(TaskInputOutputContext context) {
+        this.context = context;
+    }
+    
+    @Override
+    public Counter getCounter(Enum<?> name) {        
+        return (context == null) ? null : context.getCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+        return (context == null) ? null : context.getCounter(group, name);
+    }
+
+    @Override
+    public void progress() {
+        if (context != null) {
+            context.progress();
+        }
+    }
+
+    @Override
+    public void setStatus(String status) {
+        if (context != null) {
+            context.setStatus(status);
+        }
+    }
+
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Thu Apr  8 
22:54:06 2010
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -32,8 +33,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.junit.Test;
 
 public class TestCounters extends TestCase {
@@ -378,7 +381,7 @@ public class TestCounters extends TestCa
         assertEquals(count, pigStats.getRecordsWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
-
+    
     @Test
     public void testMapCombineReduceBinStorage() throws IOException, 
ExecException {
         int count = 0;
@@ -513,6 +516,95 @@ public class TestCounters extends TestCa
 
     }
     
+    @Test
+    public void testMapOnlyMultiQueryStores() throws Exception {
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+        for(int i = 0; i < MAX; i++) {
+            int t = r.nextInt(100);
+            pw.println(t);
+        }
+        pw.close();
+        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+                cluster.getProperties());
+        pigServer.setBatchOn();
+        pigServer.registerQuery("a = load '" + file + "';");
+        pigServer.registerQuery("b = filter a by $0 > 50;");
+        pigServer.registerQuery("c = filter a by $0 <= 50;");
+        pigServer.registerQuery("store b into '/tmp/outout1';");
+        pigServer.registerQuery("store c into '/tmp/outout2';");
+        List<ExecJob> jobs = pigServer.executeBatch();
+        
+        assertTrue(jobs != null && jobs.size() == 2);
+        
+        Map<String, Map<String, String>> stats = 
+            jobs.get(0).getStatistics().getPigStats();
+        
+        cluster.getFileSystem().delete(new Path(file), true);
+        cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
+        cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
+
+        Map<String, String> entry = 
+            stats.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        
+        long counter = 0;
+        for (String val : entry.values()) {
+            counter += new Long(val);
+        }
+        
+        assertEquals(MAX, counter);       
+    }    
+    
+    @Test
+    public void testMultiQueryStores() throws Exception {
+        int[] nums = new int[100];
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+        for(int i = 0; i < MAX; i++) {
+            int t = r.nextInt(100);
+            pw.println(t);
+            nums[t]++;
+        }
+        pw.close();
+        
+        int groups = 0;
+        for (int i : nums) {
+            if (i > 0) groups++;
+        }
+        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+                cluster.getProperties());
+        pigServer.setBatchOn();
+        pigServer.registerQuery("a = load '" + file + "';");
+        pigServer.registerQuery("b = filter a by $0 >= 50;");
+        pigServer.registerQuery("c = group b by $0;");
+        pigServer.registerQuery("d = foreach c generate group;");
+        pigServer.registerQuery("e = filter a by $0 < 50;");
+        pigServer.registerQuery("f = group e by $0;");
+        pigServer.registerQuery("g = foreach f generate group;");
+        pigServer.registerQuery("store d into '/tmp/outout1';");
+        pigServer.registerQuery("store g into '/tmp/outout2';");
+        List<ExecJob> jobs = pigServer.executeBatch();
+        
+        assertTrue(jobs != null && jobs.size() == 2);
+        
+        Map<String, Map<String, String>> stats = 
+            jobs.get(0).getStatistics().getPigStats();
+        
+        cluster.getFileSystem().delete(new Path(file), true);
+        cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
+        cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
+
+        Map<String, String> entry = 
+            stats.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        
+        long counter = 0;
+        for (String val : entry.values()) {
+            counter += new Long(val);      
+        }
+        
+        assertEquals(groups, counter);       
+    }    
+    
     /*
      * IMPORTANT NOTE:
      * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -


Reply via email to