Author: yanz
Date: Tue Apr 20 15:51:38 2010
New Revision: 935968

URL: http://svn.apache.org/viewvc?rev=935968&view=rev
Log:
PIG-1375 Support of multiple Zebra table writing through Pig (chaow via yanz)

Added:
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=935968&r1=935967&r2=935968&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Tue Apr 20 15:51:38 2010
@@ -18,6 +18,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    PIG-1375 Support of multiple Zebra table writing through Pig (chaow via 
yanz)
+
     PIG-1351 Addition of type check when writing to basic table (chaow via 
yanz)
 
     PIG-1361 Zebra TableLoader.getSchema() should return the projectionSchema 
specified in the constructor of TableLoader instead of pruned proejction by pig 
(gauravj via daijy)

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java?rev=935968&r1=935967&r2=935968&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
 Tue Apr 20 15:51:38 2010
@@ -36,6 +36,7 @@ import org.apache.hadoop.zebra.io.TableI
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Partition;
 import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.tfile.TFile;
 import org.apache.pig.data.Tuple;
@@ -153,7 +154,7 @@ public class BasicTableOutputFormat exte
        private static final String OUTPUT_COMPARATOR =  
"mapreduce.lib.table.output.comparator";
        static final String IS_MULTI = "multi";
        public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS = 
"zebra.output.partitioner.class";
-
+       public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS = 
"zebra.output.partitioner.class.arguments";
 
        /**
         * Set the multiple output paths of the BasicTable in JobContext
@@ -164,9 +165,11 @@ public class BasicTableOutputFormat exte
         *          The comma separated output paths to the tables. 
         *          The path must either not existent, or must be an empty 
directory.
         * @param theClass
-        *            Zebra output partitoner class
+        *            Zebra output partitioner class
+        * 
+        * @deprecated Use {@link #setMultipleOutputs(JobContext, class<? 
extends ZebraOutputPartition>, Path ...)} instead.
+        * 
         */
-
        public static void setMultipleOutputs(JobContext jobContext, String 
commaSeparatedLocations, Class<? extends ZebraOutputPartition> theClass)
        throws IOException {
                Configuration conf = jobContext.getConfiguration();
@@ -179,18 +182,17 @@ public class BasicTableOutputFormat exte
                setZebraOutputPartitionClass(jobContext, theClass);       
        }
 
-       /**
+  /**
         * Set the multiple output paths of the BasicTable in JobContext
         * 
         * @param jobContext
         *          The JobContext object.
+   * @param theClass
+   *        Zebra output partitioner class          
         * @param paths
         *          The list of paths 
         *          The path must either not existent, or must be an empty 
directory.
-        * @param theClass
-        *            Zebra output partitioner class
         */
-
        public static void setMultipleOutputs(JobContext jobContext, Class<? 
extends ZebraOutputPartition> theClass, Path... paths)
        throws IOException {
                Configuration conf = jobContext.getConfiguration();
@@ -209,9 +211,28 @@ public class BasicTableOutputFormat exte
                }
                conf.setBoolean(IS_MULTI, true);
                setZebraOutputPartitionClass(jobContext, theClass);
-
        }
