Author: gates
Date: Tue Dec  1 23:55:24 2009
New Revision: 886015

URL: http://svn.apache.org/viewvc?rev=886015&view=rev
Log:
PIG-1098 Zebra Performance Optimizations.

Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Tue Dec  1 23:55:24 2009
@@ -8,6 +8,8 @@
 
   IMPROVEMENTS
 
+    PIG-1098 Zebra Performance Optimizations (yanz via gates)
+
        PIG-1074 Zebra store function should allow '::' in column names in 
output
        schema (yanz via gates)
 

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
 Tue Dec  1 23:55:24 2009
@@ -52,6 +52,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
 import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRowSplit;
+import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGScanner;
 import org.apache.hadoop.zebra.types.CGSchema;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Partition;
@@ -874,7 +875,7 @@
      */
     private class BTScanner implements TableScanner {
       private Projection schema;
-      private TableScanner[] cgScanners;
+      private CGScanner[] cgScanners;
       private int opCount = 0;
       Random random = new Random(System.nanoTime());
       // checking for consistency once every 1000 times.
@@ -936,7 +937,7 @@
       }
     
       // Helper function for initialization.
-      private TableScanner createCGScanner(int cgIndex, CGRowSplit cgRowSplit, 
+      private CGScanner createCGScanner(int cgIndex, CGRowSplit cgRowSplit, 
                                            RangeSplit rangeSplit,
                                            BytesWritable beginKey, 
                                            BytesWritable endKey) 
@@ -972,7 +973,7 @@
         
         try {
           schema = partition.getProjection();
-          cgScanners = new TableScanner[colGroups.length];
+          cgScanners = new CGScanner[colGroups.length];
           for (int i = 0; i < colGroups.length; ++i) {
             if (!isCGDeleted(i) && partition.isCGNeeded(i)) 
             {
@@ -1020,7 +1021,7 @@
         for (int nx = 0; nx < cgScanners.length; nx++) {
           if (cgScanners[nx] != null)
           {
-            cur = cgScanners[nx].advance();
+            cur = cgScanners[nx].advanceCG();
             if (!firstAdvance) {
               if (cur != first) {
                 throw new IOException(
@@ -1038,9 +1039,6 @@
 
       @Override
       public boolean atEnd() throws IOException {
-        if (cgScanners.length == 0) {
-          return true;
-        }
         boolean ret = true;
         int i;
         for (i = 0; i < cgScanners.length; i++)
@@ -1077,16 +1075,12 @@
 
       @Override
       public void getKey(BytesWritable key) throws IOException {
-        if (cgScanners.length == 0) {
-          return;
-        }
-        
         int i;
         for (i = 0; i < cgScanners.length; i++)
         {
           if (cgScanners[i] != null)
           {
-            cgScanners[i].getKey(key);
+            cgScanners[i].getCGKey(key);
             break;
           }
         }
@@ -1104,7 +1098,7 @@
           if (cgScanners[index] != null)
           {
             BytesWritable key2 = new BytesWritable();
-            cgScanners[index].getKey(key2);
+            cgScanners[index].getCGKey(key2);
             if (key.equals(key2)) {
               return;
             }
@@ -1129,7 +1123,7 @@
             {
               if (cgTuples[i] == null)
                 throw new AssertionError("cgTuples["+i+"] is null");
-              cgScanners[i].getValue(cgTuples[i]);
+              cgScanners[i].getCGValue(cgTuples[i]);
             }
           }
         }

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java
 Tue Dec  1 23:55:24 2009
@@ -136,8 +136,8 @@
       @Override
       public int compare(Entry<String, Long> o1, Entry<String, Long> o2) {
         long diff = o1.getValue() - o2.getValue();
-        if (diff < 0) return -1;
-        if (diff > 0) return 1;
+        if (diff < 0) return 1;
+        if (diff > 0) return -1;
         return 0;
       }
     });

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
 Tue Dec  1 23:55:24 2009
@@ -381,7 +381,7 @@
      * @return A scanner object.
      * @throws IOException
      */
-    public synchronized TableScanner getScanner(BytesWritable beginKey,
+    public synchronized CGScanner getScanner(BytesWritable beginKey,
         BytesWritable endKey, boolean closeReader) throws IOException,
         ParseException {
       if (closed) {
@@ -422,7 +422,7 @@
      * @return A scanner object.
      * @throws IOException
      */
-    public synchronized TableScanner getScanner(CGRangeSplit split,
+    public synchronized CGScanner getScanner(CGRangeSplit split,
         boolean closeReader) throws IOException, ParseException {
       if (closed) {
         throw new EOFException("Reader already closed");
@@ -449,7 +449,7 @@
      * @param rowSplit specifies part index, start row, and end row.
      * @return A scanner object.
      */
-    public synchronized TableScanner getScanner(boolean closeReader, 
+    public synchronized CGScanner getScanner(boolean closeReader, 
                                                 CGRowSplit rowSplit)
                         throws IOException, ParseException {
       if (closed) {
@@ -1013,22 +1013,34 @@
 
       @Override
       public void getKey(BytesWritable key) throws IOException {
-        if (atEnd()) {
-          throw new EOFException("No more key-value to read");
+          if (atEnd()) {
+            throw new EOFException("No more key-value to read");
+          }
+          scanners[current].getKey(key);
+        }
+
+        @Override
+        public void getValue(Tuple row) throws IOException {
+          if (atEnd()) {
+            throw new EOFException("No more key-value to read");
+          }
+          try {
+            scanners[current].getValue(row);
+          } catch (ParseException e) {
+            throw new IOException("Invalid Projection: "+e.getMessage());
+          }
         }
+
+      public void getCGKey(BytesWritable key) throws IOException {
         scanners[current].getKey(key);
       }
 
-      @Override
-      public void getValue(Tuple row) throws IOException {
-        if (atEnd()) {
-          throw new EOFException("No more key-value to read");
-        }
+      public void getCGValue(Tuple row) throws IOException {
         try {
-          scanners[current].getValue(row);
-        } catch (ParseException e) {
-          throw new IOException("Invalid Projection: "+e.getMessage());
-        }
+            scanners[current].getValue(row);
+          } catch (ParseException e) {
+            throw new IOException("Invalid Projection: "+e.getMessage());
+          }
       }
 
       @Override
@@ -1042,18 +1054,29 @@
 
       @Override
       public boolean advance() throws IOException {
-        if (atEnd()) {
-          return false;
+          if (atEnd()) {
+            return false;
+          }
+          scanners[current].advance();
+          if (scanners[current].atEnd()) {
+            ++current;
+            if (!atEnd()) {
+              scanners[current].rewind();
+            }
+          }
+          return true;
         }
-        scanners[current].advance();
-        if (scanners[current].atEnd()) {
-          ++current;
-          if (!atEnd()) {
-            scanners[current].rewind();
+
+      public boolean advanceCG() throws IOException {
+          scanners[current].advance();
+          if (scanners[current].atEnd()) {
+            ++current;
+            if (!atEnd()) {
+              scanners[current].rewind();
+            }
           }
+          return true;
         }
-        return true;
-      }
 
       @Override
       public boolean atEnd() throws IOException {

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
 Tue Dec  1 23:55:24 2009
@@ -94,7 +94,7 @@
 
   @Override
   public float getProgress() throws IOException {
-    return (float) ((scanner.atEnd()) ? 1.0 : 0);
+    return  (float)((scanner.atEnd()) ? 1.0 : 0);
   }
 
   @Override

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
 Tue Dec  1 23:55:24 2009
@@ -599,18 +599,21 @@
    */
   @Override
   public int compareTo(Schema other) {
-    if (this.mFields.size() != other.mFields.size()) {
-      return this.mFields.size() - other.mFields.size();
+    int mFieldsSize = this.mFields.size(); 
+    if (mFieldsSize != other.mFields.size()) {
+      return mFieldsSize - other.mFields.size();
     }
     int ret = 0;
-    for (int nx = 0; nx < this.mFields.size(); nx++) {
-      if (mFields.get(nx).schema == null
-          && other.mFields.get(nx).schema != null) return -1;
-      else if (mFields.get(nx).schema != null
-          && other.mFields.get(nx).schema == null) return 1;
-      else if (mFields.get(nx).schema == null
-          && other.mFields.get(nx).schema == null) return 0;
-      ret = mFields.get(nx).schema.compareTo(other.mFields.get(nx).schema);
+    for (int nx = 0; nx < mFieldsSize; nx++) {
+      Schema mFieldSchema = mFields.get(nx).schema;
+      Schema otherFieldSchema = other.mFields.get(nx).schema;
+      if (mFieldSchema == null
+          && otherFieldSchema != null) return -1;
+      else if (mFieldSchema != null
+          && otherFieldSchema == null) return 1;
+      else if (mFieldSchema == null
+          && otherFieldSchema == null) return 0;
+      ret = mFieldSchema.compareTo(otherFieldSchema);
       if (ret != 0) {
         return ret;
       }

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
 Tue Dec  1 23:55:24 2009
@@ -444,14 +444,17 @@
     }
 
     void insert(final BytesWritable key) throws ExecException {
-      for (int i = 0; i < mSize; i++)
-        ((Tuple) mTuple).set(mSources.get(i).getProjIndex(), mSources.get(i)
-            .getRecord());
+      for (int i = 0; i < mSize; i++) {
+        PartitionedColumn mSource = mSources.get(i);
+        ((Tuple) mTuple).set(mSource.getProjIndex(), mSource.getRecord());
+      }
     }
 
     void read() throws ExecException {
-      for (int i = 0; i < mSize; i++)
-        mSources.get(i).setRecord(mTuple.get(mSources.get(i).getProjIndex()));
+      for (int i = 0; i < mSize; i++) {
+        PartitionedColumn mSource = mSources.get(i);             
+        mSource.setRecord(mTuple.get(mSource.getProjIndex()));
+      }
     }
 
     void setSource(Tuple tuple) {
@@ -661,6 +664,7 @@
   }
 
   private HashMap<Integer, CGEntry> mCGs = null; // involved CGs
+  private CGEntry[] mCGList = new CGEntry[0];
   private ArrayList<PartitionedColumn> mExecs = null; // stitches to be
   // performed in sequence:
   // called by LOAD
@@ -1177,6 +1181,10 @@
     {
       result = new CGEntry(cgindex);
       mCGs.put(cgindex, result);
+      
+      // Constructing a collection of mCGs so that 
+      // we don't have to do it again [performance]
+      mCGList = (CGEntry[])mCGs.values().toArray(new CGEntry[mCGs.size()]);
     }
     return result;
   }
@@ -1269,20 +1277,23 @@
    * read in a tuple based on stitches
    */
   public void read(Tuple t) throws AssertionError, IOException, Exception {
-    if (mStitchSize == 0 || mCGs == null || mCGs.isEmpty())
+    if (mStitchSize == 0 || mCGs == null || mCGList.length == 0)
       return;
 
     // dispatch
     mExecs.get(mStitchSize - 1).setRecord(t);
 
+    //Set<Map.Entry<Integer, CGEntry>> entrySet = mCGs.entrySet();
+    //Iterator<Map.Entry<Integer, CGEntry>> it = entrySet.iterator();
+    //while (it.hasNext())
+    //  it.next().getValue().read();
+
     // read in CG data
-    Set<Map.Entry<Integer, CGEntry>> entrySet = mCGs.entrySet();
-    Iterator<Map.Entry<Integer, CGEntry>> it = entrySet.iterator();
-    while (it.hasNext())
-      it.next().getValue().read();
+    for (int i = 0; i < mCGList.length; i++)
+      mCGList[i].read();
 
     // dispatch
-    mExecs.get(mStitchSize - 1).setRecord(t);
+    // mExecs.get(mStitchSize - 1).setRecord(t);
 
     // start the stitch
     for (int i = 0; i < mStitchSize; i++)
@@ -1296,7 +1307,7 @@
    */
   public void insert(final BytesWritable key, final Tuple t)
       throws AssertionError, IOException, Exception {
-    if (mSplitSize == 0 || mCGs == null || mCGs.isEmpty())
+    if (mSplitSize == 0 || mCGs == null || mCGList.length == 0)
       throw new AssertionError("Empty Column Group List!");
 
     // dispatch
@@ -1308,10 +1319,14 @@
       mExecs.get(i).split();
 
     // insert CG data
-    Set<Map.Entry<Integer, CGEntry>> entrySet = mCGs.entrySet();
-    Iterator<Map.Entry<Integer, CGEntry>> it = entrySet.iterator();
-    while (it.hasNext())
-      it.next().getValue().insert(key);
+    //Set<Map.Entry<Integer, CGEntry>> entrySet = mCGs.entrySet();
+    //Iterator<Map.Entry<Integer, CGEntry>> it = entrySet.iterator();
+    //while (it.hasNext())
+    //  it.next().getValue().insert(key);
+    
+    for (int i = 0; i < mCGList.length; i++)
+      mCGList[i].insert(key);
+    
     return;
   }
 
@@ -1323,12 +1338,16 @@
       throw new ParseException(
           "Internal Logical Error: Invalid number of column groups");
     for (int i = 0; i < tuples.length; i++) {
-      if (mCGs.get(i) != null) {
+      CGEntry mCG = mCGs.get(i);
+      if (mCG != null) {
         if (tuples[i] == null) {
-          mCGs.get(i).cleanup();
+          mCG.cleanup();
           mCGs.remove(i);
+          // Constructing a collection of mCGs so that 
+          // we don't have to do it again [performance]
+          mCGList = (CGEntry[])mCGs.values().toArray(new CGEntry[mCGs.size()]);
         } else {
-          mCGs.get(i).setSource(tuples[i]);
+          mCG.setSource(tuples[i]);
         }
       }
     }

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
 Tue Dec  1 23:55:24 2009
@@ -176,13 +176,15 @@
                {
                        int i;
                        dispatch(dest);
-      clearMaps();
-                       for (i = 0; i < exec.size(); i++)
+            clearMaps();
+            int execSize = exec.size();
+                       for (i = 0; i < execSize; i++)
                        {
-                               if (exec.get(i) != null)
+                           SplitColumn execElement = exec.get(i);
+                               if (execElement != null)
                                {
                                        // split is necessary
-                                       exec.get(i).split();
+                                       execElement.split();
                                }
                        }
                }
@@ -289,10 +291,11 @@
                 {
                         for (int i = 0; i < size; i++)
                         {
-                                if (children.get(i).projIndex != -1) // a 
leaf: set projection directly
-                                       
((Tuple)children.get(i).leaf.field).set(children.get(i).projIndex, ((Tuple) 
field).get(children.get(i).fieldIndex));
+                            SplitColumn child = children.get(i);
+                                if (child.projIndex != -1) // a leaf: set 
projection directly
+                                       
((Tuple)child.leaf.field).set(child.projIndex, ((Tuple) 
field).get(child.fieldIndex));
                                 else
-                                        children.get(i).field = ((Tuple) 
field).get(children.get(i).fieldIndex);
+                                        child.field = ((Tuple) 
field).get(child.fieldIndex);
                         }
                 } else if (st == Partition.SplitType.COLLECTION) {
                    DataBag srcBag, tgtBag;
@@ -300,17 +303,18 @@
                    Tuple tuple;
                    for (int i = 0; i < size; i++)
                    {
-                     if (children.get(i).projIndex != -1) // a leaf: set 
projection directly
+                     SplitColumn child = children.get(i);
+                     if (child.projIndex != -1) // a leaf: set projection 
directly
                      {
-                       tgtBag = 
(DataBag)((Tuple)children.get(i).leaf.field).get(children.get(i).projIndex);
+                       tgtBag = 
(DataBag)((Tuple)child.leaf.field).get(child.projIndex);
                      } else {
-                       tgtBag = (DataBag) children.get(i).field;
+                       tgtBag = (DataBag) child.field;
                        tgtBag.clear();
                      }
                      for (Iterator<Tuple> it = srcBag.iterator(); 
it.hasNext(); )
                      {
                        tuple = TypesUtils.createTuple(scratchSchema);
-                       tuple.set(0, it.next().get(children.get(i).fieldIndex));
+                       tuple.set(0, it.next().get(child.fieldIndex));
                        tgtBag.add(tuple);
                      }
                    }
@@ -320,7 +324,8 @@
        Object value;
                         for (int i = 0; i < size; i++)
                         {
-                                if (children.get(i).projIndex != -1) // a 
leaf: set projection directly
+                            SplitColumn child = children.get(i);
+                                if (child.projIndex != -1) // a leaf: set 
projection directly
          {
            for (it = keys.iterator(); it.hasNext(); )
            {
@@ -328,13 +333,13 @@
              value = ((Map<String, Object>) field).get(key);
              if (value == null)
                continue;
-                                          ((Map<String, Object>) 
(((Tuple)children.get(i).leaf.field).get(children.get(i).projIndex))).put(key, 
value);
+                                          ((Map<String, Object>) 
(((Tuple)child.leaf.field).get(child.projIndex))).put(key, value);
            }
          } else {
            for (it = keys.iterator(); it.hasNext(); )
            {
              key = it.next();
-                                          children.get(i).field = 
((Map<String, Object>) field).get(key);
+                                          child.field = ((Map<String, Object>) 
field).get(key);
            }
          }
                         }

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
 Tue Dec  1 23:55:24 2009
@@ -86,7 +86,8 @@
    */
   public static void resetTuple(Tuple tuple) {
     try {
-      for (int i = 0; i < tuple.size(); ++i) {
+      int tupleSize = tuple.size();
+      for (int i = 0; i < tupleSize; ++i) {
         tuple.set(i, null);
       }
     }


Reply via email to