Author: yanz
Date: Fri May 21 17:49:24 2010
New Revision: 947091

URL: http://svn.apache.org/viewvc?rev=947091&view=rev
Log:
PIG-1425: support the source_table index virtual column on unsorted tables in the mapred APIs (yanz)

Added:
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=947091&r1=947090&r2=947091&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Fri May 21 17:49:24 2010
@@ -18,6 +18,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    PIG-1425 support of source table index on unsorted table in the mapred APIs (yanz)
+
     PIG-1375 Support of multiple Zebra table writing through Pig (chaow via yanz)
 
     PIG-1351 Addition of type check when writing to basic table (chaow via yanz)

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java?rev=947091&r1=947090&r2=947091&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java Fri May 21 17:49:24 2010
@@ -29,6 +29,8 @@ import org.apache.hadoop.zebra.io.BasicT
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.pig.data.Tuple;
 
 /**
  * Table expression for reading a BasicTable.
@@ -117,6 +119,12 @@ class BasicTableExpr extends TableExpr {
   } 
 
   @Override
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    return new BasicTableScanner(split, projection, conf);
+  }
+
+  @Override
   public Schema getSchema(Configuration conf) throws IOException {
     return BasicTable.Reader.getSchema(path, conf);
   }
@@ -131,4 +139,75 @@ class BasicTableExpr extends TableExpr {
   {
     BasicTable.dumpInfo(path.toString(), ps, conf, indent);
   }
+
+  /**
+   * Basic Table Scanner
+   */
+  class BasicTableScanner implements TableScanner {
+    private int tableIndex = -1;
+    private Integer[] virtualColumnIndices = null;
+    private TableScanner scanner = null;
+    
+    BasicTableScanner(RowTableSplit split, String projection,
+        Configuration conf) throws IOException, ParseException, ParseException 
{
+      tableIndex = split.getTableIndex();
+      virtualColumnIndices = Projection.getVirtualColumnIndices(projection);
+      BasicTable.Reader reader =
+        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), 
conf);
+      reader.setProjection(projection);
+      scanner = reader.getScanner(true, split.getSplit());
+    }
+    
+    @Override
+    public boolean advance() throws IOException {
+      return scanner.advance();
+    }
+    
+    @Override
+    public boolean atEnd() throws IOException {
+      return scanner.atEnd();
+    }
+    
+    @Override
+    public Schema getSchema() {
+      return scanner.getSchema();
+    }
+    
+    @Override
+    public void getKey(BytesWritable key) throws IOException {
+      scanner.getKey(key);
+    }
+    
+    @Override
+    public void getValue(Tuple row) throws IOException {
+      scanner.getValue(row);
+      if (virtualColumnIndices != null)
+      {
+        for (int i = 0; i < virtualColumnIndices.length; i++)
+        {
+          row.set(virtualColumnIndices[i], tableIndex);
+        }
+      }
+    }
+    
+    @Override
+    public boolean seekTo(BytesWritable key) throws IOException {
+      return scanner.seekTo(key);
+    }
+    
+    @Override
+    public void seekToEnd() throws IOException {
+      scanner.seekToEnd();
+    }
+    
+    @Override 
+    public void close() throws IOException {
+      scanner.close();
+    }
+    
+    @Override
+    public String getProjection() {
+      return scanner.getProjection();
+    }
+  }
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=947091&r1=947090&r2=947091&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Fri May 21 17:49:24 2010
@@ -123,11 +123,8 @@ abstract class TableExpr {
    * @throws IOException
    */
   public TableScanner getScanner(RowTableSplit split, String projection,
-      Configuration conf) throws IOException, ParseException, ParseException {
-    BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), 
conf);
-    reader.setProjection(projection);
-    return reader.getScanner(true, split.getSplit());
+      Configuration conf) throws IOException, ParseException {
+    return null;
   }
   
   /**

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=947091&r1=947090&r2=947091&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Fri May 21 17:49:24 2010
@@ -235,10 +235,6 @@ public class TableInputFormat implements
    */
   public static void setProjection(JobConf conf, String projection) throws 
ParseException {
     conf.set(INPUT_PROJ, Schema.normalize(projection));
-
-    // virtual source_table columns require sorted table
-    if (Projection.getVirtualColumnIndices(projection) != null && 
!getSorted(conf))
-        throw new ParseException("The source_table virtual column is only 
availabe for sorted table unions.");
   }
   
   /**
@@ -266,10 +262,6 @@ public class TableInputFormat implements
     }
     
     conf.set(INPUT_PROJ, normalizedProjectionString);
-
-    // virtual source_table columns require sorted table
-    if (Projection.getVirtualColumnIndices(projection.toString()) != null && 
!getSorted(conf))
-      throw new ParseException("The source_table virtual column is only 
availabe for sorted table unions.");
   }  
 
   /**
@@ -718,6 +710,7 @@ public class TableInputFormat implements
     boolean first = true;
     PathFilter filter = null;
     List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>();
+    int[] realReaderIndices = new int[readers.size()];
 
     for (int i = 0; i < readers.size(); ++i) {
       BasicTable.Reader reader = readers.get(i);
@@ -727,6 +720,7 @@ public class TableInputFormat implements
       /* We can create input splits only if there does exist a valid column 
group for split.
        * Otherwise, we do not create input splits. */
       if (splitCGIndex >= 0) {        
+        realReaderIndices[realReaders.size()] = i;
         realReaders.add(reader);
         if (first)
         {
@@ -836,10 +830,10 @@ public class TableInputFormat implements
           batches[++numBatches] = splitLen;
         
         List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, 
splitCGIndex, batches, numBatches);
-    
+        int realTableIndex = realReaderIndices[tableIndex];
         for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
           RowSplit subSplit = it.next();
-          RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
+          RowTableSplit split = new RowTableSplit(reader, subSplit, 
realTableIndex, conf);
           ret.add(split);
         }
       }