-
+       
+       /**
+   * Set the multiple output paths of the BasicTable in JobContext
+   * 
+   * @param jobContext
+   *          The JobContext object.
+   * @param theClass
+   *          Zebra output partitioner class
+   * @param arguments
+   *          Arguments string to partitioner class
+   * @param paths
+   *          The list of paths 
+   *          The path must either not existent, or must be an empty directory.
+   */
+  public static void setMultipleOutputs(JobContext jobContext, Class<? extends 
ZebraOutputPartition> theClass, String arguments, Path... paths)
+  throws IOException {
+    setMultipleOutputs(jobContext, theClass, paths);
+    if (arguments != null) {
+      
jobContext.getConfiguration().set(ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS, 
arguments);
+    }
+  }
 
        /**
         * Set the multiple output paths of the BasicTable in JobContext
@@ -256,7 +277,6 @@ public class BasicTableOutputFormat exte
 
        }
 
-
        private static void setZebraOutputPartitionClass(
                        JobContext jobContext, Class<? extends 
ZebraOutputPartition> theClass) throws IOException {
                if (!ZebraOutputPartition.class.isAssignableFrom(theClass))
@@ -264,7 +284,6 @@ public class BasicTableOutputFormat exte
                
jobContext.getConfiguration().set(ZEBRA_OUTPUT_PARTITIONER_CLASS, 
theClass.getName());
        }
 
-
        public static Class<? extends ZebraOutputPartition> 
getZebraOutputPartitionClass(JobContext jobContext) throws IOException {
                Configuration conf = jobContext.getConfiguration();
 
@@ -287,8 +306,6 @@ public class BasicTableOutputFormat exte
 
        }
 
-
-
        /**
         * Set the output path of the BasicTable in JobContext
         * 
@@ -392,7 +409,6 @@ public class BasicTableOutputFormat exte
 
        }
 
-
        /**
         * Generates a BytesWritable key for the input key
         * using keygenerate provided. Sort Key(s) are used to generate this 
object
@@ -409,9 +425,6 @@ public class BasicTableOutputFormat exte
                return kg.generateKey(t);
        }
 
-
-
-
        /**
         * Set the table storage hint in JobContext, should be called after 
setSchema is
         * called.
@@ -737,7 +750,13 @@ class TableOutputCommitter extends Outpu
 class TableRecordWriter extends RecordWriter<BytesWritable, Tuple> {
        private final TableInserter inserter[];
        private org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition op = 
null;
-
+       
+       // for Pig's call path;
+       final private BytesWritable KEY0 = new BytesWritable(new byte[0]);
+   private int[] sortColIndices = null;
+   private KeyGenerator builder = null;
+   private Tuple t = null;
+   
        public TableRecordWriter(String path, TaskAttemptContext context) 
throws IOException {  
                Configuration conf = context.getConfiguration();
                if(conf.getBoolean(BasicTableOutputFormat.IS_MULTI, false) == 
true) {     
@@ -753,8 +772,36 @@ class TableRecordWriter extends RecordWr
                        BasicTable.Writer writer =
                                new BasicTable.Writer(paths[i], conf);
                        this.inserter[i] = writer.getInserter( inserterName, 
true, checkType);
+
+                       // Set up SortInfo related stuff only once;
+                       if (i == 0) {
+             if (writer.getSortInfo() != null)
+             {
+          sortColIndices = writer.getSortInfo().getSortIndices();
+          SortInfo sortInfo =  writer.getSortInfo();
+          String[] sortColNames = sortInfo.getSortColumnNames();
+          org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema();
+
+          byte[] types = new byte[sortColNames.length];
+          
+          for(int j =0 ; j < sortColNames.length; ++j){
+              types[j] = 
schema.getColumn(sortColNames[j]).getType().pigDataType();
+          }
+          t = TypesUtils.createTuple(sortColNames.length);
+          builder = makeKeyBuilder(types);
+             }
+                       }
                }
        }
+       
+  private KeyGenerator makeKeyBuilder(byte[] elems) {
+    ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+    for (int i = 0; i < elems.length; ++i) {
+        exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+    }
+    return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+  }
+
 
        @Override
        public void close(TaskAttemptContext context) throws IOException {
@@ -765,6 +812,17 @@ class TableRecordWriter extends RecordWr
 
        @Override
        public void write(BytesWritable key, Tuple value) throws IOException {
+    if (key == null) {
+      if (sortColIndices != null) { // If this is a sorted table and key is 
null (Pig's call path);
+        for (int i =0; i < sortColIndices.length;++i) {
+          t.set(i, value.get(sortColIndices[i]));
+        }
+        key = builder.generateKey(t);        
+      } else { // for unsorted table;
+        key = KEY0;
+      }
+    }
+                 
                if(op != null ) {         
                        int idx = op.getOutputPartition(key, value);
                        if(idx < 0 || (idx >= inserter.length)) {
@@ -775,6 +833,5 @@ class TableRecordWriter extends RecordWr
                        inserter[0].insert(key, value);
                }
        }
-
 }
 

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=935968&r1=935967&r2=935968&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
 Tue Apr 20 15:51:38 2010
@@ -26,20 +26,14 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.zebra.pig.comparator.ComparatorExpr;
-import org.apache.hadoop.zebra.pig.comparator.ExprUtils;
-import org.apache.hadoop.zebra.pig.comparator.KeyGenerator;
-import org.apache.hadoop.zebra.io.BasicTable;
-import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.mapreduce.ZebraSchema;
+import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
+import org.apache.hadoop.zebra.mapreduce.ZebraStorageHint;
 import org.apache.hadoop.zebra.parser.ParseException;
-import org.apache.hadoop.zebra.types.SortInfo;
-import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
@@ -57,282 +51,180 @@ public class TableStorer extends StoreFu
     private static final String UDFCONTEXT_SORT_INFO = 
"zebra.UDFContext.sortInfo";
     private static final String UDFCONTEXT_OUTPUT_CHECKTYPE = 
"zebra.UDFContext.checkType";
 
-    static final String OUTPUT_STORAGEHINT = 
"mapreduce.lib.table.output.storagehint";
-    static final String OUTPUT_SCHEMA = "mapreduce.lib.table.output.schema";
-    static final String OUTPUT_PATH = "mapreduce.lib.table.output.dir";
-    static final String SORT_INFO = "mapreduce.lib.table.sort.info";
     static final String OUTPUT_CHECKTYPE = 
"mapreduce.lib.table.output.checktype";
 
     private String storageHintString = null;
     private String udfContextSignature = null;
-    private TableRecordWriter tableRecordWriter = null;
+    private RecordWriter<BytesWritable, Tuple> tableRecordWriter = null;
+    private String partitionClassString = null;
+    Class<? extends ZebraOutputPartition> partitionClass = null;
+    private String partitionClassArgumentsString = null;
 
     public TableStorer() {
     }
 
-    public TableStorer(String storageHintStr) throws ParseException, 
IOException {
-        storageHintString = storageHintStr;
+    public TableStorer(String storageHintString) {
+      this.storageHintString = storageHintString;
     }
 
+    public TableStorer(String storageHintString, String partitionClassString) {
+      this.storageHintString = storageHintString;
+      this.partitionClassString = partitionClassString;
+    }
+
+    public TableStorer(String storageHintString, String partitionClassString, 
String partitionClassArgumentsString) {
+      this.storageHintString = storageHintString;
+      this.partitionClassString = partitionClassString;
+      this.partitionClassArgumentsString = partitionClassArgumentsString;
+    }
+
+
     @Override
     public void putNext(Tuple tuple) throws IOException {
+      try {
         tableRecordWriter.write( null, tuple );
+      } catch (InterruptedException e) {
+        throw new IOException(e.getMessage());
+      }
     }
 
     @Override
     public void checkSchema(ResourceSchema schema) throws IOException {
-        // Get schemaStr and sortColumnNames from the given schema. In the 
process, we
-        // also validate the schema and sorting info.
-        ResourceSchema.Order[] orders = schema.getSortKeyOrders();
-        boolean descending = false;
-        for (ResourceSchema.Order order : orders)
+      // Get schemaStr and sortColumnNames from the given schema. In the 
process, we
+      // also validate the schema and sorting info.
+      ResourceSchema.Order[] orders = schema.getSortKeyOrders();
+      boolean descending = false;
+      for (ResourceSchema.Order order : orders)
+      {
+        if (order == ResourceSchema.Order.DESCENDING)
         {
-          if (order == ResourceSchema.Order.DESCENDING)
-          {
-            Log LOG = LogFactory.getLog(TableStorer.class);
-            LOG.warn("Sorting in descending order is not supported by Zebra 
and the table will be unsorted.");
-            descending = true;
-            break;
-          }
-        }
-        StringBuilder sortColumnNames = new StringBuilder();
-        if (!descending) {
-          ResourceSchema.ResourceFieldSchema[] fields = schema.getFields();
-          int[] index = schema.getSortKeys();
+          Log LOG = LogFactory.getLog(TableStorer.class);
+          LOG.warn("Sorting in descending order is not supported by Zebra and 
the table will be unsorted.");
+          descending = true;
+          break;
+        }
+      }
+      StringBuilder sortColumnNames = new StringBuilder();
+      if (!descending) {
+        ResourceSchema.ResourceFieldSchema[] fields = schema.getFields();
+        int[] index = schema.getSortKeys();
+      
+        for( int i = 0; i< index.length; i++ ) {
+          ResourceFieldSchema field = fields[index[i]];
+          String name = field.getName();
+          if( name == null )
+              throw new IOException("Zebra does not support column positional 
reference yet");
+          if( !org.apache.pig.data.DataType.isAtomic( field.getType() ) )
+              throw new IOException( "Field [" + name + "] is not of simple 
type as required for a sort column now." );
+          if( i > 0 )
+              sortColumnNames.append( "," );
+          sortColumnNames.append( name );
+        }
+      }
+
+      // Convert resource schema to zebra schema
+      org.apache.hadoop.zebra.schema.Schema zebraSchema;
+      try {
+          zebraSchema = SchemaConverter.convertFromResourceSchema( schema );
+      } catch (ParseException ex) {
+          throw new IOException("Exception thrown from SchemaConverter: " + 
ex.getMessage() );
+      }
+
+      Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+              this.getClass(), new String[]{ udfContextSignature } );
+      properties.setProperty( UDFCONTEXT_OUTPUT_SCHEMA, zebraSchema.toString() 
);
+      properties.setProperty( UDFCONTEXT_SORT_INFO, sortColumnNames.toString() 
);
         
-          for( int i = 0; i< index.length; i++ ) {
-            ResourceFieldSchema field = fields[index[i]];
-            String name = field.getName();
-            if( name == null )
-                throw new IOException("Zebra does not support column 
positional reference yet");
-            if( !org.apache.pig.data.DataType.isAtomic( field.getType() ) )
-                throw new IOException( "Field [" + name + "] is not of simple 
type as required for a sort column now." );
-            if( i > 0 )
-                sortColumnNames.append( "," );
-            sortColumnNames.append( name );
-          }
-        }
-
-        // Convert resource schema to zebra schema
-        org.apache.hadoop.zebra.schema.Schema zebraSchema;
-        try {
-            zebraSchema = SchemaConverter.convertFromResourceSchema( schema );
-        } catch (ParseException ex) {
-            throw new IOException("Exception thrown from SchemaConverter: " + 
ex.getMessage() );
-        }
-
-        Properties properties = UDFContext.getUDFContext().getUDFProperties( 
-                this.getClass(), new String[]{ udfContextSignature } );
-        properties.setProperty( UDFCONTEXT_OUTPUT_SCHEMA, 
zebraSchema.toString() );
-        properties.setProperty( UDFCONTEXT_SORT_INFO, 
sortColumnNames.toString() );
-        
-        // This is to turn off type check for potential corner cases - for 
internal use only;
-        if (System.getenv("zebra_output_checktype") != null && 
System.getenv("zebra_output_checktype").equals("no")) {
-          properties.setProperty( UDFCONTEXT_OUTPUT_CHECKTYPE, "no");
-        }
+      // This is to turn off type check for potential corner cases - for 
internal use only;
+      if (System.getenv("zebra_output_checktype") != null && 
System.getenv("zebra_output_checktype").equals("no")) {
+        properties.setProperty( UDFCONTEXT_OUTPUT_CHECKTYPE, "no");
+      }
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
     throws IOException {
-        return new TableOutputFormat();
+      return new BasicTableOutputFormat();
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public void prepareToWrite(RecordWriter writer)
     throws IOException {
-        tableRecordWriter = (TableRecordWriter)writer;
-        if( tableRecordWriter == null ) {
-            throw new IOException( "Invalid type of writer. Expected type: 
TableRecordWriter." );
-        }
+      tableRecordWriter = writer;
+      if( tableRecordWriter == null ) {
+          throw new IOException( "Invalid type of writer. Expected type: 
TableRecordWriter." );
+      }
     }
 
     @Override
     public String relToAbsPathForStoreLocation(String location, Path curDir)
     throws IOException {
-        return LoadFunc.getAbsolutePath( location, curDir );
+      return LoadFunc.getAbsolutePath( location, curDir );
     }
 
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
-        Configuration conf = job.getConfiguration();
-        conf.set( OUTPUT_STORAGEHINT, storageHintString );
-        conf.set( OUTPUT_PATH, location );
-
-        // Get schema string and sorting info from UDFContext and re-store 
them to
-        // job config.
-        Properties properties = UDFContext.getUDFContext().getUDFProperties( 
-                this.getClass(), new String[]{ udfContextSignature } );
-        conf.set( OUTPUT_SCHEMA, properties.getProperty( 
UDFCONTEXT_OUTPUT_SCHEMA ) );
-        conf.set( SORT_INFO, properties.getProperty( UDFCONTEXT_SORT_INFO ) );
-        
-        // Get checktype information from UDFContext and re-store it to job 
config;
-        if (properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE) != null && 
properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE).equals("no")) {
-          conf.setBoolean(OUTPUT_CHECKTYPE, false);
+      Configuration conf = job.getConfiguration();
+      
+      String[] outputs = location.split(",");
+      
+      if (outputs.length == 1) {
+        BasicTableOutputFormat.setOutputPath(job, new Path(location));
+      } else if (outputs.length > 1) {
+        if (partitionClass == null) {
+          try {
+            partitionClass = (Class<? extends ZebraOutputPartition>) 
conf.getClassByName(partitionClassString);
+          } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+          } 
         }
+        
+        Path[] paths = new Path[outputs.length];
+        for (int i=0; i<paths.length; i++) {
+          paths[i] = new Path(outputs[i]);
+        }
+
+        BasicTableOutputFormat.setMultipleOutputs(job, partitionClass, 
partitionClassArgumentsString, paths);
+      } else {
+        throw new IOException( "Invalid location : " + location);
+      }
+
+      // Get schema string and sorting info from UDFContext and re-store them 
to
+      // job config.
+      Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+              this.getClass(), new String[]{ udfContextSignature } );
+      ZebraSchema zSchema = 
ZebraSchema.createZebraSchema(properties.getProperty(UDFCONTEXT_OUTPUT_SCHEMA));
+      ZebraSortInfo zSortInfo = 
ZebraSortInfo.createZebraSortInfo(properties.getProperty(UDFCONTEXT_SORT_INFO), 
null);
+      ZebraStorageHint zStorageHint = 
ZebraStorageHint.createZebraStorageHint(storageHintString);
+      try {
+        BasicTableOutputFormat.setStorageInfo(job, zSchema, zStorageHint, 
zSortInfo);
+      } catch (ParseException e) {
+        throw new IOException("Invalid storage info: " + e.getMessage());
+      }
+        
+      // Get checktype information from UDFContext and re-store it to job 
config;
+      if (properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE) != null && 
properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE).equals("no")) {
+        conf.setBoolean(OUTPUT_CHECKTYPE, false);
+      }
     }
 
     @Override
     public void storeSchema(ResourceSchema schema, String location, Job job)
     throws IOException {
-       // no-op. We do close at cleanupJob().
-        BasicTable.Writer write = new BasicTable.Writer( new Path( location ), 
-                job.getConfiguration());
-        write.close();
+      //TODO: This is temporary - we will do close at cleanupJob() when 
OutputCommitter is ready.
+      BasicTableOutputFormat.close(job);
     }
 
     @Override
     public void setStoreFuncUDFContextSignature(String signature) {
-        udfContextSignature = signature;
+      udfContextSignature = signature;
     }
 
     @Override
     public void storeStatistics(ResourceStatistics stats, String location,
             Job job) throws IOException {
-        // no-op
-    }
-
-}
-
-/**
- * 
- * Table OutputFormat
- * 
- */
-class TableOutputFormat extends OutputFormat<BytesWritable, Tuple> {
-    @Override
-    public void checkOutputSpecs(JobContext job) throws IOException, 
InterruptedException {
-        Configuration conf = job.getConfiguration();
-        String location = conf.get( TableStorer.OUTPUT_PATH );
-        String schemaStr = conf.get( TableStorer.OUTPUT_SCHEMA );
-        String storageHint = conf.get( TableStorer.OUTPUT_STORAGEHINT );
-        String sortColumnNames = conf.get( TableStorer.SORT_INFO );
-
-        BasicTable.Writer writer = new BasicTable.Writer( new Path( location 
), 
-                schemaStr, storageHint, sortColumnNames, null, conf );
-        writer.finish();
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext taContext)
-    throws IOException, InterruptedException {
-        return new TableOutputCommitter() ;
-    }
-
-    @Override
-    public org.apache.hadoop.mapreduce.RecordWriter<BytesWritable, Tuple> 
getRecordWriter(
-            TaskAttemptContext taContext) throws IOException, 
InterruptedException {
-        return new TableRecordWriter( taContext );
-    }
-
-}
-
-// TODO: make corresponding changes for commit and cleanup. Currently, no-ops.
-class TableOutputCommitter extends OutputCommitter {
-    @Override
-    public void abortTask(TaskAttemptContext taContext) throws IOException {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void cleanupJob(JobContext jobContext) throws IOException {
-//     Configuration conf = jobContext.getConfiguration();
-//        String location = conf.get( TableStorer.OUTPUT_PATH );
-//        BasicTable.Writer write = new BasicTable.Writer( new Path( location 
), conf );
-//        write.close();
+      // no-op
     }
