Author: yanz
Date: Thu Apr 8 22:45:42 2010
New Revision: 932157
URL: http://svn.apache.org/viewvc?rev=932157&view=rev
Log:
PIG-1291 Support of virtual column, source_table, on unsorted table (yanz)
Added:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt?rev=932157&r1=932156&r2=932157&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt Thu Apr 8
22:45:42 2010
@@ -18,6 +18,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ PIG-1291 Support of virtual column "source_table" on unsorted table (yanz)
+
PIG-1315 Implementing OrderedLoadFunc interface for Zebra TableLoader
(xuefux via yanz)
PIG-1306 Support of locally sorted input splits (yanz)
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=932157&r1=932156&r2=932157&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
Thu Apr 8 22:45:42 2010
@@ -265,10 +265,6 @@ public class TableInputFormat extends In
*/
private static void setProjection(Configuration conf, String projection)
throws ParseException {
conf.set(INPUT_PROJ, Schema.normalize(projection));
-
- // virtual source_table columns require sorted table
- if (Projection.getVirtualColumnIndices(projection) != null && !getSorted(
conf ))
- throw new ParseException("The source_table virtual column is only
available for sorted table unions.");
}
/**
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=932157&r1=932156&r2=932157&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
Thu Apr 8 22:45:42 2010
@@ -56,10 +56,6 @@ public class TableRecordReader extends R
Configuration conf = jobContext.getConfiguration();
if( split instanceof RowTableSplit ) {
RowTableSplit rowSplit = (RowTableSplit)split;
- if( Projection.getVirtualColumnIndices( projection ) != null
&&
- ( !expr.sortedSplitRequired() || rowSplit.getTableIndex()
== -1 ) ) {
- throw new IllegalArgumentException("virtual column
requires union of multiple sorted tables");
- }
scanner = expr.getScanner(rowSplit, projection, conf);
} else {
SortedTableSplit tblSplit = (SortedTableSplit)split;
Added:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java?rev=932157&view=auto
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java
(added)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java
Thu Apr 8 22:45:42 2010
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.HashMap;
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.BaseTestCase;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestBasicTableSourceTableIndex extends BaseTestCase {
+ private static Path pathTable1, pathTable2;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ init();
+
+ pathTable1 = getTableFullPath("TestBasicTableSourceTableIndex1");
+ pathTable2 = getTableFullPath("TestBasicTableSourceTableIndex2");
+ removeDir(pathTable1);
+ removeDir(pathTable2);
+
+ /*
+ * create 1st basic table;
+ */
+
+ BasicTable.Writer writer = new BasicTable.Writer(pathTable1,
+ "a:string,b,c:string", "[a,b];[c]", conf);
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple(schema);
+
+ final int numsBatch = 10;
+ final int numsInserters = 2;
+ TableInserter[] inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, b + "_" + i + "" + k);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserters[i].insert(new BytesWritable(("key1" + i).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+ writer.close();
+
+ /*
+ * create 2nd basic table;
+ */
+
+ writer = new BasicTable.Writer(pathTable2, "a:string,b,d:string",
+ "[a,b];[d]", conf);
+ schema = writer.getSchema();
+ tuple = TypesUtils.createTuple(schema);
+
+ inserters = new TableInserter[numsInserters];
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i] = writer.getInserter("ins" + i, false);
+ }
+
+ int j;
+ for (int b = 0; b < numsBatch; b++) {
+ for (int i = 0; i < numsInserters; i++) {
+ TypesUtils.resetTuple(tuple);
+ j = i + 2;
+ for (int k = 0; k < tuple.size(); ++k) {
+ try {
+ tuple.set(k, b + "_" + j + "" + k);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+ }
+ inserters[i].insert(new BytesWritable(("key2" + i).getBytes()), tuple);
+ }
+ }
+ for (int i = 0; i < numsInserters; i++) {
+ inserters[i].close();
+ }
+ writer.close();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ pigServer.shutdown();
+ }
+
+ @Test
+ public void testReader() throws ExecException, IOException {
+ String str1 = pathTable1.toString();
+ String str2 = pathTable2.toString();
+
+ String query = "records = LOAD '" + str1 + "," + str2
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('source_table, a,
b, c, d');";
+ System.out.println(query);
+ pigServer.registerQuery(query);
+
+ // Verify union table
+ HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable
+ = new HashMap<Integer, ArrayList<ArrayList<Object>>>();
+
+ ArrayList<ArrayList<Object>> rows = new ArrayList<ArrayList<Object>>();
+ for (int i = 0; i < 10; i++)
+ addResultRow(rows, 0, i+"_00", i+"_01", i+"_02");
+ for (int i = 0; i < 10; i++)
+ addResultRow(rows, 0, i+"_10", i+"_11", i+"_12");
+ resultTable.put(0, rows);
+
+ rows = new ArrayList<ArrayList<Object>>();
+ for (int i = 0; i < 10; i++)
+ addResultRow(rows, 0, i+"_20", i+"_21", i+"_22");
+ for (int i = 0; i < 10; i++)
+ addResultRow(rows, 0, i+"_30", i+"_31", i+"_32");
+ resultTable.put(1, rows);
+
+ // Verify union table
+ Iterator<Tuple> it = pigServer.openIterator("records");
+ int numbRows = verifyTable(resultTable, 1, 0, it);
+
+ Assert.assertEquals(numbRows, 40);
+ }
+
+ /**
+ * Test source table index from a single table
+ * @throws ExecException
+ * @throws IOException
+ */
+ @Test
+ public void testSingleReader() throws ExecException, IOException {
+ String str1 = pathTable1.toString();
+
+ String query = "records = LOAD '" + str1
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('source_table, a,
b, c, d');";
+ System.out.println(query);
+ pigServer.registerQuery(query);
+
+ // Verify union table
+ HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable
+ = new HashMap<Integer, ArrayList<ArrayList<Object>>>();
+
+ ArrayList<ArrayList<Object>> rows = new ArrayList<ArrayList<Object>>();
+ for (int i = 0; i < 10; i++)
+ addResultRow(rows, 0, i+"_00", i+"_01", i+"_02");
+ for (int i = 0; i < 10; i++)
+ addResultRow(rows, 0, i+"_10", i+"_11", i+"_12");
+ resultTable.put(0, rows);
+
+ // Verify union table
+ Iterator<Tuple> it = pigServer.openIterator("records");
+ int numbRows = verifyTable(resultTable, 1, 0, it);
+
+ Assert.assertEquals(numbRows, 20);
+ }
+
+ /**
+ *Add a row to expected results table
+ *
+ */
+ private void addResultRow(ArrayList<ArrayList<Object>> resultTable, Object
... values) {
+ ArrayList<Object> resultRow = new ArrayList<Object>();
+
+ for (int i = 0; i < values.length; i++) {
+ resultRow.add(values[i]);
+ }
+ resultTable.add(resultRow);
+ }
+}
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java?rev=932157&r1=932156&r2=932157&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
Thu Apr 8 22:45:42 2010
@@ -19,17 +19,13 @@
package org.apache.hadoop.zebra.pig;
import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.StringTokenizer;
import junit.framework.Assert;
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.zebra.io.BasicTable;
@@ -37,15 +33,9 @@ import org.apache.hadoop.zebra.io.TableI
import org.apache.hadoop.zebra.pig.TableStorer;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.Tuple;
-import org.apache.pig.test.MiniCluster;
-import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.hadoop.zebra.BaseTestCase;
@@ -208,6 +198,32 @@ public class TestSortedTableUnion extend
Assert.assertEquals(index+"_02", RowValue3.get(2));
}
Assert.assertEquals(20, row);
+
+ /*
+ * Test source table index on a single sorted table
+ */
+ String query6 = "records4 = LOAD '"
+ + newPath.toString() + "1"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader('SF_a, SF_b, SF_c," +
+ "source_table, SF_d, SF_e, SF_f, SF_g', 'sorted');";
+ pigServer.registerQuery(query6);
+ Iterator<Tuple> it4 = pigServer.openIterator("records4");
+ row = 0;
+ while (it4.hasNext()) {
+ RowValue3 = it4.next();
+ Assert.assertEquals(8, RowValue3.size());
+ row++;
+ index = row-1;
+ Assert.assertEquals(0, RowValue3.get(3));
+ Assert.assertEquals(index+"_01", RowValue3.get(1));
+ Assert.assertEquals(index+"_00", RowValue3.get(0));
+ Assert.assertEquals(index+"_06", RowValue3.get(7));
+ Assert.assertEquals(index+"_05", RowValue3.get(6));
+ Assert.assertEquals(index+"_04", RowValue3.get(5));
+ Assert.assertEquals(index+"_03", RowValue3.get(4));
+ Assert.assertEquals(index+"_02", RowValue3.get(2));
+ }
+ Assert.assertEquals(10, row);
}
}