Author: yanz
Date: Thu Apr  8 16:21:57 2010
New Revision: 931990

URL: http://svn.apache.org/viewvc?rev=931990&view=rev
Log:
PIG-1315 Implementing OrderedLoadFunc interface for Zebra TableLoader (xuefuz 
via yanz)

Added:
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java
Modified:
    hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt Thu Apr  8 
16:21:57 2010
@@ -16,6 +16,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    PIG-1315 Implementing OrderedLoadFunc interface for Zebra TableLoader 
(xuefuz via yanz)
+
     PIG-1306 Support of locally sorted input splits (yanz)
 
     PIG-1268 Need an ant target that runs all pig-related tests in Zebra 
(xuefuz via yanz)

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
 Thu Apr  8 16:21:57 2010
@@ -122,26 +122,6 @@ abstract class TableExpr {
    * @return A table scanner.
    * @throws IOException
    */
-  public TableScanner getScanner(UnsortedTableSplit split, String projection,
-      Configuration conf) throws IOException, ParseException {
-    BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), 
conf);
-    reader.setProjection(projection);
-    return reader.getScanner(split.getSplit(), true);
-  }
-  
-  /**
-   * Get a scanner with an unsorted split.
-   * 
-   * @param split
-   *          The range split.
-   * @param projection
-   *          The projection schema. It should never be null.
-   * @param conf
-   *          The configuration
-   * @return A table scanner.
-   * @throws IOException
-   */
   public TableScanner getScanner(RowTableSplit split, String projection,
       Configuration conf) throws IOException, ParseException, ParseException {
     BasicTable.Reader reader =

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
 Thu Apr  8 16:21:57 2010
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.zebra.tfile.RawComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -581,66 +580,6 @@ public class TableInputFormat implements
     conf.setLong("table.input.split.minSize", minSize);
   }
   
-  private static InputSplit[] getUnsortedSplits(JobConf conf, int numSplits,
-      TableExpr expr, List<BasicTable.Reader> readers,
-      List<BasicTableStatus> status) throws IOException {
-    long totalBytes = 0;
-    for (Iterator<BasicTableStatus> it = status.iterator(); it.hasNext();) {
-      BasicTableStatus s = it.next();
-      totalBytes += s.getSize();
-    }
-
-    long maxSplits = totalBytes / getMinSplitSize(conf);
-
-    if (numSplits > maxSplits) {
-      numSplits = -1;
-    }
-
-    ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
-    if (numSplits <= 0) {
-      if (totalBytes <= 0) {
-        BasicTable.Reader reader = readers.get(0);
-        UnsortedTableSplit split =
-          new UnsortedTableSplit(reader, null, conf);
-        ret.add(split);
-      } else {
-        for (int i = 0; i < readers.size(); ++i) {
-          BasicTable.Reader reader = readers.get(i);
-          List<RangeSplit> subSplits = reader.rangeSplit(-1);
-          for (Iterator<RangeSplit> it = subSplits.iterator(); it.hasNext();) {
-            UnsortedTableSplit split =
-              new UnsortedTableSplit(reader, it.next(), conf);
-            ret.add(split);
-          }
-        }
-      }
-    } else {
-      long goalSize = totalBytes / numSplits;
-
-      double SPLIT_SLOP = 1.1;
-      for (int i = 0; i < readers.size(); ++i) {
-        BasicTable.Reader reader = readers.get(i);
-        BasicTableStatus s = status.get(i);
-        int nSplits =
-            (int) ((s.getSize() + goalSize * (2 - SPLIT_SLOP)) / goalSize);
-        if (nSplits > 1) {
-          List<RangeSplit> subSplits = reader.rangeSplit(nSplits);
-          for (Iterator<RangeSplit> it = subSplits.iterator(); it.hasNext();) {
-            UnsortedTableSplit split =
-                new UnsortedTableSplit(reader, it.next(), conf);
-            ret.add(split);
-          }
-        } else {
-          UnsortedTableSplit split = new UnsortedTableSplit(reader, null, 
conf);
-          ret.add(split);
-        }
-      }
-    }
-
-    LOG.info("getSplits : returning " + ret.size() + " file splits.");
-    return ret.toArray(new InputSplit[ret.size()]);
-  }
-  
   private static class DummyFileInputFormat extends 