-
-    @Override
-    public void commitTask(TaskAttemptContext taContext) throws IOException {
-       int i = 0;
-       i++;
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext taContext) throws 
IOException {
-        return false;
-    }
-
-    @Override
-    public void setupJob(JobContext jobContext) throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void setupTask(TaskAttemptContext taContext) throws IOException {
-        // TODO Auto-generated method stub
-    }
-
-}
-
-/**
- * 
- * Table RecordWriter
- * 
- */
-class TableRecordWriter extends RecordWriter<BytesWritable, Tuple> {
-    final private BytesWritable KEY0 = new BytesWritable(new byte[0]); 
-    private BasicTable.Writer writer;
-    private TableInserter inserter;
-    private int[] sortColIndices = null;
-    KeyGenerator builder;
-    Tuple t;
-
-    public TableRecordWriter(TaskAttemptContext taContext) throws IOException {
-        Configuration conf = taContext.getConfiguration();
-
-        String path = conf.get(TableStorer.OUTPUT_PATH);
-        writer = new BasicTable.Writer( new Path( path ), conf );
-
-        if (writer.getSortInfo() != null)
-        {
-            sortColIndices = writer.getSortInfo().getSortIndices();
-            SortInfo sortInfo =  writer.getSortInfo();
-            String[] sortColNames = sortInfo.getSortColumnNames();
-            org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema();
-
-            byte[] types = new byte[sortColNames.length];
-            for(int i =0 ; i < sortColNames.length; ++i){
-                types[i] = 
schema.getColumn(sortColNames[i]).getType().pigDataType();
-            }
-            t = TypesUtils.createTuple(sortColNames.length);
-            builder = makeKeyBuilder(types);
-        }
-
-        boolean checkType = conf.getBoolean(TableStorer.OUTPUT_CHECKTYPE, 
true);
-        inserter = writer.getInserter("patition-" + 
taContext.getTaskAttemptID().getTaskID().getId(), false, checkType);
-    }
-
-    @Override
-    public void close(TaskAttemptContext taContext) throws IOException {
-        inserter.close();
-        writer.finish();
-    }
-
-    private KeyGenerator makeKeyBuilder(byte[] elems) {
-        ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
-        for (int i = 0; i < elems.length; ++i) {
-            exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
-        }
-        return new KeyGenerator(ExprUtils.tupleComparator(exprs));
-    }
-
-    @Override
-    public void write(BytesWritable key, Tuple value) throws IOException {
-        System.out.println( "Tuple: " + value.toDelimitedString(",") );
-        if (sortColIndices != null)    {
-            for(int i =0; i < sortColIndices.length;++i) {
-                t.set(i, value.get(sortColIndices[i]));
-            }
-            key = builder.generateKey(t);
-        } else if (key == null) {
-            key = KEY0;
-        }
-        inserter.insert(key, value);
-    }
-
 }

