Author: rding
Date: Thu Apr 8 22:54:06 2010
New Revision: 932161
URL: http://svn.apache.org/viewvc?rev=932161&view=rev
Log:
PIG-1299: Implement Pig counter to track number of output rows for each output
files
Added:
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Apr 8 22:54:06 2010
@@ -39,6 +39,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1299: Implement Pig counter to track number of output rows for each output
+files (rding)
+
PIG-1366: PigStorage's pushProjection implementation results in NPE under
certain data conditions (pradeepkth)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Apr 8 22:54:06 2010
@@ -468,6 +468,10 @@ public class JobControlCompiler{
tmpLocation = new Path(tmpLocationStr);
nwJob.setOutputFormatClass(PigOutputFormat.class);
+
+ for (POStore sto: storeLocations) {
+ sto.setMultiStore(true);
+ }
conf.set("pig.streaming.log.dir",
new Path(tmpLocation, LOG_DIR).toString());
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Apr 8 22:54:06 2010
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@@ -63,6 +64,7 @@ import org.apache.pig.impl.util.LogUtils
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
/**
* Main class that launches pig for Map Reduce
@@ -249,12 +251,12 @@ public class MapReduceLauncher extends L
stats.setJobClient(jobClient);
stats.setJobControl(jc);
stats.accumulateStats();
-
+
jc.stop();
}
log.info( "100% complete");
-
+
boolean failed = false;
int finalStores = 0;
// Look to see if any jobs failed. If so, we need to report that.
@@ -286,35 +288,54 @@ public class MapReduceLauncher extends L
Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
- if(succJobs!=null) {
- for(Job job : succJobs){
- List<POStore> sts = jcc.getStores(job);
- for (POStore st: sts) {
- // Currently (as of Feb 3 2010), hadoop's local mode does not
- // call cleanupJob on OutputCommitter (see https://issues.apache.org/jira/browse/MAPREDUCE-1447)
- // So to workaround that bug, we are calling setStoreSchema on
- // StoreFunc's which implement StoreMetadata here
+ if (succJobs != null) {
+ Map<String, String> storeCounters = new HashMap<String, String>();
+ for (Job job : succJobs) {
+ List<POStore> sts = jcc.getStores(job);
+ for (POStore st : sts) {
+ // Currently (as of Feb 3 2010), hadoop's local mode does
+ // not call cleanupJob on OutputCommitter (see
+ // https://issues.apache.org/jira/browse/MAPREDUCE-1447)
+ // So to workaround that bug, we are calling setStoreSchema
+ // on StoreFunc's which implement StoreMetadata here
/**********************************************************/
- // NOTE: THE FOLLOWING IF SHOULD BE REMOVED ONCE MAPREDUCE-1447
+ // NOTE: THE FOLLOWING IF SHOULD BE REMOVED ONCE
+ // MAPREDUCE-1447
// IS FIXED - TestStore.testSetStoreSchema() should fail at
// that time and removing this code should fix it.
/**********************************************************/
- if(pc.getExecType() == ExecType.LOCAL) {
+ if (pc.getExecType() == ExecType.LOCAL) {
storeSchema(job, st);
}
if (!st.isTmpStore()) {
succeededStores.add(st);
finalStores++;
- log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
+ if (st.isMultiStore()) {
+ String counterName = PigStatsUtil.getMultiStoreCounterName(st);
+ long count = PigStatsUtil.getMultiStoreCount(job,
+ jobClient, counterName);
+ log.info("Successfully stored " + count + " records in: \""
+ + st.getSFile().getFileName() + "\"");
+ storeCounters.put(counterName, Long.valueOf(count).toString());
+ } else {
+ log.info("Successfully stored result in: \""
+ + st.getSFile().getFileName() + "\"");
+ }
+ } else {
+ log.debug("Successfully stored result in: \""
+ + st.getSFile().getFileName() + "\"");
}
- else
- log.debug("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
}
- getStats(job,jobClient, false, pc);
- if(aggregateWarning) {
+
+ getStats(job, jobClient, false, pc);
+ if (aggregateWarning) {
computeWarningAggregate(job, jobClient, warningAggMap);
}
}
+ if (storeCounters.size() > 0) {
+ stats.addStatsGroup(PigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+ storeCounters);
+ }
}
if(aggregateWarning) {
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Thu Apr 8 22:54:06 2010
@@ -18,13 +18,18 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
* This class is used to have a POStore write to DFS via a output
* collector/record writer. It sets up a modified job configuration to
@@ -32,19 +37,23 @@ import org.apache.pig.backend.hadoop.exe
* directory. This is done so that multiple output directories can be
* used in the same job.
*/
+@SuppressWarnings("unchecked")
public class MapReducePOStoreImpl extends POStoreImpl {
-
+
private TaskAttemptContext context;
+
+ private PigStatusReporter reporter;
- @SuppressWarnings("unchecked")
private RecordWriter writer;
-
- public MapReducePOStoreImpl(TaskAttemptContext context) {
+
+ public MapReducePOStoreImpl(TaskInputOutputContext context) {
// get a copy of the Configuration so that changes to the
// configuration below (like setting the output location) do
// not affect the caller's copy
Configuration outputConf = new Configuration(context.getConfiguration());
-
+
+ reporter = new PigStatusReporter(context);
+
// make a copy of the Context to use here - since in the same
// task (map or reduce) we could have multiple stores, we should
// make this copy so that the same context does not get over-written
@@ -53,11 +62,10 @@ public class MapReducePOStoreImpl extend
context.getTaskAttemptID());
}
- @SuppressWarnings("unchecked")
@Override
public StoreFuncInterface createStoreFunc(POStore store)
- throws IOException {
-
+ throws IOException {
+
StoreFuncInterface storeFunc = store.getStoreFunc();
// call the setStoreLocation on the storeFunc giving it the
@@ -67,19 +75,18 @@ public class MapReducePOStoreImpl extend
// this modified Configuration into the configuration of the
// Context we have
PigOutputFormat.setLocation(context, store);
- OutputFormat outputFormat = null;
- try {
- outputFormat = storeFunc.getOutputFormat();
+ OutputFormat outputFormat = storeFunc.getOutputFormat();
- // create a new record writer
+ // create a new record writer
+ try {
writer = outputFormat.getRecordWriter(context);
- storeFunc.prepareToWrite(writer);
- return storeFunc;
-
- }catch(Exception e) {
+ } catch (InterruptedException e) {
throw new IOException(e);
}
-
+
+ storeFunc.prepareToWrite(writer);
+
+ return storeFunc;
}
@Override
@@ -105,4 +112,11 @@ public class MapReducePOStoreImpl extend
writer = null;
}
}
+
+ public Counter createRecordCounter(POStore store) {
+ Counter outputRecordCounter = reporter.getCounter(
+ PigStatsUtil.MULTI_STORE_COUNTER_GROUP, PigStatsUtil
+ .getMultiStoreCounterName(store));
+ return outputRecordCounter;
+ }
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Thu Apr 8 22:54:06 2010
@@ -42,6 +42,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
public class PigCombiner {
@@ -134,7 +135,8 @@ public class PigCombiner {
PigHadoopLogger pigHadoopLogger =
PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setTaskIOContext(context);
+ pigHadoopLogger.setReporter(new PigStatusReporter(context));
+
PhysicalOperator.setPigLogger(pigHadoopLogger);
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Thu Apr 8 22:54:06 2010
@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
*
@@ -40,7 +41,9 @@ public final class PigHadoopLogger imple
}
private static Log log = LogFactory.getLog(PigHadoopLogger.class);
- private TaskInputOutputContext<?, ?, ?, ?> taskIOContext = null;
+
+ private PigStatusReporter reporter = null;
+
private boolean aggregate = false;
private PigHadoopLogger() {
@@ -49,10 +52,10 @@ public final class PigHadoopLogger imple
@SuppressWarnings("unchecked")
public void warn(Object o, String msg, Enum warningEnum) {
String displayMessage = o.getClass().getName() + ": " + msg;
- if(aggregate) {
- if(taskIOContext != null) {
- Counter c = taskIOContext.getCounter(warningEnum);
- c.increment(1);
+
+ if (aggregate) {
+ if (reporter != null) {
+ reporter.getCounter(warningEnum).increment(1);
} else {
//TODO:
//in local mode of execution if the PigHadoopLogger is used initially,
@@ -69,12 +72,8 @@ public final class PigHadoopLogger imple
}
}
- public TaskInputOutputContext<?, ?, ?, ?> getTaskIOContext() {
- return taskIOContext;
- }
-
- public synchronized void setTaskIOContext(TaskInputOutputContext<?, ?, ?, ?> tioc) {
- this.taskIOContext = tioc;
+ public synchronized void setReporter(PigStatusReporter rep) {
+ this.reporter = rep;
}
public boolean getAggregate() {
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Thu Apr 8 22:54:06 2010
@@ -48,6 +48,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
public abstract class PigMapBase extends Mapper<Text, Tuple,
PigNullableWritable, Writable> {
private static final Tuple DUMMYTUPLE = null;
@@ -199,7 +200,7 @@ public abstract class PigMapBase extends
this.outputCollector = context;
pigReporter.setRep(context);
PhysicalOperator.setReporter(pigReporter);
-
+
for (POStore store: stores) {
MapReducePOStoreImpl impl
= new MapReducePOStoreImpl(context);
@@ -211,7 +212,7 @@ public abstract class PigMapBase extends
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setTaskIOContext(context);
+ pigHadoopLogger.setReporter(new PigStatusReporter(context));
PhysicalOperator.setPigLogger(pigHadoopLogger);
}
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Apr 8 22:54:06 2010
@@ -54,6 +54,7 @@ import org.apache.pig.impl.plan.Dependen
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
* This class is the static Mapper & Reducer classes that
@@ -338,7 +339,7 @@ public class PigMapReduce {
PigHadoopLogger pigHadoopLogger =
PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setTaskIOContext(context);
+ pigHadoopLogger.setReporter(new PigStatusReporter(context));
PhysicalOperator.setPigLogger(pigHadoopLogger);
@@ -550,7 +551,8 @@ public class PigMapReduce {
PigHadoopLogger pigHadoopLogger =
PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setTaskIOContext(context);
+ pigHadoopLogger.setReporter(new PigStatusReporter(context));
+
PhysicalOperator.setPigLogger(pigHadoopLogger);
for (POStore store: stores) {
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Thu Apr 8 22:54:06 2010
@@ -20,12 +20,12 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.pig.PigException;
import org.apache.pig.SortInfo;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReducePOStoreImpl;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -50,14 +50,18 @@ public class POStore extends PhysicalOpe
private static final long serialVersionUID = 1L;
private static Result empty = new Result(POStatus.STATUS_NULL, null);
transient private StoreFuncInterface storer;
- transient private final Log log = LogFactory.getLog(getClass());
transient private POStoreImpl impl;
private FileSpec sFile;
private Schema schema;
+
+ transient private Counter outputRecordCounter = null;
// flag to distinguish user stores from MRCompiler stores.
private boolean isTmpStore;
+ // flag to distinguish single store from multiquery store.
+ private boolean isMultiStore;
+
// If we know how to reload the store, here's how. The lFile
// FileSpec is set in PigServer.postProcess. It can be used to
// reload this store, if the optimizer has the need.
@@ -90,6 +94,10 @@ public class POStore extends PhysicalOpe
if (impl != null) {
try{
storer = impl.createStoreFunc(this);
+ if (!isTmpStore && impl instanceof MapReducePOStoreImpl) {
+ outputRecordCounter =
+ ((MapReducePOStoreImpl) impl).createRecordCounter(this);
+ }
}catch (IOException ioe) {
int errCode = 2081;
String msg = "Unable to setup the store function.";
@@ -126,6 +134,9 @@ public class POStore extends PhysicalOpe
case POStatus.STATUS_OK:
storer.putNext((Tuple)res.result);
res = empty;
+ if (outputRecordCounter != null) {
+ outputRecordCounter.increment(1);
+ }
break;
case POStatus.STATUS_EOP:
break;
@@ -228,4 +239,12 @@ public class POStore extends PhysicalOpe
public void setSignature(String signature) {
this.signature = signature;
}
+
+ public void setMultiStore(boolean isMultiStore) {
+ this.isMultiStore = isMultiStore;
+ }
+
+ public boolean isMultiStore() {
+ return isMultiStore;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Thu Apr 8 22:54:06 2010
@@ -101,6 +101,10 @@ public class PigStats {
throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
}
+ public void addStatsGroup(String key, Map<String, String> value) {
+ stats.put(key, value);
+ }
+
private Map<String, Map<String, String>> accumulateLocalStats() {
//The counter placed before a store in the local plan should be able to get the number of records
for(PhysicalOperator op : php.getLeaves()) {
Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=932161&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Thu Apr 8 22:54:06 2010
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.mortbay.log.Log;
+
+public abstract class PigStatsUtil {
+
+ public static final String MULTI_STORE_RECORD_COUNTER
+ = "Output records in ";
+ public static final String MULTI_STORE_COUNTER_GROUP
+ = "MultiStoreCounters";
+
+ @SuppressWarnings("deprecation")
+ public static long getMultiStoreCount(Job job, JobClient jobClient,
+ String counterName) {
+ long value = 0;
+ try {
+ RunningJob rj = jobClient.getJob(job.getAssignedJobID());
+ if (rj != null) {
+ Counters.Counter counter = rj.getCounters().getGroup(
+ MULTI_STORE_COUNTER_GROUP).getCounterForName(counterName);
+ value = counter.getValue();
+ }
+ } catch (IOException e) {
+ Log.warn("Failed to get the counter for " + counterName);
+ }
+ return value;
+ }
+
+ public static String getMultiStoreCounterName(POStore store) {
+ return MULTI_STORE_RECORD_COUNTER +
+ new Path(store.getSFile().getFileName()).getName();
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=932161&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Thu Apr 8 22:54:06 2010
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.Progressable;
+
+@SuppressWarnings("unchecked")
+public class PigStatusReporter extends StatusReporter implements Progressable {
+
+ private TaskInputOutputContext context;
+
+ public PigStatusReporter(TaskInputOutputContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return (context == null) ? null : context.getCounter(name);
+ }
+
+ @Override
+ public Counter getCounter(String group, String name) {
+ return (context == null) ? null : context.getCounter(group, name);
+ }
+
+ @Override
+ public void progress() {
+ if (context != null) {
+ context.progress();
+ }
+ }
+
+ @Override
+ public void setStatus(String status) {
+ if (context != null) {
+ context.setStatus(status);
+ }
+ }
+
+}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=932161&r1=932160&r2=932161&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Thu Apr 8 22:54:06 2010
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
+import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -32,8 +33,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.junit.Test;
public class TestCounters extends TestCase {
@@ -378,7 +381,7 @@ public class TestCounters extends TestCa
assertEquals(count, pigStats.getRecordsWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
-
+
@Test
public void testMapCombineReduceBinStorage() throws IOException, ExecException {
int count = 0;
@@ -513,6 +516,95 @@ public class TestCounters extends TestCa
}
+ @Test
+ public void testMapOnlyMultiQueryStores() throws Exception {
+ PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+ for(int i = 0; i < MAX; i++) {
+ int t = r.nextInt(100);
+ pw.println(t);
+ }
+ pw.close();
+
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
+ cluster.getProperties());
+ pigServer.setBatchOn();
+ pigServer.registerQuery("a = load '" + file + "';");
+ pigServer.registerQuery("b = filter a by $0 > 50;");
+ pigServer.registerQuery("c = filter a by $0 <= 50;");
+ pigServer.registerQuery("store b into '/tmp/outout1';");
+ pigServer.registerQuery("store c into '/tmp/outout2';");
+ List<ExecJob> jobs = pigServer.executeBatch();
+
+ assertTrue(jobs != null && jobs.size() == 2);
+
+ Map<String, Map<String, String>> stats =
+ jobs.get(0).getStatistics().getPigStats();
+
+ cluster.getFileSystem().delete(new Path(file), true);
+ cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
+ cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
+
+ Map<String, String> entry =
+ stats.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+
+ long counter = 0;
+ for (String val : entry.values()) {
+ counter += new Long(val);
+ }
+
+ assertEquals(MAX, counter);
+ }
+
+ @Test
+ public void testMultiQueryStores() throws Exception {
+ int[] nums = new int[100];
+ PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+ for(int i = 0; i < MAX; i++) {
+ int t = r.nextInt(100);
+ pw.println(t);
+ nums[t]++;
+ }
+ pw.close();
+
+ int groups = 0;
+ for (int i : nums) {
+ if (i > 0) groups++;
+ }
+
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
+ cluster.getProperties());
+ pigServer.setBatchOn();
+ pigServer.registerQuery("a = load '" + file + "';");
+ pigServer.registerQuery("b = filter a by $0 >= 50;");
+ pigServer.registerQuery("c = group b by $0;");
+ pigServer.registerQuery("d = foreach c generate group;");
+ pigServer.registerQuery("e = filter a by $0 < 50;");
+ pigServer.registerQuery("f = group e by $0;");
+ pigServer.registerQuery("g = foreach f generate group;");
+ pigServer.registerQuery("store d into '/tmp/outout1';");
+ pigServer.registerQuery("store g into '/tmp/outout2';");
+ List<ExecJob> jobs = pigServer.executeBatch();
+
+ assertTrue(jobs != null && jobs.size() == 2);
+
+ Map<String, Map<String, String>> stats =
+ jobs.get(0).getStatistics().getPigStats();
+
+ cluster.getFileSystem().delete(new Path(file), true);
+ cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
+ cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
+
+ Map<String, String> entry =
+ stats.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+
+ long counter = 0;
+ for (String val : entry.values()) {
+ counter += new Long(val);
+ }
+
+ assertEquals(groups, counter);
+ }
+
/*
* IMPORTANT NOTE:
* COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -