Author: olga
Date: Thu Jan 22 12:26:07 2009
New Revision: 736781

URL: http://svn.apache.org/viewvc?rev=736781&view=rev
Log:
PIG-629: getting rid of targeted tuple

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jan 22 12:26:07 2009
@@ -364,7 +364,9 @@
 
     PIG-597: Fix for how * is treated by UDFs (shravanmn via olgan)
 
+    PIG-629: performance improvement: getting rid of targeted tuple (pradeepk
+    via olgan)
+
     PIG-623: Fix spelling errors in output messages (tomwhite via sms)
 
     PIG-622: Include pig executable in distribution (tomwhite via sms)
-

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Jan 22 12:26:07 2009
@@ -54,7 +54,7 @@
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 
-public class PigInputFormat implements InputFormat<Text, TargetedTuple>,
+public class PigInputFormat implements InputFormat<Text, Tuple>,
         JobConfigurable {
 
     public static final Log LOG = LogFactory
@@ -211,7 +211,7 @@
         return splits.toArray(new SliceWrapper[splits.size()]);
     }
 
-    public RecordReader<Text, TargetedTuple> getRecordReader(InputSplit split,
+    public RecordReader<Text, Tuple> getRecordReader(InputSplit split,
             JobConf job, Reporter reporter) throws IOException {
         PigInputFormat.sJob = job;
         activeSplit = (SliceWrapper) split;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Thu Jan 22 12:26:07 2009
@@ -19,6 +19,7 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -44,6 +45,8 @@
 import org.apache.pig.impl.util.SpillableMemoryManager;
 
 public abstract class PigMapBase extends MapReduceBase{
+    private static final Tuple DUMMYTUPLE = null;
+
     private final Log log = LogFactory.getLog(getClass());
     
     protected byte keyType;
@@ -59,8 +62,13 @@
     // to transmit heartbeat
     ProgressableReporter pigReporter;
 
-    private boolean errorInMap = false;
+    protected boolean errorInMap = false;
     
+    PhysicalOperator[] roots;
+
+    private PhysicalOperator leaf;
+
+    private boolean initialized = false;
     
     /**
      * Will be called when all the tuples in the input
@@ -83,8 +91,6 @@
             // This will result in nothing happening in the case
             // where there is no stream in the pipeline
             mp.endOfAllInput = true;
-            List<PhysicalOperator> leaves = mp.getLeaves();
-            PhysicalOperator leaf = leaves.get(0);
             try {
                 runPipeline(leaf);
             } catch (ExecException e) {
@@ -124,6 +130,16 @@
             long sleepTime = job.getLong("pig.reporter.sleep.time", 10000);
             
             pigReporter = new ProgressableReporter();
+            if(!(mp.isEmpty())) {
+                List<OperatorKey> targetOpKeys = 
+                    (ArrayList<OperatorKey>)ObjectSerializer.deserialize(job.get("map.target.ops"));
+                ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
+                for (OperatorKey targetKey : targetOpKeys) {
+                    targetOpsAsList.add(mp.getOperator(targetKey));
+                }
+                roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
+                leaf = mp.getLeaves().get(0);
+            }
         } catch (IOException e) {
             log.error(e.getMessage() + "was caused by:");
             log.error(e.getCause().getMessage());
@@ -139,19 +155,22 @@
      * the tuple as-is whereas map-reduce collects it after extracting
      * the key and indexed tuple.
      */
-    public void map(Text key, TargetedTuple inpTuple,
+    public void map(Text key, Tuple inpTuple,
             OutputCollector<PigNullableWritable, Writable> oc,
             Reporter reporter) throws IOException {
         
-        // cache the collector for use in runPipeline() which
-        // can be called from close()
-        this.outputCollector = oc;
-        pigReporter.setRep(reporter);
-        PhysicalOperator.setReporter(pigReporter);
+        if(!initialized) {
+            initialized  = true;
+            // cache the collector for use in runPipeline() which
+            // can be called from close()
+            this.outputCollector = oc;
+            pigReporter.setRep(reporter);
+            PhysicalOperator.setReporter(pigReporter);
+        }
         
         if(mp.isEmpty()){
             try{
-                collect(oc,inpTuple.toTuple());
+                collect(oc,inpTuple);
             } catch (ExecException e) {
                 IOException ioe = new IOException(e.getMessage());
                 ioe.initCause(e.getCause());
@@ -160,17 +179,9 @@
             return;
         }
         
-        for (OperatorKey targetKey : inpTuple.targetOps) {
-            
-            PhysicalOperator target = mp.getOperator(targetKey);
-            Tuple t = inpTuple.toTuple();
-            target.attachInput(t);
+        for (PhysicalOperator root : roots) {
+            root.attachInput(inpTuple);
         }
-        List<PhysicalOperator> leaves = mp.getLeaves();
-        
-        PhysicalOperator leaf = leaves.get(0);
-        
-        
         try {
             runPipeline(leaf);
             
@@ -182,9 +193,8 @@
     }
 
     private void runPipeline(PhysicalOperator leaf) throws IOException, ExecException {
-        Tuple dummyTuple = null;
         while(true){
-            Result res = leaf.getNext(dummyTuple);
+            Result res = leaf.getNext(DUMMYTUPLE);
             if(res.returnStatus==POStatus.STATUS_OK){
                 collect(outputCollector,(Tuple)res.result);
                 continue;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java Thu Jan 22 12:26:07 2009
@@ -63,7 +63,7 @@
 public class PigMapOnly {
 
     public static class Map extends PigMapBase implements
-            Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
+            Mapper<Text, Tuple, PigNullableWritable, Writable> {
 
         @Override
         public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Jan 22 12:26:07 2009
@@ -80,7 +80,7 @@
     private final static Tuple DUMMYTUPLE = null;
     
     public static class Map extends PigMapBase implements
-            Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
+            Mapper<Text, Tuple, PigNullableWritable, Writable> {
 
         @Override
         public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException {
@@ -105,7 +105,7 @@
      * in the order by is wrapped into a tuple (if it isn't already a tuple)
      */
     public static class MapWithComparator extends PigMapBase implements
-            Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
+            Mapper<Text, Tuple, PigNullableWritable, Writable> {
 
         @Override
         public void collect(OutputCollector<PigNullableWritable, Writable> oc,

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=736781&r1=736780&r2=736781&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Thu Jan 22 12:26:07 2009
@@ -47,8 +47,10 @@
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.ObjectSerializer;
 
 /**
  * Wraps a {@link Slice} in an {@link InputSplit} so it's usable by hadoop.
@@ -113,7 +115,7 @@
         return lastConf;
     }
 
-    public RecordReader<Text, TargetedTuple> makeReader(JobConf job) throws IOException {
+    public RecordReader<Text, Tuple> makeReader(JobConf job) throws IOException {
         lastConf = job;        
         DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
         // if the execution is against Mapred DFS, set
@@ -122,6 +124,7 @@
             store.setActiveContainer(store.asContainer("/user/" + job.getUser()));
         wrapped.init(store);
         
+        job.set("map.target.ops", ObjectSerializer.serialize(targetOps));
         // Mimic org.apache.hadoop.mapred.FileSplit if feasible...
         String[] locations = wrapped.getLocations();
         if (locations.length > 0) {
@@ -130,18 +133,19 @@
             job.setLong("map.input.length", wrapped.getLength());
         }
         
-        return new RecordReader<Text, TargetedTuple>() {
+        return new RecordReader<Text, Tuple>() {
 
+            TupleFactory tupFac = TupleFactory.getInstance();
             public void close() throws IOException {
                 wrapped.close();
             }
 
             public Text createKey() {
-                return new Text();
+                return null; // we never use the key!
             }
 
-            public TargetedTuple createValue() {
-                return new TargetedTuple();
+            public Tuple createValue() {
+                return tupFac.newTuple();
             }
 
             public long getPos() throws IOException {
@@ -152,9 +156,8 @@
                 return wrapped.getProgress();
             }
 
-            public boolean next(Text key, TargetedTuple value) throws IOException {
-                value.setTargetOps(targetOps);
-                return wrapped.next((Tuple)value);
+            public boolean next(Text key, Tuple value) throws IOException {
+                return wrapped.next(value);
             }
         };
     }


Reply via email to