Author: gates
Date: Fri Jun 5 16:26:15 2009
New Revision: 782057
URL: http://svn.apache.org/viewvc?rev=782057&view=rev
Log:
PIG-831: Turned off reporting of records and bytes written for multi-store
queries as the returned results are confusing and wrong.
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=782057&r1=782056&r2=782057&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jun 5 16:26:15 2009
@@ -26,6 +26,9 @@
IMPROVEMENTS
+PIG-831: Turned off reporting of records and bytes written for multi-store
+queries as the returned results are confusing and wrong. (gates)
+
PIG-813: documentation updates (chandec via olgan)
PIG-825: PIG_HADOOP_VERSION should be set to 18 (dvryaboy via gates).
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=782057&r1=782056&r2=782057&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Fri Jun 5 16:26:15 2009
@@ -192,6 +192,7 @@
log.info( "100% complete");
boolean failed = false;
+ int finalStores = 0;
// Look to see if any jobs failed. If so, we need to report that.
if (failedJobs != null && failedJobs.size() > 0) {
log.error(failedJobs.size()+" map reduce job(s) failed!");
@@ -210,6 +211,7 @@
if (!st.isTmpStore()) {
failedStores.add(st.getSFile());
failureMap.put(st.getSFile(), backendException);
+ finalStores++;
}
FileLocalizer.registerDeleteOnFail(st.getSFile().getFileName(), pc);
@@ -227,6 +229,7 @@
for (POStore st: sts) {
if (!st.isTmpStore()) {
succeededStores.add(st.getSFile());
+ finalStores++;
}
log.info("Successfully stored result in:
\""+st.getSFile().getFileName()+"\"");
}
@@ -241,9 +244,10 @@
CompilationMessageCollector.logAggregate(warningAggMap,
MessageType.Warning, log) ;
}
- if(stats.getPigStats().get(stats.getLastJobID()) == null)
- log.warn("Jobs not found in the JobClient. Please try to use
Local, Hadoop Distributed or Hadoop MiniCluster modes instead of Hadoop
LocalExecution");
- else {
+ // Report records and bytes written. Only do this in the single store
case. Multi-store
+ // scripts mess up the stats reporting from hadoop.
+ List<String> rji = stats.getRootJobIDs();
+ if (rji != null && rji.size() == 1 && finalStores == 1) {
log.info("Records written : " + stats.getRecordsWritten());
log.info("Bytes written : " + stats.getBytesWritten());
}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=782057&r1=782056&r2=782057&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Fri Jun 5
16:26:15 2009
@@ -21,7 +21,6 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
@@ -34,8 +33,6 @@
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.ExecType;
@@ -45,7 +42,6 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import
org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
-import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.util.ObjectSerializer;
public class PigStats {
@@ -54,7 +50,8 @@
JobControl jc;
JobClient jobClient;
Map<String, Map<String, String>> stats = new HashMap<String,
Map<String,String>>();
- String lastJobID;
+ // String lastJobID;
+ ArrayList<String> rootJobIDs = new ArrayList<String>();
ExecType mode;
public void setMROperatorPlan(MROperPlan mrp) {
@@ -112,8 +109,6 @@
private Map<String, Map<String, String>> accumulateMRStats() throws
ExecException {
- Job lastJob = getLastJob(jc.getSuccessfulJobs());
-
for(Job job : jc.getSuccessfulJobs()) {
@@ -170,12 +165,14 @@
}
- if (lastJob != null) lastJobID = lastJob.getAssignedJobID().toString();
+ getLastJobIDs(jc.getSuccessfulJobs());
+
return stats;
}
- private Job getLastJob(List<Job> jobs) {
+ private void getLastJobIDs(List<Job> jobs) {
+ rootJobIDs.clear();
Set<Job> temp = new HashSet<Job>();
for(Job job : jobs) {
if(job.getDependingJobs() != null &&
job.getDependingJobs().size() > 0)
@@ -185,17 +182,13 @@
//difference between temp and jobs would be the set of leaves
//we can safely assume there would be only one leaf
for(Job job : jobs) {
- if(temp.contains(job))
- continue;
- else
- //this means a leaf
- return job;
+ if(temp.contains(job)) continue;
+ else rootJobIDs.add(job.getAssignedJobID().toString());
}
- return null;
}
- public String getLastJobID() {
- return lastJobID;
+ public List<String> getRootJobIDs() {
+ return rootJobIDs;
}
public Map<String, Map<String, String>> getPigStats() {
@@ -226,14 +219,18 @@
* @return
*/
private long getRecordsCountMR() {
- String reducePlan = stats.get(lastJobID).get("PIG_STATS_REDUCE_PLAN");
- String records = null;
- if(reducePlan == null) {
- records = stats.get(lastJobID).get("PIG_STATS_MAP_OUTPUT_RECORDS");
- } else {
- records =
stats.get(lastJobID).get("PIG_STATS_REDUCE_OUTPUT_RECORDS");
+ long records = 0;
+ for (String jid : rootJobIDs) {
+ Map<String, String> jobStats = stats.get(jid);
+ if (jobStats == null) continue;
+ String reducePlan = jobStats.get("PIG_STATS_REDUCE_PLAN");
+ if(reducePlan == null) {
+ records +=
Long.parseLong(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
+ } else {
+ records +=
Long.parseLong(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
+ }
}
- return Long.parseLong(records);
+ return records;
}
public long getBytesWritten() {
@@ -254,7 +251,13 @@
}
private long getMapReduceBytesWritten() {
- return
Long.parseLong(stats.get(lastJobID).get("PIG_STATS_BYTES_WRITTEN"));
+ long bytesWritten = 0;
+ for (String jid : rootJobIDs) {
+ Map<String, String> jobStats = stats.get(jid);
+ if (jobStats == null) continue;
+ bytesWritten +=
Long.parseLong(jobStats.get("PIG_STATS_BYTES_WRITTEN"));
+ }
+ return bytesWritten;
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=782057&r1=782056&r2=782057&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Fri Jun 5
16:26:15 2009
@@ -28,14 +28,12 @@
import junit.framework.TestCase;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.tools.pigstats.PigStats;
-import org.junit.Before;
import org.junit.Test;
public class TestCounters extends TestCase {
@@ -499,7 +497,7 @@
System.out.println("============================================");
}
- Map<String, String> jobStats = stats.get(pigStats.getLastJobID());
+ Map<String, String> jobStats =
stats.get(pigStats.getRootJobIDs().get(0));
System.out.println("Map input records : " +
jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
assertEquals(MAX,
Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));