Author: yanz Date: Thu Apr 8 22:46:17 2010 New Revision: 932158 URL: http://svn.apache.org/viewvc?rev=932158&view=rev Log: PIG-1291 Support for the virtual column source_table on unsorted tables (yanz)
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=932158&r1=932157&r2=932158&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original) +++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Thu Apr 8 22:46:17 2010 @@ -18,6 +18,8 @@ Trunk (unreleased changes) IMPROVEMENTS + PIG-1291 Support of virtual column "source_table" on unsorted table (yanz) + PIG-1315 Implementing OrderedLoadFunc interface for Zebra TableLoader (xuefux via yanz) PIG-1306 Support of locally sorted input splits (yanz) Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=932158&r1=932157&r2=932158&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Thu Apr 8 22:46:17 2010 @@ -265,10 +265,6 @@ public class TableInputFormat extends In */ private static void setProjection(Configuration conf, String projection) throws ParseException { conf.set(INPUT_PROJ, Schema.normalize(projection)); - - // virtual source_table columns require sorted table - if 
(Projection.getVirtualColumnIndices(projection) != null && !getSorted( conf )) - throw new ParseException("The source_table virtual column is only availabe for sorted table unions."); } /** Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=932158&r1=932157&r2=932158&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java Thu Apr 8 22:46:17 2010 @@ -56,10 +56,6 @@ public class TableRecordReader extends R Configuration conf = jobContext.getConfiguration(); if( split instanceof RowTableSplit ) { RowTableSplit rowSplit = (RowTableSplit)split; - if( Projection.getVirtualColumnIndices( projection ) != null && - ( !expr.sortedSplitRequired() || rowSplit.getTableIndex() == -1 ) ) { - throw new IllegalArgumentException("virtual column requires union of multiple sorted tables"); - } scanner = expr.getScanner(rowSplit, projection, conf); } else { SortedTableSplit tblSplit = (SortedTableSplit)split; Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java?rev=932158&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java (added) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableSourceTableIndex.java Thu Apr 8 22:46:17 2010 @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.zebra.pig; + +import java.io.IOException; +import java.util.Iterator; +import java.util.ArrayList; +import java.util.HashMap; +import junit.framework.Assert; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.zebra.BaseTestCase; +import org.apache.hadoop.zebra.io.BasicTable; +import org.apache.hadoop.zebra.io.TableInserter; +import org.apache.hadoop.zebra.schema.Schema; +import org.apache.hadoop.zebra.types.TypesUtils; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.Tuple; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class TestBasicTableSourceTableIndex extends BaseTestCase { + private static Path pathTable1, pathTable2; + + @BeforeClass + public static void setUp() throws Exception { + init(); + + pathTable1 = getTableFullPath("TestBasicTableSourceTableIndex1"); + pathTable2 = getTableFullPath("TestBasicTableSourceTableIndex2"); + removeDir(pathTable1); + removeDir(pathTable2); + + /* + * create 1st basic table; + */ + + BasicTable.Writer writer = new BasicTable.Writer(pathTable1, + "a:string,b,c:string", "[a,b];[c]", conf); + 
Schema schema = writer.getSchema(); + Tuple tuple = TypesUtils.createTuple(schema); + + final int numsBatch = 10; + final int numsInserters = 2; + TableInserter[] inserters = new TableInserter[numsInserters]; + for (int i = 0; i < numsInserters; i++) { + inserters[i] = writer.getInserter("ins" + i, false); + } + + for (int b = 0; b < numsBatch; b++) { + for (int i = 0; i < numsInserters; i++) { + TypesUtils.resetTuple(tuple); + for (int k = 0; k < tuple.size(); ++k) { + try { + tuple.set(k, b + "_" + i + "" + k); + } catch (ExecException e) { + e.printStackTrace(); + } + } + inserters[i].insert(new BytesWritable(("key1" + i).getBytes()), tuple); + } + } + for (int i = 0; i < numsInserters; i++) { + inserters[i].close(); + } + writer.close(); + + /* + * create 2nd basic table; + */ + + writer = new BasicTable.Writer(pathTable2, "a:string,b,d:string", + "[a,b];[d]", conf); + schema = writer.getSchema(); + tuple = TypesUtils.createTuple(schema); + + inserters = new TableInserter[numsInserters]; + for (int i = 0; i < numsInserters; i++) { + inserters[i] = writer.getInserter("ins" + i, false); + } + + int j; + for (int b = 0; b < numsBatch; b++) { + for (int i = 0; i < numsInserters; i++) { + TypesUtils.resetTuple(tuple); + j = i + 2; + for (int k = 0; k < tuple.size(); ++k) { + try { + tuple.set(k, b + "_" + j + "" + k); + } catch (ExecException e) { + e.printStackTrace(); + } + } + inserters[i].insert(new BytesWritable(("key2" + i).getBytes()), tuple); + } + } + for (int i = 0; i < numsInserters; i++) { + inserters[i].close(); + } + writer.close(); + } + + @AfterClass + public static void tearDown() throws Exception { + pigServer.shutdown(); + } + + @Test + public void testReader() throws ExecException, IOException { + String str1 = pathTable1.toString(); + String str2 = pathTable2.toString(); + + String query = "records = LOAD '" + str1 + "," + str2 + + "' USING org.apache.hadoop.zebra.pig.TableLoader('source_table, a, b, c, d');"; + System.out.println(query); + 
pigServer.registerQuery(query); + + // Verify union table + HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable + = new HashMap<Integer, ArrayList<ArrayList<Object>>>(); + + ArrayList<ArrayList<Object>> rows = new ArrayList<ArrayList<Object>>(); + for (int i = 0; i < 10; i++) + addResultRow(rows, 0, i+"_00", i+"_01", i+"_02"); + for (int i = 0; i < 10; i++) + addResultRow(rows, 0, i+"_10", i+"_11", i+"_12"); + resultTable.put(0, rows); + + rows = new ArrayList<ArrayList<Object>>(); + for (int i = 0; i < 10; i++) + addResultRow(rows, 0, i+"_20", i+"_21", i+"_22"); + for (int i = 0; i < 10; i++) + addResultRow(rows, 0, i+"_30", i+"_31", i+"_32"); + resultTable.put(1, rows); + + // Verify union table + Iterator<Tuple> it = pigServer.openIterator("records"); + int numbRows = verifyTable(resultTable, 1, 0, it); + + Assert.assertEquals(numbRows, 40); + } + + /** + * Test source table index from a single table + * @throws ExecException + * @throws IOException + */ + @Test + public void testSingleReader() throws ExecException, IOException { + String str1 = pathTable1.toString(); + + String query = "records = LOAD '" + str1 + + "' USING org.apache.hadoop.zebra.pig.TableLoader('source_table, a, b, c, d');"; + System.out.println(query); + pigServer.registerQuery(query); + + // Verify union table + HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable + = new HashMap<Integer, ArrayList<ArrayList<Object>>>(); + + ArrayList<ArrayList<Object>> rows = new ArrayList<ArrayList<Object>>(); + for (int i = 0; i < 10; i++) + addResultRow(rows, 0, i+"_00", i+"_01", i+"_02"); + for (int i = 0; i < 10; i++) + addResultRow(rows, 0, i+"_10", i+"_11", i+"_12"); + resultTable.put(0, rows); + + // Verify union table + Iterator<Tuple> it = pigServer.openIterator("records"); + int numbRows = verifyTable(resultTable, 1, 0, it); + + Assert.assertEquals(numbRows, 20); + } + + /** + *Add a row to expected results table + * + */ + private void addResultRow(ArrayList<ArrayList<Object>> 
resultTable, Object ... values) { + ArrayList<Object> resultRow = new ArrayList<Object>(); + + for (int i = 0; i < values.length; i++) { + resultRow.add(values[i]); + } + resultTable.add(resultRow); + } +} Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java?rev=932158&r1=932157&r2=932158&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java Thu Apr 8 22:46:17 2010 @@ -19,17 +19,13 @@ package org.apache.hadoop.zebra.pig; import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.util.Iterator; import java.util.StringTokenizer; import junit.framework.Assert; -import junit.framework.TestCase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.zebra.io.BasicTable; @@ -37,15 +33,9 @@ import org.apache.hadoop.zebra.io.TableI import org.apache.hadoop.zebra.pig.TableStorer; import org.apache.hadoop.zebra.schema.Schema; import org.apache.hadoop.zebra.types.TypesUtils; -import org.apache.pig.ExecType; -import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.data.Tuple; -import org.apache.pig.test.MiniCluster; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.apache.hadoop.zebra.BaseTestCase; @@ -208,6 +198,32 @@ public class 
TestSortedTableUnion extend Assert.assertEquals(index+"_02", RowValue3.get(2)); } Assert.assertEquals(20, row); + + /* + * Test source table index on a single sorted table + */ + String query6 = "records4 = LOAD '" + + newPath.toString() + "1" + + "' USING org.apache.hadoop.zebra.pig.TableLoader('SF_a, SF_b, SF_c," + + "source_table, SF_d, SF_e, SF_f, SF_g', 'sorted');"; + pigServer.registerQuery(query6); + Iterator<Tuple> it4 = pigServer.openIterator("records4"); + row = 0; + while (it4.hasNext()) { + RowValue3 = it4.next(); + Assert.assertEquals(8, RowValue3.size()); + row++; + index = row-1; + Assert.assertEquals(0, RowValue3.get(3)); + Assert.assertEquals(index+"_01", RowValue3.get(1)); + Assert.assertEquals(index+"_00", RowValue3.get(0)); + Assert.assertEquals(index+"_06", RowValue3.get(7)); + Assert.assertEquals(index+"_05", RowValue3.get(6)); + Assert.assertEquals(index+"_04", RowValue3.get(5)); + Assert.assertEquals(index+"_03", RowValue3.get(4)); + Assert.assertEquals(index+"_02", RowValue3.get(2)); + } + Assert.assertEquals(10, row); } }