FileInputFormat<BytesWritable, Tuple> {
     /**
      * the next constant and class are copies from FileInputFormat
@@ -1109,85 +1048,6 @@ class SortedTableSplit implements InputS
 /**
  * Adaptor class for unsorted InputSplit for table.
  */
-class UnsortedTableSplit implements InputSplit {
-  String path = null;
-  RangeSplit split = null;
-  String[] hosts = null;
-  long length = 1;
-
-  public UnsortedTableSplit(Reader reader, RangeSplit split, JobConf conf)
-      throws IOException {
-    this.path = reader.getPath();
-    this.split = split;
-    BlockDistribution dataDist = reader.getBlockDistribution(split);
-    if (dataDist != null) {
-      length = dataDist.getLength();
-      hosts =
-          dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation", 
5));
-    }
-  }
-  
-  public UnsortedTableSplit() {
-    // no-op for Writable construction
-  }
-  
-  @Override
-  public long getLength() throws IOException {
-    return length;
-  }
-
-  @Override
-  public String[] getLocations() throws IOException {
-    if (hosts == null)
-    {
-      String[] tmp = new String[1];
-      tmp[0] = "";
-      return tmp;
-    }
-    return hosts;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    path = WritableUtils.readString(in);
-    int bool = WritableUtils.readVInt(in);
-    if (bool == 1) {
-      if (split == null) split = new RangeSplit();
-      split.readFields(in);
-    }
-    else {
-      split = null;
-    }
-    hosts = WritableUtils.readStringArray(in);
-    length = WritableUtils.readVLong(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeString(out, path);
-    if (split == null) {
-      WritableUtils.writeVInt(out, 0);
-    }
-    else {
-      WritableUtils.writeVInt(out, 1);
-      split.write(out);
-    }
-    WritableUtils.writeStringArray(out, hosts);
-    WritableUtils.writeVLong(out, length);
-  }
-
-  public String getPath() {
-    return path;
-  }
-  
-  public RangeSplit getSplit() {
-    return split;
-  }
-}
-
-/**
- * Adaptor class for unsorted InputSplit for table.
- */
 class RowTableSplit implements InputSplit {
   String path = null;
   RowSplit split = null;

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
 Thu Apr  8 16:21:57 2010
@@ -18,7 +18,6 @@ package org.apache.hadoop.zebra.mapred;
 
 import java.io.IOException;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -51,22 +50,16 @@ public class TableRecordReader implement
    * @throws IOException
    */
   public TableRecordReader(TableExpr expr, String projection,
-      InputSplit split,
-      JobConf conf) throws IOException, ParseException {
-    if (split != null && split instanceof RowTableSplit) {
-      RowTableSplit rowSplit = (RowTableSplit) split;
-      if (!expr.sortedSplitRequired() && 
Projection.getVirtualColumnIndices(projection) != null)
-        throw new IllegalArgumentException("virtual column requires union of 
multiple sorted tables");
-      scanner = expr.getScanner(rowSplit, projection, conf);
-    } else if (expr.sortedSplitRequired()) {
-      SortedTableSplit tblSplit = (SortedTableSplit) split;
-      scanner =
-          expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
-              conf);
-    } else {
-      UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
-      scanner = expr.getScanner(tblSplit, projection, conf);
-    }
+                 InputSplit split, JobConf conf) throws IOException, 
ParseException {
+         if( split instanceof RowTableSplit ) {
+                 RowTableSplit rowSplit = (RowTableSplit)split;
+                 if( Projection.getVirtualColumnIndices( projection ) != null )
+                         throw new IllegalArgumentException("virtual column 
requires union of multiple sorted tables");
+                 scanner = expr.getScanner(rowSplit, projection, conf);
+         } else {
+                 SortedTableSplit tblSplit = (SortedTableSplit)split;
+                 scanner = expr.getScanner( tblSplit.getBegin(), 
tblSplit.getEnd(), projection, conf );
+         }
   }
   
   @Override

Added: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java?rev=931990&view=auto
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java
 (added)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java
 Thu Apr  8 16:21:57 2010
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pig.data.DataReaderWriter;
+
+public class SortedTableSplitComparable implements 
WritableComparable<SortedTableSplitComparable> {
+    private static final long serialVersionUID = 1L;
+    
+    protected Integer index;
+    
+    //need a default constructor to be able to de-serialize using just 
+    // the Writable interface
+    public SortedTableSplitComparable(){}
+    
+    public SortedTableSplitComparable(int index){
+        this.index = index;
+    }
+
+
+    @Override
+    public int compareTo(SortedTableSplitComparable other) {
+        return Integer.signum( index - other.index );
+    }
+
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        index = (Integer)DataReaderWriter.readDatum(in);
+    }
+
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        DataReaderWriter.writeDatum(out, index);
+    }
+
+    @Override
+    public String toString(){
+        return "Index = " + index ; 
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        return index;
+    }
+
+    /* (non-Javadoc)
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        SortedTableSplitComparable other = (SortedTableSplitComparable) obj;
+        return this.index.intValue() == other.index.intValue();
+    }
+}

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
 Thu Apr  8 16:21:57 2010
@@ -111,26 +111,6 @@ abstract class TableExpr {
   }
 
   /**
-   * Get a scanner with an unsorted split.
-   * 
-   * @param split
-   *          The range split.
-   * @param projection
-   *          The projection schema. It should never be null.
-   * @param conf
-   *          The configuration
-   * @return A table scanner.
-   * @throws IOException
-   */
-  public TableScanner getScanner(UnsortedTableSplit split, String projection,
-      Configuration conf) throws IOException, ParseException {
-    BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), 
conf);
-    reader.setProjection(projection);
-    return reader.getScanner(split.getSplit(), true);
-  }
-  
-  /**
    * Get a scanner with an row split.
    * 
    * @param split

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
 Thu Apr  8 16:21:57 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.zebra.tfile.RawComparable;
@@ -162,7 +163,7 @@ public class TableInputFormat extends In
   private static final String GLOBALLY_SORTED = "globally_sorted";
   private static final String LOCALLY_SORTED = "locally_sorted";
   static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
-
+  
   /**
    * Set the paths to the input table.
    * 
@@ -396,7 +397,6 @@ public class TableInputFormat extends In
    *          JobContext object.
    * @param sortInfo
    *          ZebraSortInfo object containing sorting information.
-   *        
    */
   
    public static void requireSortedTable(JobContext jobContext, ZebraSortInfo 
sortInfo) throws IOException {
@@ -590,7 +590,7 @@ public class TableInputFormat extends In
         bd = BlockDistribution.sum(bd, 
reader.getBlockDistribution((RangeSplit) null));
       }
       