Modified: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java?rev=935968&r1=935967&r2=935968&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
 Tue Apr 20 15:51:38 2010
@@ -55,7 +55,7 @@ public class TestMixedType1 extends Base
     System.out.println("ONCE SETUP !! ---------");
     init();
     
-    path = getTableFullPath("");  
+    path = getTableFullPath("TestMixedType1");  
     removeDir(path);
     
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,

Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java?rev=935968&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java
 Tue Apr 20 15:51:38 2010
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.zebra.BaseTestCase;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.Iterator;
+import junit.framework.Assert;
+
+/**
+ * Tests writing a single Pig relation to several Zebra tables through
+ * {@link TableStorer} combined with a custom {@link ZebraOutputPartition}.
+ *
+ * The input file contains rows of word and count, separated by a tab:
+ *
+ * <pre>
+ * us       2
+ * japan    2
+ * india    4
+ * us       2
+ * japan    1
+ * india    3
+ * nouse    5
+ * nowhere  4
+ * </pre>
+ *
+ * Rows whose word is "india" or "japan" are routed to the table of the same
+ * name; every other row (including "us") is routed to the "us" table.
+ */
+public class TestMultipleOutputs1 extends BaseTestCase implements Tool {
+  static String inputPath;
+  static String inputFileName = "multi-input.txt";
+  public static String sortKey = null;
+
+  /** Rows written to the input file, in file order; fields are tab-separated. */
+  private static final String[] INPUT_ROWS = {
+    "us\t2\n", "japan\t2\n", "india\t4\n", "us\t2\n",
+    "japan\t1\n", "india\t3\n", "nouse\t5\n", "nowhere\t4\n"
+  };
+
+  /** Output tables; the position in this array is the partition index. */
+  private static final String[] OUTPUT_TABLES = { "us", "india", "japan" };
+
+  /** Comma-separated output locations handed to the storer. */
+  private static final String OUTPUT_PATHS = "us,india,japan";
+
+  /** Binary name ("Outer$Inner") of the partitioner used by the tests. */
+  private static final String PARTITIONER_CLASS =
+      OutputPartitionerClass.class.getName();
+
+  @Before
+  public void setUp() throws Exception {
+    init();
+
+    inputPath = getTableFullPath(inputFileName).toString();
+
+    writeToFile(inputPath);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (mode == TestMode.local) {
+      pigServer.shutdown();
+    }
+  }
+
+  /**
+   * Writes the fixed input rows to {@code inputFile}, on the local file
+   * system in local mode and through {@code fs} in cluster mode.
+   *
+   * @param inputFile path of the file to (re)create
+   * @throws IOException if the file cannot be created or written
+   */
+  public static void writeToFile(String inputFile) throws IOException {
+    if (mode == TestMode.local) {
+      BufferedWriter out = new BufferedWriter(new FileWriter(inputFile));
+      try {
+        for (String row : INPUT_ROWS) {
+          out.write(row);
+        }
+      } finally {
+        // Close even when a write fails so the file handle is not leaked.
+        out.close();
+      }
+    }
+
+    if (mode == TestMode.cluster) {
+      FSDataOutputStream fout = fs.create(new Path(inputFile));
+      try {
+        for (String row : INPUT_ROWS) {
+          fout.writeBytes(row);
+        }
+      } finally {
+        fout.close();
+      }
+    }
+  }
+
+  /** Prints a Pig statement and registers it with the Pig server. */
+  private void register(String query) throws Exception {
+    System.out.println("query = " + query);
+    pigServer.registerQuery(query);
+  }
+
+  /**
+   * Defines the "records" alias over the input file and dumps it to stdout.
+   *
+   * @param sortColumns columns for an {@code order ... by} clause, or
+   *                    {@code null} to leave the rows in file order
+   */
+  private void loadRecords(String sortColumns) throws Exception {
+    String loadAlias = (sortColumns == null) ? "records" : "a";
+    register(loadAlias + " = LOAD '" + inputPath
+        + "' as (word:chararray, count:int);");
+    if (sortColumns != null) {
+      register("records = order a by " + sortColumns + ";");
+    }
+
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      System.out.println(it.next());
+    }
+  }
+
+  /** Removes any output tables left behind by a previous run. */
+  private void removeOutputTables() throws Exception {
+    for (String table : OUTPUT_TABLES) {
+      removeDir(getTableFullPath(table));
+    }
+  }
+
+  /**
+   * Stores "records" into the output tables with the given partitioner.
+   *
+   * @param partitionerClassName class name passed to {@link TableStorer}
+   * @return the job returned by the Pig server
+   */
+  private ExecJob storeRecords(String partitionerClassName) throws Exception {
+    removeOutputTables();
+    return pigServer.store(
+        "records",
+        OUTPUT_PATHS,
+        TableStorer.class.getCanonicalName()
+            + "('[word,count]', '" + partitionerClassName + "')");
+  }
+
+  /**
+   * Loads a Zebra table and checks its rows against the expected content.
+   *
+   * @param tableName    table location to load
+   * @param expectedRows expected {word, count} pairs in iteration order
+   */
+  private void verifyTable(String tableName, Object[][] expectedRows)
+      throws Exception {
+    register("records = LOAD '" + tableName
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();");
+
+    int count = 0;
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple rowValue = it.next();
+      System.out.println(rowValue);
+      // Surplus rows are caught by the row-count assertion below.
+      if (count < expectedRows.length) {
+        Assert.assertEquals(expectedRows[count][0], rowValue.get(0));
+        Assert.assertEquals(expectedRows[count][1], rowValue.get(1));
+      }
+      count++;
+    }
+    Assert.assertEquals(expectedRows.length, count);
+  }
+
+  // test no sort key;
+  @Test
+  public void test1() throws ParseException, IOException,
+      org.apache.hadoop.zebra.parser.ParseException, Exception {
+    // Load data;
+    loadRecords(null);
+
+    // Store using multiple outputs;
+    removeOutputTables();
+    register("store records into '" + OUTPUT_PATHS
+        + "' using org.apache.hadoop.zebra.pig.TableStorer('[word,count]',"
+        + "'" + PARTITIONER_CLASS + "');");
+
+    // Validate results;
+    verifyTable("us", new Object[][] {
+      { "us", 2 }, { "us", 2 }, { "nouse", 5 }, { "nowhere", 4 } });
+    verifyTable("india", new Object[][] {
+      { "india", 4 }, { "india", 3 } });
+    verifyTable("japan", new Object[][] {
+      { "japan", 2 }, { "japan", 1 } });
+  }
+
+  //Test sort key on word;
+  @Test
+  public void test2() throws ParseException, IOException,
+      org.apache.hadoop.zebra.parser.ParseException, Exception {
+    // Load data;
+    loadRecords("word");
+
+    // Store using multiple outputs;
+    ExecJob pigJob = storeRecords(PARTITIONER_CLASS);
+    Assert.assertNull(pigJob.getException());
+
+    // Validate results;
+    verifyTable("us", new Object[][] {
+      { "nouse", 5 }, { "nowhere", 4 }, { "us", 2 }, { "us", 2 } });
+    verifyTable("india", new Object[][] {
+      { "india", 4 }, { "india", 3 } });
+    verifyTable("japan", new Object[][] {
+      { "japan", 2 }, { "japan", 1 } });
+  }
+
+  //Test sort key on word and count;
+  @Test
+  public void test3() throws ParseException, IOException,
+      org.apache.hadoop.zebra.parser.ParseException, Exception {
+    // Load data;
+    loadRecords("word, count");
+
+    // Store using multiple outputs;
+    ExecJob pigJob = storeRecords(PARTITIONER_CLASS);
+    Assert.assertNull(pigJob.getException());
+
+    // Validate results;
+    verifyTable("us", new Object[][] {
+      { "nouse", 5 }, { "nowhere", 4 }, { "us", 2 }, { "us", 2 } });
+    verifyTable("india", new Object[][] {
+      { "india", 3 }, { "india", 4 } });
+    verifyTable("japan", new Object[][] {
+      { "japan", 1 }, { "japan", 2 } });
+  }
+
+  //Negative test case: invalid partition class;
+  @Test (expected = IOException.class)
+  public void testNegative1() throws ParseException, IOException,
+      org.apache.hadoop.zebra.parser.ParseException, Exception {
+    // Load data;
+    loadRecords("word, count");
+
+    // Store using multiple outputs; the partitioner class does not exist.
+    storeRecords("org.apache.hadoop.zebra.pig.notexistingclass");
+  }
+
+  /**
+   * Routes a row by its first column: "us" to partition 0, "india" to
+   * partition 1, "japan" to partition 2, anything else to partition 0.
+   */
+  public static class OutputPartitionerClass extends ZebraOutputPartition {
+
+    @Override
+    public int getOutputPartition(BytesWritable key, Tuple value) {
+      String reg = null;
+      try {
+        reg = (String) (value.get(0));
+      } catch (Exception ignored) {
+        // An unreadable first column falls through to the default partition.
+      }
+
+      // Literal-first equals() is null-safe: a null or unreadable word must
+      // land in the default partition rather than raise an NPE.
+      if ("india".equals(reg)) {
+        return 1;
+      }
+      if ("japan".equals(reg)) {
+        return 2;
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Runs every test case in sequence when the class is launched as a Hadoop
+   * {@link Tool} instead of through JUnit.
+   *
+   * @param args command-line arguments (unused)
+   * @return 0 when all test cases pass
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    TestMultipleOutputs1 test = new TestMultipleOutputs1();
+
+    test.setUp();
+    try {
+      test.test1();
+    } finally {
+      test.tearDown();
+    }
+
+    test.setUp();
+    try {
+      test.test2();
+    } finally {
+      test.tearDown();
+    }
+
+    test.setUp();
+    try {
+      test.test3();
+    } finally {
+      test.tearDown();
+    }
+
+    // The negative case passes only when it throws IOException, mirroring
+    // its @Test(expected = IOException.class) contract under JUnit.
+    boolean rejected = false;
+    test.setUp();
+    try {
+      test.testNegative1();
+    } catch (IOException expected) {
+      rejected = true;
+    } finally {
+      test.tearDown();
+    }
+    Assert.assertTrue(
+        "store with a non-existing partitioner class must fail", rejected);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    conf = new Configuration();
+
+    int res = ToolRunner.run(conf, new TestMultipleOutputs1(), args);
+    System.out.println("PASS");
+    System.exit(res);
+  }
+}

Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java?rev=935968&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java
 Tue Apr 20 15:51:38 2010
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.zebra.BaseTestCase;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.Iterator;
+import junit.framework.Assert;
+
+/**
+ * Tests writing a single Pig relation to several Zebra tables through
+ * {@link TableStorer} when the custom {@link ZebraOutputPartition} is
+ * configured through an extra argument string.
+ *
+ * The input file contains rows of word and count, separated by a tab:
+ *
+ * <pre>
+ * us       2
+ * japan    2
+ * india    4
+ * us       2
+ * japan    1
+ * india    3
+ * nouse    5
+ * nowhere  4
+ * </pre>
+ *
+ * The partitioner argument "us,india,japan" maps each listed word to the
+ * output table at the same position; unlisted words go to the first table.
+ */
+public class TestMultipleOutputs2 extends BaseTestCase implements Tool {
+  static String inputPath;
+  static String inputFileName = "multi-input.txt";
+  public static String sortKey = null;
+
+  /** Rows written to the input file, in file order; fields are tab-separated. */
+  private static final String[] INPUT_ROWS = {
+    "us\t2\n", "japan\t2\n", "india\t4\n", "us\t2\n",
+    "japan\t1\n", "india\t3\n", "nouse\t5\n", "nowhere\t4\n"
+  };
+
+  /** Output tables; the position in this array is the partition index. */
+  private static final String[] OUTPUT_TABLES = {
+    "us_0", "india_1", "japan_2"
+  };
+
+  /** Comma-separated output locations handed to the storer. */
+  private static final String OUTPUT_PATHS = "us_0,india_1,japan_2";
+
+  @Before
+  public void setUp() throws Exception {
+    init();
+
+    inputPath = getTableFullPath(inputFileName).toString();
+
+    writeToFile(inputPath);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (mode == TestMode.local) {
+      pigServer.shutdown();
+    }
+  }
+
+  /**
+   * Writes the fixed input rows to {@code inputFile}, on the local file
+   * system in local mode and through {@code fs} in cluster mode.
+   *
+   * @param inputFile path of the file to (re)create
+   * @throws IOException if the file cannot be created or written
+   */
+  public static void writeToFile(String inputFile) throws IOException {
+    if (mode == TestMode.local) {
+      BufferedWriter out = new BufferedWriter(new FileWriter(inputFile));
+      try {
+        for (String row : INPUT_ROWS) {
+          out.write(row);
+        }
+      } finally {
+        // Close even when a write fails so the file handle is not leaked.
+        out.close();
+      }
+    }
+
+    if (mode == TestMode.cluster) {
+      FSDataOutputStream fout = fs.create(new Path(inputFile));
+      try {
+        for (String row : INPUT_ROWS) {
+          fout.writeBytes(row);
+        }
+      } finally {
+        fout.close();
+      }
+    }
+  }
+
+  /** Prints a Pig statement and registers it with the Pig server. */
+  private void register(String query) throws Exception {
+    System.out.println("query = " + query);
+    pigServer.registerQuery(query);
+  }
+
+  /**
+   * Loads a Zebra table and checks its rows against the expected content.
+   *
+   * @param tableName    table location to load
+   * @param expectedRows expected {word, count} pairs in iteration order
+   */
+  private void verifyTable(String tableName, Object[][] expectedRows)
+      throws Exception {
+    register("records = LOAD '" + tableName
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();");
+
+    int count = 0;
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      Tuple rowValue = it.next();
+      System.out.println(rowValue);
+      // Surplus rows are caught by the row-count assertion below.
+      if (count < expectedRows.length) {
+        Assert.assertEquals(expectedRows[count][0], rowValue.get(0));
+        Assert.assertEquals(expectedRows[count][1], rowValue.get(1));
+      }
+      count++;
+    }
+    Assert.assertEquals(expectedRows.length, count);
+  }
+
+  @Test
+  public void test1() throws ParseException, IOException,
+      org.apache.hadoop.zebra.parser.ParseException, Exception {
+    // Load data;
+    register("records = LOAD '" + inputPath
+        + "' as (word:chararray, count:int);");
+
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    while (it.hasNext()) {
+      System.out.println(it.next());
+    }
+
+    // Store using multiple outputs;
+    for (String table : OUTPUT_TABLES) {
+      removeDir(getTableFullPath(table));
+    }
+    ExecJob pigJob = pigServer.store(
+        "records",
+        OUTPUT_PATHS,
+        TableStorer.class.getCanonicalName()
+            + "('[word,count]', "
+            + "'org.apache.hadoop.zebra.pig."
+            + "TestMultipleOutputs2$OutputPartitionerClass', "
+            + "'us,india,japan')");
+
+    Assert.assertNull(pigJob.getException());
+
+    // Validate results;
+    verifyTable("us_0", new Object[][] {
+      { "us", 2 }, { "us", 2 }, { "nouse", 5 }, { "nowhere", 4 } });
+    verifyTable("india_1", new Object[][] {
+      { "india", 4 }, { "india", 3 } });
+    verifyTable("japan_2", new Object[][] {
+      { "japan", 2 }, { "japan", 1 } });
+  }
+
+  /**
+   * Routes a row by looking its first column up in the comma-separated
+   * partitioner arguments: a word found at position i goes to partition i,
+   * anything else goes to partition 0.
+   */
+  public static class OutputPartitionerClass extends ZebraOutputPartition {
+
+    @Override
+    public int getOutputPartition(BytesWritable key, Tuple value) {
+      String reg = null;
+
+      try {
+        reg = (String) (value.get(0));
+      } catch (Exception ignored) {
+        // An unreadable first column falls through to the default partition.
+      }
+
+      String argumentsString = conf.get(
+          BasicTableOutputFormat.ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS);
+
+      // A missing word or missing configuration cannot match any argument;
+      // use the default partition instead of failing with an NPE.
+      if (reg == null || argumentsString == null) {
+        return 0;
+      }
+
+      // Scan whatever arguments were supplied rather than assuming exactly
+      // three, so a shorter list cannot index past the end of the array.
+      String[] arguments = argumentsString.split(",");
+      for (int i = 0; i < arguments.length; i++) {
+        if (reg.equals(arguments[i])) {
+          return i;
+        }
+      }
+
+      return 0;
+    }
+  }
+
+  /**
+   * Runs the test case when the class is launched as a Hadoop {@link Tool}
+   * instead of through JUnit.
+   *
+   * @param args command-line arguments (unused)
+   * @return 0 when the test case passes
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    TestMultipleOutputs2 test = new TestMultipleOutputs2();
+
+    test.setUp();
+    try {
+      test.test1();
+    } finally {
+      test.tearDown();
+    }
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    conf = new Configuration();
+
+    int res = ToolRunner.run(conf, new TestMultipleOutputs2(), args);
+    System.out.println("PASS");
+    System.exit(res);
+  }
+}


Reply via email to