Author: yanz Date: Tue Apr 20 15:51:38 2010 New Revision: 935968 URL: http://svn.apache.org/viewvc?rev=935968&view=rev Log: PIG-1375 Support of multiple Zebra table writing through Pig (chaow via yanz)
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=935968&r1=935967&r2=935968&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original) +++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Tue Apr 20 15:51:38 2010 @@ -18,6 +18,8 @@ Trunk (unreleased changes) IMPROVEMENTS + PIG-1375 Support of multiple Zebra table writing through Pig (chaow via yanz) + PIG-1351 Addition of type check when writing to basic table (chaow via yanz) PIG-1361 Zebra TableLoader.getSchema() should return the projectionSchema specified in the constructor of TableLoader instead of pruned proejction by pig (gauravj via daijy) Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java?rev=935968&r1=935967&r2=935968&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java Tue Apr 20 15:51:38 2010 @@ -36,6 +36,7 @@ import org.apache.hadoop.zebra.io.TableI import 
org.apache.hadoop.zebra.parser.ParseException; import org.apache.hadoop.zebra.types.Partition; import org.apache.hadoop.zebra.types.SortInfo; +import org.apache.hadoop.zebra.types.TypesUtils; import org.apache.hadoop.zebra.schema.Schema; import org.apache.hadoop.zebra.tfile.TFile; import org.apache.pig.data.Tuple; @@ -153,7 +154,7 @@ public class BasicTableOutputFormat exte private static final String OUTPUT_COMPARATOR = "mapreduce.lib.table.output.comparator"; static final String IS_MULTI = "multi"; public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS = "zebra.output.partitioner.class"; - + public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS = "zebra.output.partitioner.class.arguments"; /** * Set the multiple output paths of the BasicTable in JobContext @@ -164,9 +165,11 @@ public class BasicTableOutputFormat exte * The comma separated output paths to the tables. * The path must either not existent, or must be an empty directory. * @param theClass - * Zebra output partitoner class + * Zebra output partitioner class + * + * @deprecated Use {@link #setMultipleOutputs(JobContext, class<? extends ZebraOutputPartition>, Path ...)} instead. + * */ - public static void setMultipleOutputs(JobContext jobContext, String commaSeparatedLocations, Class<? extends ZebraOutputPartition> theClass) throws IOException { Configuration conf = jobContext.getConfiguration(); @@ -179,18 +182,17 @@ public class BasicTableOutputFormat exte setZebraOutputPartitionClass(jobContext, theClass); } - /** + /** * Set the multiple output paths of the BasicTable in JobContext * * @param jobContext * The JobContext object. + * @param theClass + * Zebra output partitioner class * @param paths * The list of paths * The path must either not existent, or must be an empty directory. - * @param theClass - * Zebra output partitioner class */ - public static void setMultipleOutputs(JobContext jobContext, Class<? extends ZebraOutputPartition> theClass, Path... 
paths) throws IOException { Configuration conf = jobContext.getConfiguration(); @@ -209,9 +211,28 @@ public class BasicTableOutputFormat exte } conf.setBoolean(IS_MULTI, true); setZebraOutputPartitionClass(jobContext, theClass); - } - + + /** + * Set the multiple output paths of the BasicTable in JobContext + * + * @param jobContext + * The JobContext object. + * @param theClass + * Zebra output partitioner class + * @param arguments + * Arguments string to partitioner class + * @param paths + * The list of paths + * The path must either not existent, or must be an empty directory. + */ + public static void setMultipleOutputs(JobContext jobContext, Class<? extends ZebraOutputPartition> theClass, String arguments, Path... paths) + throws IOException { + setMultipleOutputs(jobContext, theClass, paths); + if (arguments != null) { + jobContext.getConfiguration().set(ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS, arguments); + } + } /** * Set the multiple output paths of the BasicTable in JobContext @@ -256,7 +277,6 @@ public class BasicTableOutputFormat exte } - private static void setZebraOutputPartitionClass( JobContext jobContext, Class<? extends ZebraOutputPartition> theClass) throws IOException { if (!ZebraOutputPartition.class.isAssignableFrom(theClass)) @@ -264,7 +284,6 @@ public class BasicTableOutputFormat exte jobContext.getConfiguration().set(ZEBRA_OUTPUT_PARTITIONER_CLASS, theClass.getName()); } - public static Class<? extends ZebraOutputPartition> getZebraOutputPartitionClass(JobContext jobContext) throws IOException { Configuration conf = jobContext.getConfiguration(); @@ -287,8 +306,6 @@ public class BasicTableOutputFormat exte } - - /** * Set the output path of the BasicTable in JobContext * @@ -392,7 +409,6 @@ public class BasicTableOutputFormat exte } - /** * Generates a BytesWritable key for the input key * using keygenerate provided. 
Sort Key(s) are used to generate this object @@ -409,9 +425,6 @@ public class BasicTableOutputFormat exte return kg.generateKey(t); } - - - /** * Set the table storage hint in JobContext, should be called after setSchema is * called. @@ -737,7 +750,13 @@ class TableOutputCommitter extends Outpu class TableRecordWriter extends RecordWriter<BytesWritable, Tuple> { private final TableInserter inserter[]; private org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition op = null; - + + // for Pig's call path; + final private BytesWritable KEY0 = new BytesWritable(new byte[0]); + private int[] sortColIndices = null; + private KeyGenerator builder = null; + private Tuple t = null; + public TableRecordWriter(String path, TaskAttemptContext context) throws IOException { Configuration conf = context.getConfiguration(); if(conf.getBoolean(BasicTableOutputFormat.IS_MULTI, false) == true) { @@ -753,8 +772,36 @@ class TableRecordWriter extends RecordWr BasicTable.Writer writer = new BasicTable.Writer(paths[i], conf); this.inserter[i] = writer.getInserter( inserterName, true, checkType); + + // Set up SortInfo related stuff only once; + if (i == 0) { + if (writer.getSortInfo() != null) + { + sortColIndices = writer.getSortInfo().getSortIndices(); + SortInfo sortInfo = writer.getSortInfo(); + String[] sortColNames = sortInfo.getSortColumnNames(); + org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema(); + + byte[] types = new byte[sortColNames.length]; + + for(int j =0 ; j < sortColNames.length; ++j){ + types[j] = schema.getColumn(sortColNames[j]).getType().pigDataType(); + } + t = TypesUtils.createTuple(sortColNames.length); + builder = makeKeyBuilder(types); + } + } } } + + private KeyGenerator makeKeyBuilder(byte[] elems) { + ComparatorExpr[] exprs = new ComparatorExpr[elems.length]; + for (int i = 0; i < elems.length; ++i) { + exprs[i] = ExprUtils.primitiveComparator(i, elems[i]); + } + return new KeyGenerator(ExprUtils.tupleComparator(exprs)); + } + @Override public 
void close(TaskAttemptContext context) throws IOException { @@ -765,6 +812,17 @@ class TableRecordWriter extends RecordWr @Override public void write(BytesWritable key, Tuple value) throws IOException { + if (key == null) { + if (sortColIndices != null) { // If this is a sorted table and key is null (Pig's call path); + for (int i =0; i < sortColIndices.length;++i) { + t.set(i, value.get(sortColIndices[i])); + } + key = builder.generateKey(t); + } else { // for unsorted table; + key = KEY0; + } + } + if(op != null ) { int idx = op.getOutputPartition(key, value); if(idx < 0 || (idx >= inserter.length)) { @@ -775,6 +833,5 @@ class TableRecordWriter extends RecordWr inserter[0].insert(key, value); } } - } Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=935968&r1=935967&r2=935968&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Tue Apr 20 15:51:38 2010 @@ -26,20 +26,14 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.zebra.pig.comparator.ComparatorExpr; -import org.apache.hadoop.zebra.pig.comparator.ExprUtils; -import org.apache.hadoop.zebra.pig.comparator.KeyGenerator; -import org.apache.hadoop.zebra.io.BasicTable; -import 
org.apache.hadoop.zebra.io.TableInserter; +import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat; +import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition; +import org.apache.hadoop.zebra.mapreduce.ZebraSchema; +import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo; +import org.apache.hadoop.zebra.mapreduce.ZebraStorageHint; import org.apache.hadoop.zebra.parser.ParseException; -import org.apache.hadoop.zebra.types.SortInfo; -import org.apache.hadoop.zebra.types.TypesUtils; import org.apache.pig.LoadFunc; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceStatistics; @@ -57,282 +51,180 @@ public class TableStorer extends StoreFu private static final String UDFCONTEXT_SORT_INFO = "zebra.UDFContext.sortInfo"; private static final String UDFCONTEXT_OUTPUT_CHECKTYPE = "zebra.UDFContext.checkType"; - static final String OUTPUT_STORAGEHINT = "mapreduce.lib.table.output.storagehint"; - static final String OUTPUT_SCHEMA = "mapreduce.lib.table.output.schema"; - static final String OUTPUT_PATH = "mapreduce.lib.table.output.dir"; - static final String SORT_INFO = "mapreduce.lib.table.sort.info"; static final String OUTPUT_CHECKTYPE = "mapreduce.lib.table.output.checktype"; private String storageHintString = null; private String udfContextSignature = null; - private TableRecordWriter tableRecordWriter = null; + private RecordWriter<BytesWritable, Tuple> tableRecordWriter = null; + private String partitionClassString = null; + Class<? 
extends ZebraOutputPartition> partitionClass = null; + private String partitionClassArgumentsString = null; public TableStorer() { } - public TableStorer(String storageHintStr) throws ParseException, IOException { - storageHintString = storageHintStr; + public TableStorer(String storageHintString) { + this.storageHintString = storageHintString; } + public TableStorer(String storageHintString, String partitionClassString) { + this.storageHintString = storageHintString; + this.partitionClassString = partitionClassString; + } + + public TableStorer(String storageHintString, String partitionClassString, String partitionClassArgumentsString) { + this.storageHintString = storageHintString; + this.partitionClassString = partitionClassString; + this.partitionClassArgumentsString = partitionClassArgumentsString; + } + + @Override public void putNext(Tuple tuple) throws IOException { + try { tableRecordWriter.write( null, tuple ); + } catch (InterruptedException e) { + throw new IOException(e.getMessage()); + } } @Override public void checkSchema(ResourceSchema schema) throws IOException { - // Get schemaStr and sortColumnNames from the given schema. In the process, we - // also validate the schema and sorting info. - ResourceSchema.Order[] orders = schema.getSortKeyOrders(); - boolean descending = false; - for (ResourceSchema.Order order : orders) + // Get schemaStr and sortColumnNames from the given schema. In the process, we + // also validate the schema and sorting info. 
+ ResourceSchema.Order[] orders = schema.getSortKeyOrders(); + boolean descending = false; + for (ResourceSchema.Order order : orders) + { + if (order == ResourceSchema.Order.DESCENDING) { - if (order == ResourceSchema.Order.DESCENDING) - { - Log LOG = LogFactory.getLog(TableStorer.class); - LOG.warn("Sorting in descending order is not supported by Zebra and the table will be unsorted."); - descending = true; - break; - } - } - StringBuilder sortColumnNames = new StringBuilder(); - if (!descending) { - ResourceSchema.ResourceFieldSchema[] fields = schema.getFields(); - int[] index = schema.getSortKeys(); + Log LOG = LogFactory.getLog(TableStorer.class); + LOG.warn("Sorting in descending order is not supported by Zebra and the table will be unsorted."); + descending = true; + break; + } + } + StringBuilder sortColumnNames = new StringBuilder(); + if (!descending) { + ResourceSchema.ResourceFieldSchema[] fields = schema.getFields(); + int[] index = schema.getSortKeys(); + + for( int i = 0; i< index.length; i++ ) { + ResourceFieldSchema field = fields[index[i]]; + String name = field.getName(); + if( name == null ) + throw new IOException("Zebra does not support column positional reference yet"); + if( !org.apache.pig.data.DataType.isAtomic( field.getType() ) ) + throw new IOException( "Field [" + name + "] is not of simple type as required for a sort column now." 
); + if( i > 0 ) + sortColumnNames.append( "," ); + sortColumnNames.append( name ); + } + } + + // Convert resource schema to zebra schema + org.apache.hadoop.zebra.schema.Schema zebraSchema; + try { + zebraSchema = SchemaConverter.convertFromResourceSchema( schema ); + } catch (ParseException ex) { + throw new IOException("Exception thrown from SchemaConverter: " + ex.getMessage() ); + } + + Properties properties = UDFContext.getUDFContext().getUDFProperties( + this.getClass(), new String[]{ udfContextSignature } ); + properties.setProperty( UDFCONTEXT_OUTPUT_SCHEMA, zebraSchema.toString() ); + properties.setProperty( UDFCONTEXT_SORT_INFO, sortColumnNames.toString() ); - for( int i = 0; i< index.length; i++ ) { - ResourceFieldSchema field = fields[index[i]]; - String name = field.getName(); - if( name == null ) - throw new IOException("Zebra does not support column positional reference yet"); - if( !org.apache.pig.data.DataType.isAtomic( field.getType() ) ) - throw new IOException( "Field [" + name + "] is not of simple type as required for a sort column now." 
); - if( i > 0 ) - sortColumnNames.append( "," ); - sortColumnNames.append( name ); - } - } - - // Convert resource schema to zebra schema - org.apache.hadoop.zebra.schema.Schema zebraSchema; - try { - zebraSchema = SchemaConverter.convertFromResourceSchema( schema ); - } catch (ParseException ex) { - throw new IOException("Exception thrown from SchemaConverter: " + ex.getMessage() ); - } - - Properties properties = UDFContext.getUDFContext().getUDFProperties( - this.getClass(), new String[]{ udfContextSignature } ); - properties.setProperty( UDFCONTEXT_OUTPUT_SCHEMA, zebraSchema.toString() ); - properties.setProperty( UDFCONTEXT_SORT_INFO, sortColumnNames.toString() ); - - // This is to turn off type check for potential corner cases - for internal use only; - if (System.getenv("zebra_output_checktype") != null && System.getenv("zebra_output_checktype").equals("no")) { - properties.setProperty( UDFCONTEXT_OUTPUT_CHECKTYPE, "no"); - } + // This is to turn off type check for potential corner cases - for internal use only; + if (System.getenv("zebra_output_checktype") != null && System.getenv("zebra_output_checktype").equals("no")) { + properties.setProperty( UDFCONTEXT_OUTPUT_CHECKTYPE, "no"); + } } @SuppressWarnings("unchecked") @Override public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat() throws IOException { - return new TableOutputFormat(); + return new BasicTableOutputFormat(); } @SuppressWarnings("unchecked") @Override public void prepareToWrite(RecordWriter writer) throws IOException { - tableRecordWriter = (TableRecordWriter)writer; - if( tableRecordWriter == null ) { - throw new IOException( "Invalid type of writer. Expected type: TableRecordWriter." ); - } + tableRecordWriter = writer; + if( tableRecordWriter == null ) { + throw new IOException( "Invalid type of writer. Expected type: TableRecordWriter." 
); + } } @Override public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { - return LoadFunc.getAbsolutePath( location, curDir ); + return LoadFunc.getAbsolutePath( location, curDir ); } @Override public void setStoreLocation(String location, Job job) throws IOException { - Configuration conf = job.getConfiguration(); - conf.set( OUTPUT_STORAGEHINT, storageHintString ); - conf.set( OUTPUT_PATH, location ); - - // Get schema string and sorting info from UDFContext and re-store them to - // job config. - Properties properties = UDFContext.getUDFContext().getUDFProperties( - this.getClass(), new String[]{ udfContextSignature } ); - conf.set( OUTPUT_SCHEMA, properties.getProperty( UDFCONTEXT_OUTPUT_SCHEMA ) ); - conf.set( SORT_INFO, properties.getProperty( UDFCONTEXT_SORT_INFO ) ); - - // Get checktype information from UDFContext and re-store it to job config; - if (properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE) != null && properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE).equals("no")) { - conf.setBoolean(OUTPUT_CHECKTYPE, false); + Configuration conf = job.getConfiguration(); + + String[] outputs = location.split(","); + + if (outputs.length == 1) { + BasicTableOutputFormat.setOutputPath(job, new Path(location)); + } else if (outputs.length > 1) { + if (partitionClass == null) { + try { + partitionClass = (Class<? extends ZebraOutputPartition>) conf.getClassByName(partitionClassString); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } } + + Path[] paths = new Path[outputs.length]; + for (int i=0; i<paths.length; i++) { + paths[i] = new Path(outputs[i]); + } + + BasicTableOutputFormat.setMultipleOutputs(job, partitionClass, partitionClassArgumentsString, paths); + } else { + throw new IOException( "Invalid location : " + location); + } + + // Get schema string and sorting info from UDFContext and re-store them to + // job config. 
+ Properties properties = UDFContext.getUDFContext().getUDFProperties( + this.getClass(), new String[]{ udfContextSignature } ); + ZebraSchema zSchema = ZebraSchema.createZebraSchema(properties.getProperty(UDFCONTEXT_OUTPUT_SCHEMA)); + ZebraSortInfo zSortInfo = ZebraSortInfo.createZebraSortInfo(properties.getProperty(UDFCONTEXT_SORT_INFO), null); + ZebraStorageHint zStorageHint = ZebraStorageHint.createZebraStorageHint(storageHintString); + try { + BasicTableOutputFormat.setStorageInfo(job, zSchema, zStorageHint, zSortInfo); + } catch (ParseException e) { + throw new IOException("Invalid storage info: " + e.getMessage()); + } + + // Get checktype information from UDFContext and re-store it to job config; + if (properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE) != null && properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE).equals("no")) { + conf.setBoolean(OUTPUT_CHECKTYPE, false); + } } @Override public void storeSchema(ResourceSchema schema, String location, Job job) throws IOException { - // no-op. We do close at cleanupJob(). - BasicTable.Writer write = new BasicTable.Writer( new Path( location ), - job.getConfiguration()); - write.close(); + //TODO: This is temporary - we will do close at cleanupJob() when OutputCommitter is ready. 
+ BasicTableOutputFormat.close(job); } @Override public void setStoreFuncUDFContextSignature(String signature) { - udfContextSignature = signature; + udfContextSignature = signature; } @Override public void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException { - // no-op - } - -} - -/** - * - * Table OutputFormat - * - */ -class TableOutputFormat extends OutputFormat<BytesWritable, Tuple> { - @Override - public void checkOutputSpecs(JobContext job) throws IOException, InterruptedException { - Configuration conf = job.getConfiguration(); - String location = conf.get( TableStorer.OUTPUT_PATH ); - String schemaStr = conf.get( TableStorer.OUTPUT_SCHEMA ); - String storageHint = conf.get( TableStorer.OUTPUT_STORAGEHINT ); - String sortColumnNames = conf.get( TableStorer.SORT_INFO ); - - BasicTable.Writer writer = new BasicTable.Writer( new Path( location ), - schemaStr, storageHint, sortColumnNames, null, conf ); - writer.finish(); - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext taContext) - throws IOException, InterruptedException { - return new TableOutputCommitter() ; - } - - @Override - public org.apache.hadoop.mapreduce.RecordWriter<BytesWritable, Tuple> getRecordWriter( - TaskAttemptContext taContext) throws IOException, InterruptedException { - return new TableRecordWriter( taContext ); - } - -} - -// TODO: make corresponding changes for commit and cleanup. Currently, no-ops. 
-class TableOutputCommitter extends OutputCommitter { - @Override - public void abortTask(TaskAttemptContext taContext) throws IOException { - // TODO Auto-generated method stub - } - - @Override - public void cleanupJob(JobContext jobContext) throws IOException { -// Configuration conf = jobContext.getConfiguration(); -// String location = conf.get( TableStorer.OUTPUT_PATH ); -// BasicTable.Writer write = new BasicTable.Writer( new Path( location ), conf ); -// write.close(); + // no-op } - - @Override - public void commitTask(TaskAttemptContext taContext) throws IOException { - int i = 0; - i++; - // TODO Auto-generated method stub - } - - @Override - public boolean needsTaskCommit(TaskAttemptContext taContext) throws IOException { - return false; - } - - @Override - public void setupJob(JobContext jobContext) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void setupTask(TaskAttemptContext taContext) throws IOException { - // TODO Auto-generated method stub - } - -} - -/** - * - * Table RecordWriter - * - */ -class TableRecordWriter extends RecordWriter<BytesWritable, Tuple> { - final private BytesWritable KEY0 = new BytesWritable(new byte[0]); - private BasicTable.Writer writer; - private TableInserter inserter; - private int[] sortColIndices = null; - KeyGenerator builder; - Tuple t; - - public TableRecordWriter(TaskAttemptContext taContext) throws IOException { - Configuration conf = taContext.getConfiguration(); - - String path = conf.get(TableStorer.OUTPUT_PATH); - writer = new BasicTable.Writer( new Path( path ), conf ); - - if (writer.getSortInfo() != null) - { - sortColIndices = writer.getSortInfo().getSortIndices(); - SortInfo sortInfo = writer.getSortInfo(); - String[] sortColNames = sortInfo.getSortColumnNames(); - org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema(); - - byte[] types = new byte[sortColNames.length]; - for(int i =0 ; i < sortColNames.length; ++i){ - types[i] = 
schema.getColumn(sortColNames[i]).getType().pigDataType(); - } - t = TypesUtils.createTuple(sortColNames.length); - builder = makeKeyBuilder(types); - } - - boolean checkType = conf.getBoolean(TableStorer.OUTPUT_CHECKTYPE, true); - inserter = writer.getInserter("patition-" + taContext.getTaskAttemptID().getTaskID().getId(), false, checkType); - } - - @Override - public void close(TaskAttemptContext taContext) throws IOException { - inserter.close(); - writer.finish(); - } - - private KeyGenerator makeKeyBuilder(byte[] elems) { - ComparatorExpr[] exprs = new ComparatorExpr[elems.length]; - for (int i = 0; i < elems.length; ++i) { - exprs[i] = ExprUtils.primitiveComparator(i, elems[i]); - } - return new KeyGenerator(ExprUtils.tupleComparator(exprs)); - } - - @Override - public void write(BytesWritable key, Tuple value) throws IOException { - System.out.println( "Tuple: " + value.toDelimitedString(",") ); - if (sortColIndices != null) { - for(int i =0; i < sortColIndices.length;++i) { - t.set(i, value.get(sortColIndices[i])); - } - key = builder.generateKey(t); - } else if (key == null) { - key = KEY0; - } - inserter.insert(key, value); - } - } Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java?rev=935968&r1=935967&r2=935968&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java Tue Apr 20 15:51:38 2010 @@ -55,7 +55,7 @@ public class TestMixedType1 extends Base System.out.println("ONCE SETUP !! 
---------"); init(); - path = getTableFullPath(""); + path = getTableFullPath("TestMixedType1"); removeDir(path); BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA, Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java?rev=935968&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java (added) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java Tue Apr 20 15:51:38 2010 @@ -0,0 +1,494 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.zebra.pig; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.zebra.BaseTestCase; +import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition; +import org.apache.hadoop.zebra.parser.ParseException; +import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.data.Tuple; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import java.util.Iterator; +import junit.framework.Assert; + +/** + * Assume the input files contain rows of word and count, separated by a space: + * + * <pre> + * us 2 + * japan 2 + * india 4 + * us 2 + * japan 1 + * india 3 + * nouse 5 + * nowhere 4 + * + */ +public class TestMultipleOutputs1 extends BaseTestCase implements Tool { + static String inputPath; + static String inputFileName = "multi-input.txt"; + public static String sortKey = null; + + @Before + public void setUp() throws Exception { + init(); + + inputPath = getTableFullPath(inputFileName).toString(); + + writeToFile(inputPath); + } + + @After + public void tearDown() throws Exception { + if (mode == TestMode.local) { + pigServer.shutdown(); + } + } + + public static void writeToFile (String inputFile) throws IOException{ + if (mode == TestMode.local) { + FileWriter fstream = new FileWriter(inputFile); + BufferedWriter out = new BufferedWriter(fstream); + out.write("us\t2\n"); + out.write("japan\t2\n"); + out.write("india\t4\n"); + out.write("us\t2\n"); + out.write("japan\t1\n"); + out.write("india\t3\n"); + out.write("nouse\t5\n"); + out.write("nowhere\t4\n"); + out.close(); + } + + if (mode == TestMode.cluster) { + FSDataOutputStream fout = fs.create(new Path (inputFile)); + 
fout.writeBytes("us\t2\n"); + fout.writeBytes("japan\t2\n"); + fout.writeBytes("india\t4\n"); + fout.writeBytes("us\t2\n"); + fout.writeBytes("japan\t1\n"); + fout.writeBytes("india\t3\n"); + fout.writeBytes("nouse\t5\n"); + fout.writeBytes("nowhere\t4\n"); + fout.close(); + } + } + + // test no sort key; + @Test + public void test1() throws ParseException, IOException, + org.apache.hadoop.zebra.parser.ParseException, Exception { + // Load data; + String query = "records = LOAD '" + inputPath + "' as (word:chararray, count:int);"; + System.out.println("query = " + query); + pigServer.registerQuery(query); + + Iterator<Tuple> it = pigServer.openIterator("records"); + while (it.hasNext()) { + Tuple cur = it.next(); + System.out.println(cur); + } + + // Store using multiple outputs; + String outputPaths = "us,india,japan"; + removeDir(getTableFullPath("us")); + removeDir(getTableFullPath("india")); + removeDir(getTableFullPath("japan")); + + query = "store records into '" + outputPaths + "' using org.apache.hadoop.zebra.pig.TableStorer('[word,count]'," + + "'org.apache.hadoop.zebra.pig.TestMultipleOutputs1$OutputPartitionerClass');"; + System.out.println("query = " + query); + pigServer.registerQuery(query); + + // Validate results; + query = "records = LOAD '" + "us" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + int count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("us", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("us", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } else if (count == 3) { + Assert.assertEquals("nouse", RowValue.get(0)); + Assert.assertEquals(5, RowValue.get(1)); + } else if (count == 4) { + Assert.assertEquals("nowhere", RowValue.get(0)); + 
Assert.assertEquals(4, RowValue.get(1)); + } + } + Assert.assertEquals(count, 4); + + query = "records = LOAD '" + "india" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("india", RowValue.get(0)); + Assert.assertEquals(4, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("india", RowValue.get(0)); + Assert.assertEquals(3, RowValue.get(1)); + } + } + Assert.assertEquals(count, 2); + + query = "records = LOAD '" + "japan" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("japan", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("japan", RowValue.get(0)); + Assert.assertEquals(1, RowValue.get(1)); + } + } + Assert.assertEquals(count, 2); + } + + //Test sort key on word; + @Test + public void test2() throws ParseException, IOException, + org.apache.hadoop.zebra.parser.ParseException, Exception { + // Load data; + String query = "a = LOAD '" + inputPath + "' as (word:chararray, count:int);"; + System.out.println("query = " + query); + pigServer.registerQuery(query); + + query = "records = order a by word;"; + System.out.println("query = " + query); + pigServer.registerQuery(query); + + Iterator<Tuple> it = pigServer.openIterator("records"); + while (it.hasNext()) { + Tuple cur = it.next(); + System.out.println(cur); + } + + // Store using multiple outputs; + String outputPaths = "us,india,japan"; + removeDir(getTableFullPath("us")); + 
removeDir(getTableFullPath("india")); + removeDir(getTableFullPath("japan")); + ExecJob pigJob = pigServer + .store( + "records", + outputPaths, + TableStorer.class.getCanonicalName() + + "('[word,count]', 'org.apache.hadoop.zebra.pig.TestMultipleOutputs1$OutputPartitionerClass')"); + + Assert.assertNull(pigJob.getException()); + + // Validate results; + query = "records = LOAD '" + "us" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + int count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("nouse", RowValue.get(0)); + Assert.assertEquals(5, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("nowhere", RowValue.get(0)); + Assert.assertEquals(4, RowValue.get(1)); + } else if (count == 3) { + Assert.assertEquals("us", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } else if (count == 4) { + Assert.assertEquals("us", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } + } + Assert.assertEquals(count, 4); + + query = "records = LOAD '" + "india" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("india", RowValue.get(0)); + Assert.assertEquals(4, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("india", RowValue.get(0)); + Assert.assertEquals(3, RowValue.get(1)); + } + } + Assert.assertEquals(count, 2); + + query = "records = LOAD '" + "japan" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while 
(it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("japan", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("japan", RowValue.get(0)); + Assert.assertEquals(1, RowValue.get(1)); + } + } + Assert.assertEquals(count, 2); + } + + //Test sort key on word and count; + @Test + public void test3() throws ParseException, IOException, + org.apache.hadoop.zebra.parser.ParseException, Exception { + // Load data; + String query = "a = LOAD '" + inputPath + "' as (word:chararray, count:int);"; + System.out.println("query = " + query); + pigServer.registerQuery(query); + + query = "records = order a by word, count;"; + System.out.println("query = " + query); + pigServer.registerQuery(query); + + Iterator<Tuple> it = pigServer.openIterator("records"); + while (it.hasNext()) { + Tuple cur = it.next(); + System.out.println(cur); + } + + // Store using multiple outputs; + String outputPaths = "us,india,japan"; + removeDir(getTableFullPath("us")); + removeDir(getTableFullPath("india")); + removeDir(getTableFullPath("japan")); + ExecJob pigJob = pigServer + .store( + "records", + outputPaths, + TableStorer.class.getCanonicalName() + + "('[word,count]', 'org.apache.hadoop.zebra.pig.TestMultipleOutputs1$OutputPartitionerClass')"); + + Assert.assertNull(pigJob.getException()); + + // Validate results; + query = "records = LOAD '" + "us" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + int count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("nouse", RowValue.get(0)); + Assert.assertEquals(5, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("nowhere", RowValue.get(0)); + Assert.assertEquals(4, RowValue.get(1)); + } 
else if (count == 3) { + Assert.assertEquals("us", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } else if (count == 4) { + Assert.assertEquals("us", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } + } + Assert.assertEquals(count, 4); + + query = "records = LOAD '" + "india" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("india", RowValue.get(0)); + Assert.assertEquals(3, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("india", RowValue.get(0)); + Assert.assertEquals(4, RowValue.get(1)); + } + } + Assert.assertEquals(count, 2); + + query = "records = LOAD '" + "japan" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("japan", RowValue.get(0)); + Assert.assertEquals(1, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("japan", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } + } + Assert.assertEquals(count, 2); + } + + //Negative test case: invalid partition class; + @Test (expected = IOException.class) + public void testNegative1() throws ParseException, IOException, + org.apache.hadoop.zebra.parser.ParseException, Exception { + // Load data; + String query = "a = LOAD '" + inputPath + "' as (word:chararray, count:int);"; + System.out.println("query = " + query); + pigServer.registerQuery(query); + + query = "records = order a by word, count;"; + System.out.println("query = " + query); + pigServer.registerQuery(query); + + Iterator<Tuple> it = 
pigServer.openIterator("records"); + while (it.hasNext()) { + Tuple cur = it.next(); + System.out.println(cur); + } + + // Store using multiple outputs; + String outputPaths = "us,india,japan"; + removeDir(getTableFullPath("us")); + removeDir(getTableFullPath("india")); + removeDir(getTableFullPath("japan")); + pigServer + .store( + "records", + outputPaths, + TableStorer.class.getCanonicalName() + + "('[word,count]', 'org.apache.hadoop.zebra.pig.notexistingclass')"); + } + + public static class OutputPartitionerClass extends ZebraOutputPartition { + + @Override + public int getOutputPartition(BytesWritable key, Tuple value) { + String reg = null; + try { + reg = (String) (value.get(0)); + } catch (Exception e) { + // + } + + if (reg.equals("us")) + return 0; + if (reg.equals("india")) + return 1; + if (reg.equals("japan")) + return 2; + + return 0; + } + } + + @Override + public int run(String[] args) throws Exception { + TestMultipleOutputs1 test = new TestMultipleOutputs1(); + + test.setUp(); + test.test1(); + test.tearDown(); + + test.setUp(); + test.test2(); + test.tearDown(); + + test.setUp(); + test.test3(); + test.tearDown(); + + test.setUp(); + test.testNegative1(); + test.tearDown(); + + return 0; + } + + public static void main(String[] args) throws Exception { + conf = new Configuration(); + + int res = ToolRunner.run(conf, new TestMultipleOutputs1(), args); + System.out.println("PASS"); + System.exit(res); + } +} Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java?rev=935968&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java (added) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java Tue Apr 20 
15:51:38 2010 @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.zebra.pig; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.zebra.BaseTestCase; +import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat; +import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition; +import org.apache.hadoop.zebra.parser.ParseException; +import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.data.Tuple; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import java.util.Iterator; +import junit.framework.Assert; + +/** + * Assume the input files contain rows of word and count, separated by a space: + * + * <pre> + * us 2 + * japan 2 + * india 4 + * us 2 + * japan 1 + * india 3 + * nouse 5 + * nowhere 4 + * + */ +public class TestMultipleOutputs2 extends BaseTestCase implements Tool { + static String inputPath; + 
static String inputFileName = "multi-input.txt"; + public static String sortKey = null; + + @Before + public void setUp() throws Exception { + init(); + + inputPath = getTableFullPath(inputFileName).toString(); + + writeToFile(inputPath); + } + + @After + public void tearDown() throws Exception { + if (mode == TestMode.local) { + pigServer.shutdown(); + } + } + + public static void writeToFile (String inputFile) throws IOException{ + if (mode == TestMode.local) { + FileWriter fstream = new FileWriter(inputFile); + BufferedWriter out = new BufferedWriter(fstream); + out.write("us\t2\n"); + out.write("japan\t2\n"); + out.write("india\t4\n"); + out.write("us\t2\n"); + out.write("japan\t1\n"); + out.write("india\t3\n"); + out.write("nouse\t5\n"); + out.write("nowhere\t4\n"); + out.close(); + } + + if (mode == TestMode.cluster) { + FSDataOutputStream fout = fs.create(new Path (inputFile)); + fout.writeBytes("us\t2\n"); + fout.writeBytes("japan\t2\n"); + fout.writeBytes("india\t4\n"); + fout.writeBytes("us\t2\n"); + fout.writeBytes("japan\t1\n"); + fout.writeBytes("india\t3\n"); + fout.writeBytes("nouse\t5\n"); + fout.writeBytes("nowhere\t4\n"); + fout.close(); + } + } + + @Test + public void test1() throws ParseException, IOException, + org.apache.hadoop.zebra.parser.ParseException, Exception { + // Load data; + String query = "records = LOAD '" + inputPath + "' as (word:chararray, count:int);"; + System.out.println("query = " + query); + pigServer.registerQuery(query); + + Iterator<Tuple> it = pigServer.openIterator("records"); + while (it.hasNext()) { + Tuple cur = it.next(); + System.out.println(cur); + } + + // Store using multiple outputs; + String outputPaths = "us_0,india_1,japan_2"; + removeDir(getTableFullPath("us_0")); + removeDir(getTableFullPath("india_1")); + removeDir(getTableFullPath("japan_2")); + ExecJob pigJob = pigServer + .store( + "records", + outputPaths, + TableStorer.class.getCanonicalName() + + "('[word,count]', 
'org.apache.hadoop.zebra.pig.TestMultipleOutputs2$OutputPartitionerClass', 'us,india,japan')"); + + Assert.assertNull(pigJob.getException()); + + // Validate results; + query = "records = LOAD '" + "us_0" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + int count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("us", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("us", RowValue.get(0)); + Assert.assertEquals(2, RowValue.get(1)); + } else if (count == 3) { + Assert.assertEquals("nouse", RowValue.get(0)); + Assert.assertEquals(5, RowValue.get(1)); + } else if (count == 4) { + Assert.assertEquals("nowhere", RowValue.get(0)); + Assert.assertEquals(4, RowValue.get(1)); + } + } + Assert.assertEquals(count, 4); + + query = "records = LOAD '" + "india_1" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("india", RowValue.get(0)); + Assert.assertEquals(4, RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("india", RowValue.get(0)); + Assert.assertEquals(3, RowValue.get(1)); + } + } + Assert.assertEquals(count, 2); + + query = "records = LOAD '" + "japan_2" + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + + count = 0; + System.out.println(query); + pigServer.registerQuery(query); + it = pigServer.openIterator("records"); + while (it.hasNext()) { + count ++; + Tuple RowValue = it.next(); + System.out.println(RowValue); + if (count == 1) { + Assert.assertEquals("japan", RowValue.get(0)); + Assert.assertEquals(2, 
RowValue.get(1)); + } else if (count == 2) { + Assert.assertEquals("japan", RowValue.get(0)); + Assert.assertEquals(1, RowValue.get(1)); + } + } + Assert.assertEquals(count, 2); + } + + public static class OutputPartitionerClass extends ZebraOutputPartition { + + @Override + public int getOutputPartition(BytesWritable key, Tuple value) { + String reg = null; + + try { + reg = (String) (value.get(0)); + } catch (Exception e) { + // + } + + String argumentsString = conf.get(BasicTableOutputFormat.ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS); + + String[] arguments = argumentsString.split(","); + + if (reg.equals(arguments[0])) + return 0; + if (reg.equals(arguments[1])) + return 1; + if (reg.equals(arguments[2])) + return 2; + + return 0; + } + } + + @Override + public int run(String[] args) throws Exception { + TestMultipleOutputs2 test = new TestMultipleOutputs2(); + + test.setUp(); + test.test1(); + test.tearDown(); + + return 0; + } + + public static void main(String[] args) throws Exception { + conf = new Configuration(); + + int res = ToolRunner.run(conf, new TestMultipleOutputs2(), args); + System.out.println("PASS"); + System.exit(res); + } +}