-      SortedTableSplit split = new SortedTableSplit(null, null, bd, conf);
+      SortedTableSplit split = new SortedTableSplit(0, null, null, bd, conf);
       splits.add(split);
       return splits;
     }
@@ -637,7 +637,7 @@ public class TableInputFormat extends In
         beginB = new BytesWritable(begin.buffer());
       if (end != null)
         endB = new BytesWritable(end.buffer());
-      SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf);
+      SortedTableSplit split = new SortedTableSplit(i, beginB, endB, bd, conf);
       splits.add(split);
     }
     LOG.info("getSplits : returning " + splits.size() + " sorted splits.");
@@ -1033,13 +1033,33 @@ public class TableInputFormat extends In
        throw new IOException("Projection parsing failed : "+e.getMessage());
     }
   }
+
+       /**
+        * Get a comparable object from the given InputSplit object.
+        * 
+        * @param inputSplit An InputSplit instance. It should be of type 
SortedTableSplit.
+        * @return a comparable object of type WritableComparable
+        */
+       public static WritableComparable<?> 
getSortedTableSplitComparable(InputSplit inputSplit) {
+        SortedTableSplit split = null;
+        if( inputSplit instanceof SortedTableSplit ) {
+            split = (SortedTableSplit)inputSplit;
+        } else {
+            throw new RuntimeException( "LoadFunc expected split of type [" + 
+                       SortedTableSplit.class.getCanonicalName() + "]" );
+        }
+               return new SortedTableSplitComparable( split.getIndex() );
+       }
+
 }
 
 /**
  * Adaptor class for sorted InputSplit for table.
  */
 class SortedTableSplit extends InputSplit implements Writable {
-
+       // the order of the split in all splits generated.
+       private int index;
+       
        BytesWritable begin = null, end = null;
   
   String[] hosts;
@@ -1050,8 +1070,10 @@ class SortedTableSplit extends InputSpli
     // no-op for Writable construction
   }
   
