Author: olga
Date: Thu Jan 22 12:26:07 2009
New Revision: 736781
URL: http://svn.apache.org/viewvc?rev=736781&view=rev
Log:
PIG-629: getting rid of targeted tuple
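In short, this change removes the per-record TargetedTuple wrapper. Previously every input tuple was deserialized into a TargetedTuple carrying the OperatorKeys of its target operators; now SliceWrapper serializes the target ops once into the JobConf under "map.target.ops", and PigMapBase resolves them once per task in configure(), caching the plan's roots and leaf. The following condensed view is assembled from the hunks below (a summary for the reader, not code added by this commit):

    // Once per split (SliceWrapper.makeReader):
    job.set("map.target.ops", ObjectSerializer.serialize(targetOps));

    // Once per map task (PigMapBase.configure):
    List<OperatorKey> targetOpKeys =
        (ArrayList<OperatorKey>) ObjectSerializer.deserialize(job.get("map.target.ops"));
    ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
    for (OperatorKey targetKey : targetOpKeys) {
        targetOpsAsList.add(mp.getOperator(targetKey));  // resolve keys against the map plan
    }
    roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
    leaf = mp.getLeaves().get(0);

    // Once per record (PigMapBase.map) -- the plain Tuple is attached directly:
    for (PhysicalOperator root : roots) {
        root.attachInput(inpTuple);                      // no TargetedTuple.toTuple() copy
    }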
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jan 22 12:26:07 2009
@@ -364,7 +364,9 @@
PIG-597: Fix for how * is treated by UDFs (shravanmn via olgan)
+ PIG-629: performance improvement: getting rid of targeted tuple (pradeepk
+ via olgan)
+
PIG-623: Fix spelling errors in output messages (tomwhite via sms)
PIG-622: Include pig executable in distribution (tomwhite via sms)
-
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Jan 22 12:26:07 2009
@@ -54,7 +54,7 @@
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
-public class PigInputFormat implements InputFormat<Text, TargetedTuple>,
+public class PigInputFormat implements InputFormat<Text, Tuple>,
JobConfigurable {
public static final Log LOG = LogFactory
@@ -211,7 +211,7 @@
return splits.toArray(new SliceWrapper[splits.size()]);
}
- public RecordReader<Text, TargetedTuple> getRecordReader(InputSplit split,
+ public RecordReader<Text, Tuple> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
PigInputFormat.sJob = job;
activeSplit = (SliceWrapper) split;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Thu Jan 22 12:26:07 2009
@@ -19,6 +19,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -44,6 +45,8 @@
import org.apache.pig.impl.util.SpillableMemoryManager;
public abstract class PigMapBase extends MapReduceBase{
+ private static final Tuple DUMMYTUPLE = null;
+
private final Log log = LogFactory.getLog(getClass());
protected byte keyType;
@@ -59,8 +62,13 @@
// to transmit heartbeat
ProgressableReporter pigReporter;
- private boolean errorInMap = false;
+ protected boolean errorInMap = false;
+ PhysicalOperator[] roots;
+
+ private PhysicalOperator leaf;
+
+ private boolean initialized = false;
/**
* Will be called when all the tuples in the input
@@ -83,8 +91,6 @@
// This will result in nothing happening in the case
// where there is no stream in the pipeline
mp.endOfAllInput = true;
- List<PhysicalOperator> leaves = mp.getLeaves();
- PhysicalOperator leaf = leaves.get(0);
try {
runPipeline(leaf);
} catch (ExecException e) {
@@ -124,6 +130,16 @@
long sleepTime = job.getLong("pig.reporter.sleep.time", 10000);
pigReporter = new ProgressableReporter();
+ if(!(mp.isEmpty())) {
+ List<OperatorKey> targetOpKeys = (ArrayList<OperatorKey>)ObjectSerializer.deserialize(job.get("map.target.ops"));
+ ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
+ for (OperatorKey targetKey : targetOpKeys) {
+ targetOpsAsList.add(mp.getOperator(targetKey));
+ }
+ roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
+ leaf = mp.getLeaves().get(0);
+ }
} catch (IOException e) {
log.error(e.getMessage() + "was caused by:");
log.error(e.getCause().getMessage());
@@ -139,19 +155,22 @@
* the tuple as-is whereas map-reduce collects it after extracting
* the key and indexed tuple.
*/
- public void map(Text key, TargetedTuple inpTuple,
+ public void map(Text key, Tuple inpTuple,
OutputCollector<PigNullableWritable, Writable> oc,
Reporter reporter) throws IOException {
- // cache the collector for use in runPipeline() which
- // can be called from close()
- this.outputCollector = oc;
- pigReporter.setRep(reporter);
- PhysicalOperator.setReporter(pigReporter);
+ if(!initialized) {
+ initialized = true;
+ // cache the collector for use in runPipeline() which
+ // can be called from close()
+ this.outputCollector = oc;
+ pigReporter.setRep(reporter);
+ PhysicalOperator.setReporter(pigReporter);
+ }
if(mp.isEmpty()){
try{
- collect(oc,inpTuple.toTuple());
+ collect(oc,inpTuple);
} catch (ExecException e) {
IOException ioe = new IOException(e.getMessage());
ioe.initCause(e.getCause());
@@ -160,17 +179,9 @@
return;
}
- for (OperatorKey targetKey : inpTuple.targetOps) {
-
- PhysicalOperator target = mp.getOperator(targetKey);
- Tuple t = inpTuple.toTuple();
- target.attachInput(t);
+ for (PhysicalOperator root : roots) {
+ root.attachInput(inpTuple);
}
- List<PhysicalOperator> leaves = mp.getLeaves();
-
- PhysicalOperator leaf = leaves.get(0);
-
-
try {
runPipeline(leaf);
@@ -182,9 +193,8 @@
}
private void runPipeline(PhysicalOperator leaf) throws IOException,
ExecException {
- Tuple dummyTuple = null;
while(true){
- Result res = leaf.getNext(dummyTuple);
+ Result res = leaf.getNext(DUMMYTUPLE);
if(res.returnStatus==POStatus.STATUS_OK){
collect(outputCollector,(Tuple)res.result);
continue;
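A note on the DUMMYTUPLE constant used in runPipeline() above: Pig's PhysicalOperator overloads getNext() by parameter type, so only the static type of the argument matters; the null value is never read, and hoisting it into a static final field removes the per-call local that the old code declared. A minimal, self-contained sketch of the idiom (the class names here are hypothetical stand-ins, not Pig's):

    public class OverloadDemo {
        static class Tuple {}                        // stand-in for org.apache.pig.data.Tuple

        static String getNext(Tuple t)   { return "tuple overload"; }
        static String getNext(Integer i) { return "integer overload"; }

        // The value is never dereferenced; its declared type alone selects
        // the overload at compile time, so a shared null constant is safe.
        private static final Tuple DUMMYTUPLE = null;

        public static void main(String[] args) {
            System.out.println(getNext(DUMMYTUPLE)); // prints "tuple overload"
        }
    }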
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java Thu Jan 22 12:26:07 2009
@@ -63,7 +63,7 @@
public class PigMapOnly {
public static class Map extends PigMapBase implements
- Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
+ Mapper<Text, Tuple, PigNullableWritable, Writable> {
@Override
public void collect(OutputCollector<PigNullableWritable, Writable> oc,
Tuple tuple) throws ExecException, IOException {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Jan 22 12:26:07 2009
@@ -80,7 +80,7 @@
private final static Tuple DUMMYTUPLE = null;
public static class Map extends PigMapBase implements
- Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
+ Mapper<Text, Tuple, PigNullableWritable, Writable> {
@Override
public void collect(OutputCollector<PigNullableWritable, Writable> oc,
Tuple tuple) throws ExecException, IOException {
@@ -105,7 +105,7 @@
* in the order by is wrapped into a tuple (if it isn't already a tuple)
*/
public static class MapWithComparator extends PigMapBase implements
- Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
+ Mapper<Text, Tuple, PigNullableWritable, Writable> {
@Override
public void collect(OutputCollector<PigNullableWritable, Writable> oc,
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Thu Jan 22 12:26:07 2009
@@ -47,8 +47,10 @@
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.data.TargetedTuple;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.ObjectSerializer;
/**
* Wraps a {@link Slice} in an {@link InputSplit} so it's usable by hadoop.
@@ -113,7 +115,7 @@
return lastConf;
}
- public RecordReader<Text, TargetedTuple> makeReader(JobConf job) throws IOException {
+ public RecordReader<Text, Tuple> makeReader(JobConf job) throws IOException {
lastConf = job;
DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
// if the execution is against Mapred DFS, set
@@ -122,6 +124,7 @@
store.setActiveContainer(store.asContainer("/user/" + job.getUser()));
wrapped.init(store);
+ job.set("map.target.ops", ObjectSerializer.serialize(targetOps));
// Mimic org.apache.hadoop.mapred.FileSplit if feasible...
String[] locations = wrapped.getLocations();
if (locations.length > 0) {
@@ -130,18 +133,19 @@
job.setLong("map.input.length", wrapped.getLength());
}
- return new RecordReader<Text, TargetedTuple>() {
+ return new RecordReader<Text, Tuple>() {
+ TupleFactory tupFac = TupleFactory.getInstance();
public void close() throws IOException {
wrapped.close();
}
public Text createKey() {
- return new Text();
+ return null; // we never use the key!
}
- public TargetedTuple createValue() {
- return new TargetedTuple();
+ public Tuple createValue() {
+ return tupFac.newTuple();
}
public long getPos() throws IOException {
@@ -152,9 +156,8 @@
return wrapped.getProgress();
}
- public boolean next(Text key, TargetedTuple value) throws IOException {
- value.setTargetOps(targetOps);
- return wrapped.next((Tuple)value);
+ public boolean next(Text key, Tuple value) throws IOException {
+ return wrapped.next(value);
}
};
}
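Taken together, the SliceWrapper changes shrink the per-record reader path to a plain Tuple hand-off: the retargeting that next() used to perform via value.setTargetOps(targetOps) now happens exactly once through the "map.target.ops" job property, and createKey() can return null because the key is never consulted downstream. Condensed from the hunks above (a summary, not new code):

    public Text createKey()    { return null; }            // key is never used by the map
    public Tuple createValue() { return tupFac.newTuple(); }
    public boolean next(Text key, Tuple value) throws IOException {
        return wrapped.next(value);                        // no per-record setTargetOps()
    }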