Author: yanz
Date: Thu Apr  8 22:46:17 2010
New Revision: 932158

URL: http://svn.apache.org/viewvc?rev=932158&view=rev
Log:
PIG-1291 Support of virtual column, source_table, on unsorted table (yanz)
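For context, a minimal sketch of what this change enables (not part of the
commit; the paths below are hypothetical, and the query mirrors the new test
added here): projecting the source_table virtual column while loading a
comma-separated list of unsorted Zebra tables, which previously failed with a
ParseException.

    import java.util.Iterator;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class SourceTableOnUnsortedTables {
      public static void main(String[] args) throws Exception {
        PigServer pigServer = new PigServer(ExecType.LOCAL);
        // Union of two unsorted tables; source_table evaluates to 0 or 1,
        // the index of the table each row was read from.
        pigServer.registerQuery(
            "records = LOAD '/path/t1,/path/t2' USING "
            + "org.apache.hadoop.zebra.pig.TableLoader('source_table, a, b, c, d');");
        Iterator<Tuple> it = pigServer.openIterator("records");
        while (it.hasNext()) {
          Tuple t = it.next();
          System.out.println(t.get(0) + "\t" + t);  // t.get(0): source table index
        }
      }
    }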

Added:
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=932158&r1=932157&r2=932158&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Thu Apr  8 22:46:17 2010
@@ -18,6 +18,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    PIG-1291 Support of virtual column "source_table" on unsorted table (yanz)
+
     PIG-1315 Implementing OrderedLoadFunc interface for Zebra TableLoader (xuefux via yanz)
 
     PIG-1306 Support of locally sorted input splits (yanz)

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=932158&r1=932157&r2=932158&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Thu Apr  8 22:46:17 2010
@@ -265,10 +265,6 @@ public class TableInputFormat extends In
    */
   private static void setProjection(Configuration conf, String projection) throws ParseException {
       conf.set(INPUT_PROJ, Schema.normalize(projection));
-
-    // virtual source_table columns require sorted table
-    if (Projection.getVirtualColumnIndices(projection) != null && !getSorted( conf ))
-        throw new ParseException("The source_table virtual column is only availabe for sorted table unions.");
   }
 
   /**

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=932158&r1=932157&r2=932158&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java Thu Apr  8 22:46:17 2010
@@ -56,10 +56,6 @@ public class TableRecordReader extends R
          Configuration conf = jobContext.getConfiguration();
          if( split instanceof RowTableSplit ) {
                  RowTableSplit rowSplit = (RowTableSplit)split;
-                 if( Projection.getVirtualColumnIndices( projection ) != null && 
-                     ( !expr.sortedSplitRequired() || rowSplit.getTableIndex() == -1 )  ) {
-                         throw new IllegalArgumentException("virtual column requires union of multiple sorted tables");
-                 }
                  scanner = expr.getScanner(rowSplit, projection, conf);
          } else {
                  SortedTableSplit tblSplit = (SortedTableSplit)split;

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java?rev=932158&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java Thu Apr  8 22:46:17 2010
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.HashMap;
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.BaseTestCase;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestBasicTableSourceTableIndex extends BaseTestCase {
+  private static Path pathTable1, pathTable2;
+  
+  @BeforeClass
+  public static void setUp() throws Exception {
+    init();
+    
+    pathTable1 = getTableFullPath("TestBasicTableSourceTableIndex1");
+    pathTable2 = getTableFullPath("TestBasicTableSourceTableIndex2");    
+    removeDir(pathTable1);
+    removeDir(pathTable2);
+
+    /*
+     * create 1st basic table;
+     */
+
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable1,
+        "a:string,b,c:string", "[a,b];[c]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 2;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            tuple.set(k, b + "_" + i + "" + k);
+          } catch (ExecException e) {
+            e.printStackTrace();
+          }
+        }
+        inserters[i].insert(new BytesWritable(("key1" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+    
+    /*
+     * create 2nd basic table;
+     */
+
+    writer = new BasicTable.Writer(pathTable2, "a:string,b,d:string",
+        "[a,b];[d]", conf);
+    schema = writer.getSchema();
+    tuple = TypesUtils.createTuple(schema);
+
+    inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    int j;
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        j = i + 2;
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            tuple.set(k, b + "_" + j + "" + k);
+          } catch (ExecException e) {
+            e.printStackTrace();
+          }
+        }
+        inserters[i].insert(new BytesWritable(("key2" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+  @Test
+  public void testReader() throws ExecException, IOException {
+    String str1 = pathTable1.toString();
+    String str2 = pathTable2.toString();    
+
+    String query = "records = LOAD '" + str1 + "," + str2
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('source_table, a, b, c, d');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    
+    // Build the expected results for the two-table union.
+    HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable
+        = new HashMap<Integer, ArrayList<ArrayList<Object>>>();
+    
+    ArrayList<ArrayList<Object>> rows = new ArrayList<ArrayList<Object>>();
+    for (int i = 0; i < 10; i++)
+      addResultRow(rows, 0,  i+"_00",  i+"_01",  i+"_02");
+    for (int i = 0; i < 10; i++)
+      addResultRow(rows, 0,  i+"_10",  i+"_11",  i+"_12");
+    resultTable.put(0, rows);
+    
+    rows = new ArrayList<ArrayList<Object>>();
+    for (int i = 0; i < 10; i++)
+      addResultRow(rows, 0,  i+"_20",  i+"_21",  i+"_22");
+    for (int i = 0; i < 10; i++)
+      addResultRow(rows, 0,  i+"_30",  i+"_31",  i+"_32");
+    resultTable.put(1, rows);
+    
+    // Verify union table
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    int numbRows = verifyTable(resultTable, 1, 0, it);
+    
+    Assert.assertEquals(numbRows, 40);
+  }
+  
+  /**
+   * Test source table index from a single table
+   * @throws ExecException
+   * @throws IOException
+   */
+  @Test
+  public void testSingleReader() throws ExecException, IOException {
+    String str1 = pathTable1.toString();    
+
+    String query = "records = LOAD '" + str1
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('source_table, a, b, c, d');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    
+    // Build the expected results for the single-table load.
+    HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable
+        = new HashMap<Integer, ArrayList<ArrayList<Object>>>();
+    
+    ArrayList<ArrayList<Object>> rows = new ArrayList<ArrayList<Object>>();
+    for (int i = 0; i < 10; i++)
+      addResultRow(rows, 0,  i+"_00",  i+"_01",  i+"_02");
+    for (int i = 0; i < 10; i++)
+      addResultRow(rows, 0,  i+"_10",  i+"_11",  i+"_12");
+    resultTable.put(0, rows);
+    
+    // Verify union table
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    int numbRows = verifyTable(resultTable, 1, 0, it);
+    
+    Assert.assertEquals(numbRows, 20);
+  }
+  
+  /**
+   * Add a row to the expected results table.
+   * 
+   */
+  private void addResultRow(ArrayList<ArrayList<Object>> resultTable, Object ... values) {
+    ArrayList<Object> resultRow = new ArrayList<Object>();
+    
+    for (int i = 0; i < values.length; i++) {
+      resultRow.add(values[i]);
+    }
+    resultTable.add(resultRow);
+  }
+}

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java?rev=932158&r1=932157&r2=932158&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java Thu Apr  8 22:46:17 2010
@@ -19,17 +19,13 @@
 package org.apache.hadoop.zebra.pig;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Iterator;
 import java.util.StringTokenizer;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.zebra.io.BasicTable;
@@ -37,15 +33,9 @@ import org.apache.hadoop.zebra.io.TableI
 import org.apache.hadoop.zebra.pig.TableStorer;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.types.TypesUtils;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.test.MiniCluster;
-import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.hadoop.zebra.BaseTestCase;
@@ -208,6 +198,32 @@ public class TestSortedTableUnion extend
       Assert.assertEquals(index+"_02", RowValue3.get(2));
     }
     Assert.assertEquals(20, row);
+    
+    /*
+     * Test source table index on a single sorted table
+     */
+    String query6 = "records4 = LOAD '"
+      + newPath.toString() + "1"
+      + "' USING org.apache.hadoop.zebra.pig.TableLoader('SF_a, SF_b, SF_c," + "source_table, SF_d, SF_e, SF_f, SF_g', 'sorted');";
+    pigServer.registerQuery(query6);
+    Iterator<Tuple> it4 = pigServer.openIterator("records4");
+    row = 0;
+    while (it4.hasNext()) {
+      RowValue3 = it4.next();
+      Assert.assertEquals(8, RowValue3.size());
+      row++;
+      index = row-1;
+      Assert.assertEquals(0, RowValue3.get(3));
+      Assert.assertEquals(index+"_01", RowValue3.get(1));
+      Assert.assertEquals(index+"_00", RowValue3.get(0));
+      Assert.assertEquals(index+"_06", RowValue3.get(7));
+      Assert.assertEquals(index+"_05", RowValue3.get(6));
+      Assert.assertEquals(index+"_04", RowValue3.get(5));
+      Assert.assertEquals(index+"_03", RowValue3.get(4));
+      Assert.assertEquals(index+"_02", RowValue3.get(2));
+    }
+    Assert.assertEquals(10, row);
   }
   
 }
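
A condensed sketch of the single-sorted-table case the new assertions above
cover (not part of the commit; the path is hypothetical, and the query mirrors
query6): with one input table, the projected source_table column is 0 on every
row.

    import java.util.Iterator;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class SourceTableOnSingleSortedTable {
      public static void main(String[] args) throws Exception {
        PigServer pigServer = new PigServer(ExecType.LOCAL);
        // The 'sorted' option asks TableLoader to treat the input as sorted.
        pigServer.registerQuery(
            "records4 = LOAD '/path/sorted1' USING org.apache.hadoop.zebra.pig.TableLoader("
            + "'SF_a, SF_b, SF_c, source_table, SF_d, SF_e, SF_f, SF_g', 'sorted');");
        Iterator<Tuple> it = pigServer.openIterator("records4");
        while (it.hasNext()) {
          Tuple t = it.next();
          // With a single table in the load, get(3) (source_table) is always 0.
          System.out.println(t);
        }
      }
    }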