-  public SortedTableSplit(BytesWritable begin, BytesWritable end,
+  public SortedTableSplit(int index, BytesWritable begin, BytesWritable end,
       BlockDistribution bd, Configuration conf) {
+         this.index = index;
+         
     if (begin != null) {
       this.begin = new BytesWritable();
       this.begin.set(begin.getBytes(), 0, begin.getLength());
@@ -1068,6 +1090,10 @@ class SortedTableSplit extends InputSpli
     }
   }
   
+       public int getIndex() {
+               return index;
+       }
+
   @Override
   public long getLength() throws IOException {
     return length;
@@ -1141,85 +1167,6 @@ class SortedTableSplit extends InputSpli
 /**
  * Adaptor class for unsorted InputSplit for table.
  */
-class UnsortedTableSplit extends InputSplit implements Writable {
-  String path = null;
-  RangeSplit split = null;
-  String[] hosts = null;
-  long length = 1;
-
-  public UnsortedTableSplit(Reader reader, RangeSplit split, Configuration 
conf)
-      throws IOException {
-    this.path = reader.getPath();
-    this.split = split;
-    BlockDistribution dataDist = reader.getBlockDistribution(split);
-    if (dataDist != null) {
-      length = dataDist.getLength();
-      hosts =
-          dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation", 
5));
-    }
-  }
-  
-  public UnsortedTableSplit() {
-    // no-op for Writable construction
-  }
-  
-  @Override
-  public long getLength() throws IOException {
-    return length;
-  }
-
-  @Override
-  public String[] getLocations() throws IOException {
-    if (hosts == null)
-    {
-      String[] tmp = new String[1];
-      tmp[0] = "";
-      return tmp;
-    }
-    return hosts;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    path = WritableUtils.readString(in);
-    int bool = WritableUtils.readVInt(in);
-    if (bool == 1) {
-      if (split == null) split = new RangeSplit();
-      split.readFields(in);
-    }
-    else {
-      split = null;
-    }
-    hosts = WritableUtils.readStringArray(in);
-    length = WritableUtils.readVLong(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeString(out, path);
-    if (split == null) {
-      WritableUtils.writeVInt(out, 0);
-    }
-    else {
-      WritableUtils.writeVInt(out, 1);
-      split.write(out);
-    }
-    WritableUtils.writeStringArray(out, hosts);
-    WritableUtils.writeVLong(out, length);
-  }
-
-  public String getPath() {
-    return path;
-  }
-  
-  public RangeSplit getSplit() {
-    return split;
-  }
-}
-
-/**
- * Adaptor class for unsorted InputSplit for table.
- */
 class RowTableSplit extends InputSplit implements Writable{
   /**
      * 

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
 Thu Apr  8 16:21:57 2010
@@ -52,20 +52,19 @@ public class TableRecordReader extends R
    * @throws IOException
    */
   public TableRecordReader(TableExpr expr, String projection,
-      InputSplit split, JobContext jobContext) throws IOException, 
ParseException {
+                 InputSplit split, JobContext jobContext) throws IOException, 
ParseException {
          Configuration conf = jobContext.getConfiguration();
-         if (split instanceof RowTableSplit) {
-      RowTableSplit rowSplit = (RowTableSplit) split;
-      if ((!expr.sortedSplitRequired() || rowSplit.getTableIndex() == -1) &&
-          Projection.getVirtualColumnIndices(projection) != null)
-        throw new IllegalArgumentException("virtual column requires union of 
multiple sorted tables");
-      scanner = expr.getScanner(rowSplit, projection, conf);
+         if( split instanceof RowTableSplit ) {
+                 RowTableSplit rowSplit = (RowTableSplit)split;
+                 if( Projection.getVirtualColumnIndices( projection ) != null 
&& 
+                     ( !expr.sortedSplitRequired() || rowSplit.getTableIndex() 
== -1 )  ) {
+                         throw new IllegalArgumentException("virtual column 
requires union of multiple sorted tables");
+                 }
+                 scanner = expr.getScanner(rowSplit, projection, conf);
          } else {
-      SortedTableSplit tblSplit = (SortedTableSplit) split;
-      scanner =
-          expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
-              conf);
-    }
+                 SortedTableSplit tblSplit = (SortedTableSplit)split;
+                 scanner = expr.getScanner( tblSplit.getBegin(), 
tblSplit.getEnd(), projection, conf );
+         }
   }
   
   @Override

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
 Thu Apr  8 16:21:57 2010
@@ -32,14 +32,15 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
 import org.apache.hadoop.zebra.mapreduce.TableRecordReader;
 import org.apache.hadoop.zebra.mapreduce.TableInputFormat.SplitMode;
-import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.schema.ColumnType;
 import org.apache.hadoop.zebra.schema.Schema;
@@ -50,6 +51,7 @@ import org.apache.pig.Expression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPushDown;
+import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
@@ -65,7 +67,7 @@ import org.apache.pig.CollectableLoadFun
  * Pig IndexableLoadFunc and Slicer for Zebra Table
  */
 public class TableLoader extends LoadFunc implements LoadMetadata, 
LoadPushDown,
-        IndexableLoadFunc, CollectableLoadFunc {
+        IndexableLoadFunc, CollectableLoadFunc, OrderedLoadFunc {
     static final Log LOG = LogFactory.getLog(TableLoader.class);
 
     private static final String UDFCONTEXT_PROJ_STRING = 
"zebra.UDFContext.projectionString";
@@ -423,11 +425,17 @@ public class TableLoader extends LoadFun
         public void setUDFContextSignature(String signature) {
             udfContextSignature = signature;
         }
+
+               @Override
+               public WritableComparable<?> getSplitComparable(InputSplit 
split)
+                               throws IOException {
+               return TableInputFormat.getSortedTableSplitComparable( split );
+               }
         
-    @Override
-    public void ensureAllKeyInstancesInSameSplit() throws IOException {
-      Properties properties = 
UDFContext.getUDFContext().getUDFProperties(this.getClass(),
-          new String[] { udfContextSignature } );
-      properties.setProperty(UDFCONTEXT_GLOBAL_SORTING, "true");
-    }
+               @Override
+               public void ensureAllKeyInstancesInSameSplit() throws 
IOException {
+                       Properties properties = 
UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+                                       new String[] { udfContextSignature } );
+                       properties.setProperty(UDFCONTEXT_GLOBAL_SORTING, 
"true");
+               }
 }

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java
 Thu Apr  8 16:21:57 2010
@@ -127,4 +127,26 @@ public class TestTfileSplit {
 
                Assert.assertEquals(splits.size(), 0);
        }
+       
+       @Test
+       public void testSortedSplitOrdering() throws IOException, 
ParseException {
+               BasicTable.drop(path, conf);
+               TestBasicTable.createBasicTable(1, 1000000, "a, b, c, d, e, f", 
"[a, e, d]", "a", path, true);    
+
+               TableInputFormat inputFormat = new TableInputFormat();
+               Job job = new Job(conf);
+               inputFormat.setInputPaths(job, path);
+               inputFormat.setMinSplitSize(job, 100);
+               inputFormat.setProjection(job, "d");
+               inputFormat.requireSortedTable( job, null );
+               List<InputSplit> splits = inputFormat.getSplits(job);
+               
+               int index = 0;
+               for( InputSplit is : splits ) {
+                       Assert.assertTrue( is instanceof SortedTableSplit );
+                       SortedTableSplit split = (SortedTableSplit)is;
+                       Assert.assertEquals( index++, split.getIndex() );
+               }
+       }
+       
 }


Reply via email to