Author: yanz Date: Wed Feb 17 23:37:10 2010 New Revision: 911227 URL: http://svn.apache.org/viewvc?rev=911227&view=rev Log: PIG-1206 Storing descendingly sorted PIG table as unsorted table (yanz)
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt?rev=911227&r1=911226&r2=911227&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt (original) +++ hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt Wed Feb 17 23:37:10 2010 @@ -12,6 +12,8 @@ IMPROVEMENTS + PIG-1206 Storing descendingly sorted PIG table as unsorted table (yanz) + PIG-1240 zebra manifest file enhancement (gauravj via yanz) PIG-1140 Support of Hadoop 2.0 API (xuefuz via yanz) Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=911227&r1=911226&r2=911227&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original) +++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Wed Feb 17 23:37:10 2010 @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.conf.Configuration; @@ -79,10 +81,24 @@ public void checkSchema(ResourceSchema schema) throws IOException { // Get schemaStr and sortColumnNames from the given schema. In the process, we // also validate the schema and sorting info. - ResourceSchema.ResourceFieldSchema[] fields = schema.getFields(); - int[] index = schema.getSortKeys(); + ResourceSchema.Order[] orders = schema.getSortKeyOrders(); + boolean descending = false; + for (ResourceSchema.Order order : orders) + { + if (order == ResourceSchema.Order.DESCENDING) + { + Log LOG = LogFactory.getLog(TableStorer.class); + LOG.warn("Sorting in descending order is not supported by Zebra and the table will be unsorted."); + descending = true; + break; + } + } StringBuilder sortColumnNames = new StringBuilder(); - for( int i = 0; i< index.length; i++ ) { + if (!descending) { + ResourceSchema.ResourceFieldSchema[] fields = schema.getFields(); + int[] index = schema.getSortKeys(); + + for( int i = 0; i< index.length; i++ ) { ResourceFieldSchema field = fields[index[i]]; String name = field.getName(); if( name == null ) @@ -92,6 +108,7 @@ if( i > 0 ) sortColumnNames.append( "," ); sortColumnNames.append( name ); + } } // Convert resource schema to zebra schema Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java?rev=911227&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java (added) +++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableSortStorerDesc.java Wed Feb 17 23:37:10 2010 @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.zebra.pig; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Iterator; +import java.util.StringTokenizer; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.zebra.io.BasicTable; +import org.apache.hadoop.zebra.io.TableInserter; +import org.apache.hadoop.zebra.pig.TableStorer; +import org.apache.hadoop.zebra.schema.Schema; +import org.apache.hadoop.zebra.types.TypesUtils; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.Tuple; +import org.apache.pig.test.MiniCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Note: + * + * Make sure you add the build/pig-0.1.0-dev-core.jar to the Classpath of the + * app/debug configuration, when run this from inside the Eclipse. + * + */ +public class TestTableSortStorerDesc { + protected static ExecType execType = ExecType.MAPREDUCE; + private static MiniCluster cluster; + protected static PigServer pigServer; + private static Path pathTable; + + @BeforeClass + public static void setUp() throws Exception { + if (System.getProperty("hadoop.log.dir") == null) { + String base = new File(".").getPath(); // getAbsolutePath(); + System + .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs"); + } + + if (execType == ExecType.MAPREDUCE) { + cluster = MiniCluster.buildCluster(); + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + } else { + pigServer = new PigServer(ExecType.LOCAL); + } + + Configuration conf = new Configuration(); + FileSystem fs = cluster.getFileSystem(); + Path pathWorking = fs.getWorkingDirectory(); + pathTable = new Path(pathWorking, "TestTableSortStorerDesc"); + System.out.println("pathTable =" + pathTable); + BasicTable.Writer writer = new BasicTable.Writer(pathTable, + "SF_a:string,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g", + "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf); + Schema schema = writer.getSchema(); + Tuple tuple = TypesUtils.createTuple(schema); + + final int numsBatch = 10; + final int numsInserters = 1; + TableInserter[] inserters = new TableInserter[numsInserters]; + for (int i = 0; i < numsInserters; i++) { + inserters[i] = writer.getInserter("ins" + i, false); + } + + for (int b = 0; b < numsBatch; b++) { + for (int i = 0; i < numsInserters; i++) { + TypesUtils.resetTuple(tuple); + for (int k = 0; k < tuple.size(); ++k) { + try { + tuple.set(k, (9-b) + "_" + i + "" + k); + } catch (ExecException e) { + e.printStackTrace(); + } + } + inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple); + } + } + for (int i = 0; i < numsInserters; i++) { + inserters[i].close(); + } + writer.close(); + } + + @AfterClass + public static void tearDown() throws Exception { + pigServer.shutdown(); + } + + /** + * Return the name of the routine that called getCurrentMethodName + * + */ + public String getCurrentMethodName() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter pw = new PrintWriter(baos); + (new Throwable()).printStackTrace(pw); + pw.flush(); + String stackTrace = baos.toString(); + pw.close(); + + StringTokenizer tok = new StringTokenizer(stackTrace, "\n"); + tok.nextToken(); // 'java.lang.Throwable' + tok.nextToken(); // 'at ...getCurrentMethodName' + String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>' + // Parse line 3 + tok = new StringTokenizer(l.trim(), " <("); + String t = tok.nextToken(); // 'at' + t = tok.nextToken(); // '...<caller to getCurrentRoutine>' + return t; + } + + @Test + public void testStorer() throws ExecException, IOException { + /* + * Use pig LOAD to load testing data for store + */ + String query = "records = LOAD '" + pathTable.toString() + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + pigServer.registerQuery(query); + + Iterator<Tuple> it2 = pigServer.openIterator("records"); + int row0 = 0; + Tuple RowValue2 = null; + while (it2.hasNext()) { + // Last row value + RowValue2 = it2.next(); + row0++; + if (row0 == 10) { + Assert.assertEquals("0_01", RowValue2.get(1)); + Assert.assertEquals("0_00", RowValue2.get(0).toString()); + } + } + Assert.assertEquals(10, row0); + + String orderby = "srecs = ORDER records BY SF_a DESC;"; + pigServer.registerQuery(orderby); + + /* + * Use pig STORE to store testing data BasicTable.Writer writer = new + * BasicTable.Writer(pathTable, "SF_a,SF_b,SF_c,SF_d,SF_e,SF_f,SF_g", + * "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", false, conf); + */ + Path newPath = new Path(getCurrentMethodName()); + + pigServer + .store( + "srecs", + newPath.toString(), + TableStorer.class.getCanonicalName() + + "('[SF_a, SF_b, SF_c]; [SF_e]')"); + + // check new table content + String query3 = "newRecords = LOAD '" + + newPath.toString() + + "' USING org.apache.hadoop.zebra.pig.TableLoader('SF_a, SF_b');"; + pigServer.registerQuery(query3); + + Iterator<Tuple> it3 = pigServer.openIterator("newRecords"); + int row = 0; + Tuple RowValue3 = null; + while (it3.hasNext()) { + // Last row value + RowValue3 = it3.next(); + row++; + } + Assert.assertEquals(10, row); + } +}