@@ -1050,14 +1044,16 @@ class SortedTableSplit implements InputS
  */
 class RowTableSplit implements InputSplit {
   String path = null;
+  int tableIndex;
   RowSplit split = null;
   String[] hosts = null;
   long length = 1;
 
-  public RowTableSplit(Reader reader, RowSplit split, JobConf conf)
+  public RowTableSplit(Reader reader, RowSplit split, int tableIndex, JobConf 
conf)
       throws IOException {
     this.path = reader.getPath();
     this.split = split;
+    this.tableIndex = tableIndex;
     BlockDistribution dataDist = reader.getBlockDistribution(split);
     if (dataDist != null) {
       length = dataDist.getLength();
@@ -1082,6 +1078,7 @@ class RowTableSplit implements InputSpli
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    tableIndex = WritableUtils.readVInt(in);
     path = WritableUtils.readString(in);
     int bool = WritableUtils.readVInt(in);
     if (bool == 1) {
@@ -1097,6 +1094,7 @@ class RowTableSplit implements InputSpli
 
   @Override
   public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, tableIndex);
     WritableUtils.writeString(out, path);
     if (split == null) {
       WritableUtils.writeVInt(out, 0);
@@ -1116,4 +1114,8 @@ class RowTableSplit implements InputSpli
   public RowSplit getSplit() {
     return split;
   }
+  
+  public int getTableIndex() {
+    return tableIndex;
+  }
 }

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=947091&r1=947090&r2=947091&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Fri May 21 17:49:24 2010
@@ -53,8 +53,6 @@ public class TableRecordReader implement
                  InputSplit split, JobConf conf) throws IOException, 
ParseException {
          if( split instanceof RowTableSplit ) {
                  RowTableSplit rowSplit = (RowTableSplit)split;
-                 if( Projection.getVirtualColumnIndices( projection ) != null )
-                         throw new IllegalArgumentException("virtual column 
requires union of multiple sorted tables");
                  scanner = expr.getScanner(rowSplit, projection, conf);
          } else {
                  SortedTableSplit tblSplit = (SortedTableSplit)split;

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java?rev=947091&r1=947090&r2=947091&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java Fri May 21 17:49:24 2010
@@ -157,6 +157,13 @@ class TableUnionExpr extends CompositeTa
       throw new IllegalArgumentException("virtual column requires union of 
multiple tables");
     return new SortedTableUnionScanner(scanners, 
Projection.getVirtualColumnIndices(projection));
   }
+
+  @Override
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    BasicTableExpr expr = (BasicTableExpr) 
composite.get(split.getTableIndex());
+    return expr.getScanner(split, projection, conf);
+  }
 }
 
 /**

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java?rev=947091&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestUnsortedTableIndex.java Fri May 21 17:49:24 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TestBasicTable;
+import org.apache.hadoop.zebra.mapred.RowTableSplit;
+import org.apache.hadoop.zebra.mapred.TableInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.pig.data.Tuple;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestUnsortedTableIndex {
+       private static Configuration conf;
+       private static JobConf jobConf;
+       private static Path path1, path2;
+
+       @BeforeClass
+       public static void setUpOnce() throws IOException {
+               TestBasicTable.setUpOnce();
+               conf = TestBasicTable.conf;
+    jobConf = new JobConf(conf);
+               path1 = new Path(TestBasicTable.rootPath, 
"TestUnsortedTableIndex1");
+               path2 = new Path(TestBasicTable.rootPath, 
"TestUnsortedTableIndex2");
+       }
+
+       @AfterClass
+       public static void tearDown() throws IOException {
+               BasicTable.drop(path1, conf);
+               BasicTable.drop(path2, conf);
+       }
+
+       @Test
+       public void testUnsortedTableIndex() 
+       throws IOException, ParseException, InterruptedException {
+               BasicTable.drop(path1, conf);
+               BasicTable.drop(path2, conf);
+               int total1 = TestBasicTable.createBasicTable(1, 100, "a, b, c, 
d, e, f", "[a, b]; [c, d]", null, path1, true);    
+               int total2 = TestBasicTable.createBasicTable(1, 100, "a, b, c, 
d, e, f", "[a, b]; [c, d]", null, path2, true);    
+
+               TableInputFormat inputFormat = new TableInputFormat();
+               TableInputFormat.setInputPaths(jobConf, path1, path2);
+               TableInputFormat.setProjection(jobConf, "source_table");
+               InputSplit[] splits = inputFormat.getSplits(jobConf, -1);
+    Assert.assertEquals(splits.length, 2);
+    for (int i = 0; i < 2; i++)
+    {
+           int count = 0;
+               RowTableSplit split = (RowTableSplit) splits[i];
+           TableRecordReader rr = (TableRecordReader) 
inputFormat.getRecordReader(split, jobConf, null);
+           Tuple t = TypesUtils.createTuple(1);
+      BytesWritable key = new BytesWritable();
+           while (rr.next(key, t)) {
+        int idx= (Integer) t.get(0);
+        Assert.assertEquals(idx, i);
+             count++;
+           }
+           rr.close();
+      if (i == 0)
+           Assert.assertEquals(count, total1);
+      else
+           Assert.assertEquals(count, total2);
+    }
+       }
+}


Reply via email to