Author: yanz
Date: Thu Apr 22 22:17:21 2010
New Revision: 937086

URL: http://svn.apache.org/viewvc?rev=937086&view=rev
Log:
PIG-1342 Avoid making unnecessary name node calls for writes in Zebra (chaow 
via yanz)

Added:
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraConf.java
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableUnion.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=937086&r1=937085&r2=937086&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Thu Apr 22 22:17:21 2010
@@ -82,6 +82,8 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    PIG-1342 Avoid making unnecessary name node calls for writes in Zebra 
(chaow via yanz) 
+
     PIG-1356 TableLoader makes unnecessary calls to build a Job instance that 
create a new JobClient in the hadoop 0.20.9 (yanz)
 
     PIG-1349 Hudson test failure in test case TestBasicUnion (xuefuz via yanz)

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=937086&r1=937085&r2=937086&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
 Thu Apr 22 22:17:21 2010
@@ -53,11 +53,14 @@ import org.apache.hadoop.zebra.io.Column
 import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRowSplit;
 import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGScanner;
 import org.apache.hadoop.zebra.types.CGSchema;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Partition;
 import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.ZebraConf;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.parser.TableSchemaParser;
+import org.apache.hadoop.zebra.pig.TableStorer;
 import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.hadoop.zebra.types.SortInfo;
 import org.apache.pig.data.Tuple;
@@ -1284,7 +1287,7 @@ public class BasicTable {
         String comparator, Configuration conf) throws IOException {
       try {
        actualOutputPath = path;
-       writerConf = conf;        
+         writerConf = conf;              
         schemaFile =
             new SchemaFile(path, btSchemaString, btStorageString, sortColumns,
                 comparator, conf);
@@ -1353,7 +1356,6 @@ public class BasicTable {
     }
 
     /**
-    /**
      * Reopen an already created BasicTable for writing. Exception will be
      * thrown if the table is already closed, or is in the process of being
      * closed.
@@ -1361,9 +1363,13 @@ public class BasicTable {
     public Writer(Path path, Configuration conf) throws IOException {
       try {
        actualOutputPath = path;
-       writerConf = conf;        
-        // fake an empty deleted cg list as no cg should have been deleted now
-        schemaFile = new SchemaFile(path, new String[0], conf);
+         writerConf = conf;
+         
+         if (ZebraConf.getOutputSchema(conf) != null) { 
+           schemaFile = new SchemaFile(conf);  // Read out schemaFile from 
conf, instead of from hdfs;
+         } else { // This is only for io test cases and it cannot happen for 
m/r and pig cases; 
+           schemaFile = new SchemaFile(path, new String[0], conf); // fake an 
empty deleted cg list as no cg should have been deleted now
+         }
         int numCGs = schemaFile.getNumOfPhysicalSchemas();
         partition = schemaFile.getPartition();
         sorted = schemaFile.isSorted();
@@ -1371,11 +1377,16 @@ public class BasicTable {
         cgTuples = new Tuple[numCGs];
         Path tmpWorkPath = new Path(path, "_temporary");       
         for (int nx = 0; nx < numCGs; nx++) {
+          CGSchema cgschema = new CGSchema(schemaFile.getPhysicalSchema(nx), 
sorted, 
+              schemaFile.getComparator(), schemaFile.getName(nx), 
schemaFile.getSerializer(nx), schemaFile.getCompressor(nx),
+              schemaFile.getOwner(nx), schemaFile.getGroup(nx), 
schemaFile.getPerm(nx));
+          
           colGroups[nx] =
             new ColumnGroup.Writer(
                        new Path(path, partition.getCGSchema(nx).getName()),
                        new Path(tmpWorkPath, 
partition.getCGSchema(nx).getName()),
-                  conf);
+                  cgschema,conf);
+          
           cgTuples[nx] = TypesUtils.createTuple(colGroups[nx].getSchema());
         }
         partition.setSource(cgTuples);
@@ -1751,6 +1762,49 @@ public class BasicTable {
     public SchemaFile(Path path, String[] deletedCGs, Configuration conf) 
throws IOException {
       readSchemaFile(path, deletedCGs, conf);
     }
+    
+    // ctor for reading from a job configuration object; we do not need a 
table path; 
+    // all information is held in the job configuration object.
+    public SchemaFile(Configuration conf) throws IOException {
+      String logicalStr = ZebraConf.getOutputSchema(conf);
+      storage = ZebraConf.getOutputStorageHint(conf);
+      String sortColumns = ZebraConf.getOutputSortColumns(conf) != null ? 
ZebraConf.getOutputSortColumns(conf) : "";
+      comparator = ZebraConf.getOutputComparator(conf) != null ? 
ZebraConf.getOutputComparator(conf) : "";      
+      
+      version = SCHEMA_VERSION;
+            
+      try {
+        logical = new Schema(logicalStr);
+      } catch (Exception e) {
+        throw new IOException("Schema build failed :" + e.getMessage());
+      }
+      
+      try {
+        partition = new Partition(logicalStr, storage, comparator, 
sortColumns);
+       
+      } catch (Exception e) {
+        throw new IOException("Partition constructor failed :" + 
e.getMessage());
+      }
+ 
+      cgschemas = partition.getCGSchemas();
+      physical = new Schema[cgschemas.length];
+      //cgDeletedFlags = new boolean[physical.length];
+      
+      for (int nx = 0; nx < cgschemas.length; nx++) {
+        physical[nx] = cgschemas[nx].getSchema();
+      }
+      
+      this.sortInfo = partition.getSortInfo();
+      this.sorted = partition.isSorted();
+      this.comparator = (this.sortInfo == null ? null : 
this.sortInfo.getComparator());
+      if (this.comparator == null)
+        this.comparator = "";
+           
+      String[] sortColumnStr = sortColumns.split(",");
+      if (sortColumnStr.length > 0) {
+        sortInfo = SortInfo.parse(SortInfo.toSortString(sortColumnStr), 
logical, comparator);
+      }
+    }
 
     public Schema[] getPhysicalSchema() {
       return physical;
@@ -1932,6 +1986,7 @@ public class BasicTable {
       cgDeletedFlags = new boolean[physical.length];
       TableSchemaParser parser;
       String cgschemastr;
+      
       try {
         for (int nx = 0; nx < numCGs; nx++) {
           cgschemastr = WritableUtils.readString(in);
@@ -1942,6 +1997,7 @@ public class BasicTable {
       catch (Exception e) {
         throw new IOException("parser.RecordSchema failed :" + e.getMessage());
       }
+      
       sorted = WritableUtils.readVInt(in) == 1 ? true : false;
       if (deletedCGs == null)
         setCGDeletedFlags(path, conf);
@@ -1955,6 +2011,7 @@ public class BasicTable {
           }
         }
       }
+      
       if (version.compareTo(new Version((short)1, (short)0)) > 0)
       {
         int numSortColumns = WritableUtils.readVInt(in);

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=937086&r1=937085&r2=937086&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
 Thu Apr 22 22:17:21 2010
@@ -1741,17 +1741,22 @@ class ColumnGroup {
         fs.delete(path, true);
       }
 
+      // create final output path and temporary output path
       checkPath(path, true);
-
+      
+      Path parent = path.getParent();
+      Path tmpPath1 = new Path(parent, "_temporary");
+      Path tmpPath2 = new Path(tmpPath1, name);
+      checkPath(tmpPath2, true);
+      
       cgschema = new CGSchema(schema, sorted, comparator, name, serializer, 
compressor, owner, group, perm);
       CGSchema sfNew = CGSchema.load(fs, path);
       if (sfNew != null) {
-        // compare input with on-disk schema.
+        // sanity check - compare input with on-disk schema.
         if (!sfNew.equals(cgschema)) {
-          throw new IOException("Schemes are different.");
+          throw new IOException("Schema passed in is different from the one on 
disk");
         }
-      }
-      else {
+      } else {
         // create the schema file in FS
         cgschema.create(fs, path);
       }
@@ -1773,9 +1778,21 @@ class ColumnGroup {
       checkPath(path, true);
       checkMetaFile(finalOutputPath);
       cgschema = CGSchema.load(fs, finalOutputPath);
+    }
+
+    /*
+     * Reopen an already created ColumnGroup for writing.
+     * It takes in a CGSchema to set its own cgschema instead of going
+     * to disk to fetch this information. 
+     */
+    public Writer(Path finalPath, Path workPath, CGSchema cgschema, 
Configuration conf) throws IOException, ParseException {
+      this.path = workPath;
+      finalOutputPath = finalPath;      
+      this.conf = conf;
+      fs = path.getFileSystem(conf);
+      this.cgschema = cgschema;
+    }
 
-    }    
-    
     /**
      * Reopen an already created ColumnGroup for writing. RuntimeException will
      * be thrown if the table is already closed, or if createMetaBlock() is

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java?rev=937086&r1=937085&r2=937086&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
 Thu Apr 22 22:17:21 2010
@@ -17,6 +17,9 @@
 package org.apache.hadoop.zebra.mapred;
 
 import java.io.IOException;
+
+import junit.framework.Assert;
+
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -147,7 +150,7 @@ import org.apache.hadoop.util.Reflection
 public class BasicTableOutputFormat implements
     OutputFormat<BytesWritable, Tuple> {
   private static final String OUTPUT_PATH = "mapred.lib.table.output.dir";
-  public static final String MULTI_OUTPUT_PATH = 
"mapred.lib.table.multi.output.dirs";
+  private static final String MULTI_OUTPUT_PATH = 
"mapred.lib.table.multi.output.dirs";
   private static final String OUTPUT_SCHEMA = "mapred.lib.table.output.schema";
   private static final String OUTPUT_STORAGEHINT =
       "mapred.lib.table.output.storagehint";
@@ -156,7 +159,7 @@ public class BasicTableOutputFormat impl
   private static final String OUTPUT_COMPARATOR =
       "mapred.lib.table.output.comparator";
   static final String IS_MULTI = "multi";
-  public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS = 
"zebra.output.partitioner.class";
+  private static final String ZEBRA_OUTPUT_PARTITIONER_CLASS = 
"zebra.output.partitioner.class";
   
   
   /**
@@ -443,7 +446,7 @@ public class BasicTableOutputFormat impl
    *         defined in the conf object at the time of the call, an empty 
string
    *         will be returned.
    */
-  public static String getStorageHint(JobConf conf) throws ParseException {
+  public static String getStorageHint(JobConf conf) {
     String storehint = conf.get(OUTPUT_STORAGEHINT);
     return storehint == null ? "" : storehint;
   }
@@ -609,13 +612,13 @@ public class BasicTableOutputFormat impl
    * @throws IOException
    */
   private static BasicTable.Writer[] getOutput(JobConf conf) throws 
IOException {
-       Path[] paths = getOutputPaths(conf);
-       BasicTable.Writer[] writers = new BasicTable.Writer[paths.length]; 
-       for(int i = 0; i < paths.length; i++) {
-               writers[i] = new BasicTable.Writer(paths[i], conf);
-       }
-       
-       return writers;
+    Path[] paths = getOutputPaths(conf);
+    BasicTable.Writer[] writers = new BasicTable.Writer[paths.length]; 
+    for(int i = 0; i < paths.length; i++) {
+      writers[i] = new BasicTable.Writer(paths[i], conf);
+    }
+    
+    return writers;
   }
 
   /**
@@ -628,28 +631,22 @@ public class BasicTableOutputFormat impl
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf conf)
       throws IOException {
-
     String schema = conf.get(OUTPUT_SCHEMA);
     if (schema == null) {
       throw new IllegalArgumentException("Cannot find output schema");
     }
+
     String storehint, sortColumns, comparator;
-    try {
-      storehint = getStorageHint(conf);
-      sortColumns = (getSortInfo(conf) == null ? null : 
SortInfo.toSortString(getSortInfo(conf).getSortColumnNames()));
-      comparator = getComparator(conf);
-    }
-    catch (ParseException e) {
-      throw new IOException(e);
-    }
-    
+    storehint = getStorageHint(conf);
+    sortColumns = (getSortInfo(conf) == null ? null : 
SortInfo.toSortString(getSortInfo(conf).getSortColumnNames()));
+    comparator = getComparator(conf);
+
     Path [] paths = getOutputPaths(conf);
-    
-    for(int i = 0; i < paths.length; ++i) {
-       BasicTable.Writer writer =
-               new BasicTable.Writer(paths[i], schema, storehint, sortColumns, 
comparator, conf);
-       writer.finish();
-    }  
+    for (Path path : paths) {
+      BasicTable.Writer writer =
+        new BasicTable.Writer(path, schema, storehint, sortColumns, 
comparator, conf);
+      writer.finish();
+    }
   }
 
   /**
@@ -673,11 +670,9 @@ public class BasicTableOutputFormat impl
    */
   public static void close(JobConf conf) throws IOException {
     BasicTable.Writer tables[] = getOutput(conf);
-    
     for(int i =0; i < tables.length; ++i) {
         tables[i].close();     
     }
-    
   }
 }
 

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java?rev=937086&r1=937085&r2=937086&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
 Thu Apr 22 22:17:21 2010
@@ -36,6 +36,7 @@ import org.apache.hadoop.zebra.io.TableI
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Partition;
 import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.types.ZebraConf;
 import org.apache.hadoop.zebra.types.TypesUtils;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.tfile.TFile;
@@ -145,17 +146,6 @@ import org.apache.hadoop.zebra.pig.compa
  * </pre>
  */
 public class BasicTableOutputFormat extends OutputFormat<BytesWritable, Tuple> 
{
-       private static final String OUTPUT_PATH = 
"mapreduce.lib.table.output.dir";
-       public static final String MULTI_OUTPUT_PATH = 
"mapreduce.lib.table.multi.output.dirs";
-       private static final String OUTPUT_SCHEMA = 
"mapreduce.lib.table.output.schema";
-       static final String OUTPUT_CHECKTYPE = 
"mapreduce.lib.table.output.checktype";
-       private static final String OUTPUT_STORAGEHINT = 
"mapreduce.lib.table.output.storagehint";
-       private static final String OUTPUT_SORTCOLUMNS = 
"mapreduce.lib.table.output.sortcolumns";
-       private static final String OUTPUT_COMPARATOR =  
"mapreduce.lib.table.output.comparator";
-       static final String IS_MULTI = "multi";
-       public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS = 
"zebra.output.partitioner.class";
-       public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS = 
"zebra.output.partitioner.class.arguments";
-
        /**
         * Set the multiple output paths of the BasicTable in JobContext
         * 
@@ -173,12 +163,13 @@ public class BasicTableOutputFormat exte
        public static void setMultipleOutputs(JobContext jobContext, String 
commaSeparatedLocations, Class<? extends ZebraOutputPartition> theClass)
        throws IOException {
                Configuration conf = jobContext.getConfiguration();
-               conf.set(MULTI_OUTPUT_PATH, commaSeparatedLocations);
+               ZebraConf.setMultiOutputPath(conf, commaSeparatedLocations);
 
-               if(conf.getBoolean(IS_MULTI, true) == false) {
+               if (ZebraConf.getIsMulti(conf, true) == false) {
                        throw new IllegalArgumentException("Job has been setup 
as single output path");
                }
-               conf.setBoolean(IS_MULTI, true);
+
+               ZebraConf.setIsMulti(conf, true);
                setZebraOutputPartitionClass(jobContext, theClass);       
        }
 
@@ -204,12 +195,13 @@ public class BasicTableOutputFormat exte
                        path = paths[i].makeQualified(fs);
                        str.append(StringUtils.escapeString(path.toString()));
                }         
-               conf.set(MULTI_OUTPUT_PATH, str.toString());
+               ZebraConf.setMultiOutputPath(conf, str.toString());
 
-               if(conf.getBoolean(IS_MULTI, true) == false) {
+               if (ZebraConf.getIsMulti(conf, true) == false) {
                        throw new IllegalArgumentException("Job has been setup 
as single output path");
                }
-               conf.setBoolean(IS_MULTI, true);
+
+               ZebraConf.setIsMulti(conf, true);
                setZebraOutputPartitionClass(jobContext, theClass);
        }
        
@@ -230,12 +222,23 @@ public class BasicTableOutputFormat exte
   throws IOException {
     setMultipleOutputs(jobContext, theClass, paths);
     if (arguments != null) {
-      
jobContext.getConfiguration().set(ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS, 
arguments);
+      
ZebraConf.setOutputPartitionClassArguments(jobContext.getConfiguration(), 
arguments);
     }
   }
+  
+  /**
+   * Get the output partition class arguments string from job configuration
+   * 
+   * @param conf
+   *          The job configuration object.
+   * @return the output partition class arguments string.
+   */
+  public static String getOutputPartitionClassArguments(Configuration conf) {
+    return ZebraConf.getOutputPartitionClassArguments(conf);
+  }  
 
        /**
-        * Set the multiple output paths of the BasicTable in JobContext
+        * Get the multiple output paths of the BasicTable from JobContext
         * 
         * @param jobContext
         *          The JobContext object.
@@ -243,20 +246,19 @@ public class BasicTableOutputFormat exte
         *          The comma separated output paths to the tables. 
         *          The path must either not exist, or must be an empty 
directory.
         */
-
        public static Path[] getOutputPaths(JobContext jobContext)
        throws IOException {
                Configuration conf = jobContext.getConfiguration();
 
                Path[] result;
-               String paths = conf.get(MULTI_OUTPUT_PATH);
-               String path = conf.get(OUTPUT_PATH);
+               String paths = ZebraConf.getMultiOutputPath(conf);
+               String path = ZebraConf.getOutputPath(conf);
 
                if(paths != null && path != null) {
                        throw new IllegalArgumentException("Illegal output 
paths specs. Both multi and single output locs are set");
                }       
 
-               if(conf.getBoolean(IS_MULTI, false) == true) {    
+               if (ZebraConf.getIsMulti(conf, false) == true) {
                        if (paths == null || paths.equals("")) {
                                throw new IllegalArgumentException("Illegal 
multi output paths");
                        }           
@@ -281,14 +283,14 @@ public class BasicTableOutputFormat exte
                        JobContext jobContext, Class<? extends 
ZebraOutputPartition> theClass) throws IOException {
                if (!ZebraOutputPartition.class.isAssignableFrom(theClass))
                        throw new IOException(theClass+" not 
"+ZebraOutputPartition.class.getName());
-               
jobContext.getConfiguration().set(ZEBRA_OUTPUT_PARTITIONER_CLASS, 
theClass.getName());
+               
ZebraConf.setZebraOutputPartitionerClass(jobContext.getConfiguration(), 
theClass.getName());
        }
 
        public static Class<? extends ZebraOutputPartition> 
getZebraOutputPartitionClass(JobContext jobContext) throws IOException {
                Configuration conf = jobContext.getConfiguration();
 
                Class<?> theClass;        
-               String valueString = conf.get(ZEBRA_OUTPUT_PARTITIONER_CLASS);
+               String valueString = 
ZebraConf.getZebraOutputPartitionerClass(conf);
                if (valueString == null)
                        throw new IOException("zebra output partitioner class 
not found");
                try {
@@ -317,11 +319,11 @@ public class BasicTableOutputFormat exte
         */
        public static void setOutputPath(JobContext jobContext, Path path) {
                Configuration conf = jobContext.getConfiguration();
-               conf.set(OUTPUT_PATH, path.toString());
-               if(conf.getBoolean(IS_MULTI, false) == true) {
+               ZebraConf.setOutputPath(conf, path.toString());
+               if (ZebraConf.getIsMulti(conf, false) == true) {
                        throw new IllegalArgumentException("Job has been setup 
as multi output paths");
                }
-               conf.setBoolean(IS_MULTI, false);
+               ZebraConf.setIsMulti(conf, false);
 
        }
 
@@ -334,7 +336,7 @@ public class BasicTableOutputFormat exte
         */
        public static Path getOutputPath(JobContext jobContext) {
                Configuration conf = jobContext.getConfiguration();
-               String path = conf.get(OUTPUT_PATH);
+               String path = ZebraConf.getOutputPath(conf);
                return (path == null) ? null : new Path(path);
        }
 
@@ -352,11 +354,11 @@ public class BasicTableOutputFormat exte
         */
        public static void setSchema(JobContext jobContext, String schema) {
                Configuration conf = jobContext.getConfiguration();
-               conf.set(OUTPUT_SCHEMA, Schema.normalize(schema));
+               ZebraConf.setOutputSchema(conf, Schema.normalize(schema));
                
     // This is to turn off type check for potential corner cases - for 
internal use only;
                if (System.getenv("zebra_output_checktype")!= null && 
System.getenv("zebra_output_checktype").equals("no")) {
-      conf.setBoolean(OUTPUT_CHECKTYPE, false);
+                 ZebraConf.setCheckType(conf, false);
     }
        }
 
@@ -370,7 +372,7 @@ public class BasicTableOutputFormat exte
         */
        public static Schema getSchema(JobContext jobContext) throws 
ParseException {
                Configuration conf = jobContext.getConfiguration();
-               String schema = conf.get(OUTPUT_SCHEMA);
+               String schema = ZebraConf.getOutputSchema(conf);
                if (schema == null) {
                        return null;
                }
@@ -443,7 +445,7 @@ public class BasicTableOutputFormat exte
         */
        public static void setStorageHint(JobContext jobContext, String 
storehint) throws ParseException, IOException {
                Configuration conf = jobContext.getConfiguration();
-               String schema = conf.get(OUTPUT_SCHEMA);
+               String schema = ZebraConf.getOutputSchema(conf);
 
                if (schema == null)
                        throw new ParseException("Schema has not been set");
@@ -451,7 +453,7 @@ public class BasicTableOutputFormat exte
                // for sanity check purpose only
                new Partition(schema, storehint, null);
 
-               conf.set(OUTPUT_STORAGEHINT, storehint);
+               ZebraConf.setOutputStorageHint(conf, storehint);
        }
 
        /**
@@ -463,9 +465,9 @@ public class BasicTableOutputFormat exte
         *         defined in the jobContext object at the time of the call, an 
empty string
         *         will be returned.
         */
-       public static String getStorageHint(JobContext jobContext) throws 
ParseException {
+       public static String getStorageHint(JobContext jobContext) {
                Configuration conf = jobContext.getConfiguration();
-               String storehint = conf.get(OUTPUT_STORAGEHINT);
+               String storehint = ZebraConf.getOutputStorageHint(conf);
                return storehint == null ? "" : storehint;
        }
 
@@ -485,9 +487,9 @@ public class BasicTableOutputFormat exte
         */
        public static void setSortInfo(JobContext jobContext, String 
sortColumns, Class<? extends RawComparator<Object>> comparatorClass) {
                Configuration conf = jobContext.getConfiguration();
-               conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+               ZebraConf.setOutputSortColumns(conf, sortColumns);
                if (comparatorClass != null)
-                       conf.set(OUTPUT_COMPARATOR, 
TFile.COMPARATOR_JCLASS+comparatorClass.getName());
+                 ZebraConf.setOutputComparator(conf, 
TFile.COMPARATOR_JCLASS+comparatorClass.getName());
        }
 
        /**
@@ -502,7 +504,7 @@ public class BasicTableOutputFormat exte
         * @deprecated Use {@link #setStorageInfo(JobContext, ZebraSchema, 
ZebraStorageHint, ZebraSortInfo)} instead.          
         */
        public static void setSortInfo(JobContext jobContext, String 
sortColumns) {
-               jobContext.getConfiguration().set(OUTPUT_SORTCOLUMNS, 
sortColumns);
+         ZebraConf.setOutputSortColumns(jobContext.getConfiguration(), 
sortColumns);
        }  
 
        /**
@@ -553,8 +555,8 @@ public class BasicTableOutputFormat exte
                }
 
                Configuration conf = jobContext.getConfiguration();
-               conf.set(OUTPUT_SCHEMA, schemaStr);
-               conf.set(OUTPUT_STORAGEHINT, storageHintStr);
+               ZebraConf.setOutputSchema(conf, schemaStr);
+               ZebraConf.setOutputStorageHint(conf, storageHintStr);           
 
                /* validity check on sort info if user specifies it */
                if (zSortInfo != null) {
@@ -578,9 +580,9 @@ public class BasicTableOutputFormat exte
                        }
 
                        if (sortColumnsStr != null)
-                               conf.set(OUTPUT_SORTCOLUMNS, sortColumnsStr);
+                         ZebraConf.setOutputSortColumns(conf, sortColumnsStr);
                        if (comparatorStr != null)
-                               conf.set(OUTPUT_COMPARATOR, comparatorStr);
+                         ZebraConf.setOutputComparator(conf, comparatorStr);
                }
        }
 
@@ -595,7 +597,7 @@ public class BasicTableOutputFormat exte
        public static SortInfo getSortInfo(JobContext jobContext)throws 
IOException
        {
                Configuration conf = jobContext.getConfiguration();
-               String sortColumns = conf.get(OUTPUT_SORTCOLUMNS);
+               String sortColumns = ZebraConf.getOutputSortColumns(conf);
                if (sortColumns == null)
                        return null;
                Schema schema = null;
@@ -620,7 +622,7 @@ public class BasicTableOutputFormat exte
         */
        private static String getComparator(JobContext jobContext)
        {
-               return jobContext.getConfiguration().get(OUTPUT_COMPARATOR);
+         return ZebraConf.getOutputComparator(jobContext.getConfiguration());
        }
 
        /**
@@ -653,27 +655,23 @@ public class BasicTableOutputFormat exte
        public void checkOutputSpecs(JobContext jobContext)
        throws IOException {
                Configuration conf = jobContext.getConfiguration();
-               String schema = conf.get(OUTPUT_SCHEMA);
+               String schema = ZebraConf.getOutputSchema(conf);
                if (schema == null) {
                        throw new IllegalArgumentException("Cannot find output 
schema");
                }
+               
                String storehint, sortColumns, comparator;
-               try {
-                       storehint = getStorageHint(jobContext);
-                       sortColumns = (getSortInfo(jobContext) == null ? null : 
SortInfo.toSortString(getSortInfo(jobContext).getSortColumnNames()));
-                       comparator = getComparator( jobContext );
-               }
-               catch (ParseException e) {
-                       throw new IOException(e);
-               }
-
-               Path [] paths = getOutputPaths(jobContext);
+       storehint = getStorageHint(jobContext);
+         sortColumns = (getSortInfo(jobContext) == null ? null : 
SortInfo.toSortString(getSortInfo(jobContext).getSortColumnNames()));
+               comparator = getComparator( jobContext );
+               
+               Path[] paths = getOutputPaths(jobContext);
 
-               for(int i = 0; i < paths.length; ++i) {
-                       BasicTable.Writer writer =
-                               new BasicTable.Writer(paths[i], schema, 
storehint, sortColumns, comparator, conf);
-                       writer.finish();
-               }       
+         for (Path path : paths) {
+      BasicTable.Writer writer =
+        new BasicTable.Writer(path, schema, storehint, sortColumns, 
comparator, conf);
+      writer.finish();
+         }
        }
 
        /**
@@ -682,7 +680,7 @@ public class BasicTableOutputFormat exte
        @Override
        public RecordWriter<BytesWritable, Tuple> 
getRecordWriter(TaskAttemptContext taContext)
        throws IOException {
-               String path = taContext.getConfiguration().get(OUTPUT_PATH);
+         String path = ZebraConf.getOutputPath(taContext.getConfiguration());
                return new TableRecordWriter(path, taContext);
        }
 
@@ -759,12 +757,12 @@ class TableRecordWriter extends RecordWr
    
        public TableRecordWriter(String path, TaskAttemptContext context) 
throws IOException {  
                Configuration conf = context.getConfiguration();
-               if(conf.getBoolean(BasicTableOutputFormat.IS_MULTI, false) == 
true) {     
+    if(ZebraConf.getIsMulti(conf, false) == true) {
                        op = 
(org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition) 
                        
ReflectionUtils.newInstance(BasicTableOutputFormat.getZebraOutputPartitionClass(context),
 conf);
                }
                
-               boolean checkType = 
conf.getBoolean(BasicTableOutputFormat.OUTPUT_CHECKTYPE, true);
+    boolean checkType = ZebraConf.getCheckType(conf, true);
                Path [] paths = BasicTableOutputFormat.getOutputPaths(context);
                inserter = new TableInserter[paths.length];
                 String inserterName = "part-" + 
context.getTaskAttemptID().getTaskID().getId();

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=937086&r1=937085&r2=937086&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
 Thu Apr 22 22:17:21 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.zebra.mapreduce
 import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
 import org.apache.hadoop.zebra.mapreduce.ZebraStorageHint;
 import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.ZebraConf;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
@@ -51,8 +52,6 @@ public class TableStorer extends StoreFu
     private static final String UDFCONTEXT_SORT_INFO = 
"zebra.UDFContext.sortInfo";
     private static final String UDFCONTEXT_OUTPUT_CHECKTYPE = 
"zebra.UDFContext.checkType";
 
-    static final String OUTPUT_CHECKTYPE = 
"mapreduce.lib.table.output.checktype";
-
     private String storageHintString = null;
     private String udfContextSignature = null;
     private RecordWriter<BytesWritable, Tuple> tableRecordWriter = null;
@@ -206,7 +205,7 @@ public class TableStorer extends StoreFu
         
       // Get checktype information from UDFContext and re-store it to job 
config;
       if (properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE) != null && 
properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE).equals("no")) {
-        conf.setBoolean(OUTPUT_CHECKTYPE, false);
+        ZebraConf.setCheckType(conf, false);
       }
     }
 

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java?rev=937086&r1=937085&r2=937086&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
 Thu Apr 22 22:17:21 2010
@@ -77,11 +77,12 @@ public class CGSchema {
 
    public static CGSchema load(FileSystem fs, Path parent) throws IOException, 
ParseException {
      if (!exists(fs, parent)) return null;
+     
      CGSchema ret = new CGSchema();
      ret.read(fs, parent);
      return ret;
    }
-
+      
    public CGSchema() {
      this.version = SCHEMA_VERSION;
    }
@@ -196,7 +197,7 @@ public class CGSchema {
 
  public  void read(FileSystem fs, Path parent) throws IOException, 
ParseException {
      FSDataInputStream in = fs.open(makeFilePath(parent));
-         version = new Version(in);
+          version = new Version(in);
      // verify compatibility against SCHEMA_VERSION
      if (!version.compatibleWith(SCHEMA_VERSION)) {
        new IOException("Incompatible versions, expecting: " + SCHEMA_VERSION

Added: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraConf.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraConf.java?rev=937086&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraConf.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/ZebraConf.java
 Thu Apr 22 22:17:21 2010
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
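+ * Static accessors for the Hadoop Configuration keys used by Zebra output.
+ * Path, schema, storage hint, sort column and comparator getters try the
+ * "mapreduce.lib.table.*" key, then fall back to the "mapred.lib.table.*" key.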
+ */
+public class ZebraConf {
+  // input configurations
+  
+  // output configurations
+  private static final String MAPREDUCE_OUTPUT_PATH = 
"mapreduce.lib.table.output.dir";
+  private static final String MAPREDUCE_MULTI_OUTPUT_PATH = 
"mapreduce.lib.table.multi.output.dirs";
+  private static final String MAPREDUCE_OUTPUT_SCHEMA = 
"mapreduce.lib.table.output.schema";
+  private static final String MAPREDUCE_OUTPUT_STORAGEHINT = 
"mapreduce.lib.table.output.storagehint";
+  private static final String MAPREDUCE_OUTPUT_SORTCOLUMNS = 
"mapreduce.lib.table.output.sortcolumns";
+  private static final String MAPREDUCE_OUTPUT_COMPARATOR =  
"mapreduce.lib.table.output.comparator";
+  private static final String IS_MULTI = "multi";
+  private static final String ZEBRA_OUTPUT_PARTITIONER_CLASS = 
"zebra.output.partitioner.class";
+  private static final String ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS = 
"zebra.output.partitioner.class.arguments";
+  private static final String MAPREDUCE_OUTPUT_CHECKTYPE = 
"mapreduce.lib.table.output.checktype";
+  
+  private static final String MAPRED_OUTPUT_PATH = 
"mapred.lib.table.output.dir";
+  private static final String MAPRED_MULTI_OUTPUT_PATH = 
"mapred.lib.table.multi.output.dirs";
+  private static final String MAPRED_OUTPUT_SCHEMA = 
"mapred.lib.table.output.schema";
+  private static final String MAPRED_OUTPUT_STORAGEHINT = 
"mapred.lib.table.output.storagehint";
+  private static final String MAPRED_OUTPUT_SORTCOLUMNS = 
"mapred.lib.table.output.sortcolumns";
+  private static final String MAPRED_OUTPUT_COMPARATOR =  
"mapred.lib.table.output.comparator";
+  
+
+    
+  static public String getOutputPath(Configuration conf) {
+    return conf.get(MAPREDUCE_OUTPUT_PATH) != null? 
conf.get(MAPREDUCE_OUTPUT_PATH): conf.get(MAPRED_OUTPUT_PATH);
+  }
+  
+  static public void setOutputPath(Configuration conf, String value) {
+    conf.set(MAPREDUCE_OUTPUT_PATH, value);
+  }
+  
+  static public String getMultiOutputPath(Configuration conf) {
+    return conf.get(MAPREDUCE_MULTI_OUTPUT_PATH) != null? 
conf.get(MAPREDUCE_MULTI_OUTPUT_PATH) : conf.get(MAPRED_MULTI_OUTPUT_PATH);
+  }
+  
+  static public void setMultiOutputPath(Configuration conf, String value) {
+    conf.set(MAPREDUCE_MULTI_OUTPUT_PATH, value);
+  }
+  
+  static public String getOutputSchema(Configuration conf) {
+    return conf.get(MAPREDUCE_OUTPUT_SCHEMA) != null? 
conf.get(MAPREDUCE_OUTPUT_SCHEMA): conf.get(MAPRED_OUTPUT_SCHEMA); 
+  }
+  
+  static public void setOutputSchema(Configuration conf, String value) {
+    conf.set(MAPREDUCE_OUTPUT_SCHEMA, value);
+  }
+  
+  static public String getOutputStorageHint(Configuration conf) {
+    return conf.get(MAPREDUCE_OUTPUT_STORAGEHINT) != null? 
conf.get(MAPREDUCE_OUTPUT_STORAGEHINT) : 
+      conf.get(MAPRED_OUTPUT_STORAGEHINT) != null? 
conf.get(MAPRED_OUTPUT_STORAGEHINT) : "";   
+  }
+  
+  static public void setOutputStorageHint(Configuration conf, String value) {
+    conf.set(MAPREDUCE_OUTPUT_STORAGEHINT, value);
+  }
+  
+  static public String getOutputSortColumns(Configuration conf) {
+    return conf.get(MAPREDUCE_OUTPUT_SORTCOLUMNS) != null? 
conf.get(MAPREDUCE_OUTPUT_SORTCOLUMNS) : conf.get(MAPRED_OUTPUT_SORTCOLUMNS);
+  }
+  
+  static public void setOutputSortColumns(Configuration conf, String value) {
+    if (value != null) {
+      conf.set(MAPREDUCE_OUTPUT_SORTCOLUMNS, value);
+    }
+  }
+  
+  static public String getOutputComparator(Configuration conf) {
+    return conf.get(MAPREDUCE_OUTPUT_COMPARATOR) != null? 
conf.get(MAPREDUCE_OUTPUT_COMPARATOR): conf.get(MAPRED_OUTPUT_COMPARATOR);
+  }
+  
+  static public void setOutputComparator(Configuration conf, String value) {
+    conf.set(MAPREDUCE_OUTPUT_COMPARATOR, value);
+  }
+  
+  static public Boolean getIsMulti(Configuration conf, boolean defaultValue) {
+    return conf.getBoolean(IS_MULTI, defaultValue);
+  }
+  
+  static public void setIsMulti(Configuration conf, boolean value) {
+    conf.setBoolean(IS_MULTI, value);
+  }
+  
+  static public Boolean getCheckType(Configuration conf, boolean defaultValue) 
{
+    return conf.getBoolean(MAPREDUCE_OUTPUT_CHECKTYPE, defaultValue);
+  }
+  
+  static public void setCheckType(Configuration conf, boolean value) {
+    conf.setBoolean(MAPREDUCE_OUTPUT_CHECKTYPE, value);
+  }
+  
+  static public String getZebraOutputPartitionerClass(Configuration conf) {
+    return conf.get(ZEBRA_OUTPUT_PARTITIONER_CLASS);
+  }
+  
+  static public void setZebraOutputPartitionerClass(Configuration conf, String 
value) {
+    conf.set(ZEBRA_OUTPUT_PARTITIONER_CLASS, value);
+  }
+  
+  static public String getOutputPartitionClassArguments(Configuration conf) {
+    return conf.get(ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS);
+  }
+  
+  static public void setOutputPartitionClassArguments(Configuration conf, 
String value) {
+    conf.set(ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS, value);
+  }  
+}
\ No newline at end of file

Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableUnion.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableUnion.java?rev=937086&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableUnion.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestBasicTableUnion.java
 Thu Apr 22 22:17:21 2010
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+import java.net.URI;
+
+import junit.framework.Assert;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.zebra.BaseTestCase;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.ZebraTuple;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This is a complete sample of MR code for Table. It doesn't contain the
+ * 'read' part, but that should be similar and easier to write. Refer to the
+ * test cases in the same directory.
+ * 
+ * Assume the input files contain rows of word and count, separated by a space:
+ * 
+ * <pre>
+ * us 2
+ * japan 2
+ * india 4
+ * us 2
+ * japan 1
+ * india 3
+ * nouse 5
+ * nowhere 4
+ * </pre>
+ */
+public class TestBasicTableUnion extends BaseTestCase implements Tool {
+
+  static String inputPath;
+  
+  final static String STR_SCHEMA1 = "a:string,b:string";
+  final static String STR_STORAGE1 = "[a];[b]";
+  final static String STR_SCHEMA2 = "a:string,c:string";
+  final static String STR_STORAGE2 = "[a];[c]";
+  
+  static Path path1, path2, path3;
+
+  @BeforeClass
+  public static void setUpOnce() throws Exception {
+    init();
+
+    path1 = getTableFullPath("t1");
+    path2 = getTableFullPath("t2");
+    path3 = getTableFullPath("t3");
+    removeDir(path1);
+    removeDir(path2);
+    removeDir(path3);
+
+    /*
+     * create 1st basic table;
+     */
+    BasicTable.Writer writer = new BasicTable.Writer(path1, STR_SCHEMA1, 
STR_STORAGE1, conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            tuple.set(k, b + "_" + i + "" + k);
+          } catch (ExecException e) {
+
+          }
+        }// k
+        inserters[i].insert(new BytesWritable(("key1" + i).getBytes()), tuple);
+      }// i
+    }// b
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+
+    /*
+     * create 2nd basic table;
+     */
+    writer = new BasicTable.Writer(path2, STR_SCHEMA2, STR_STORAGE2, conf);
+    schema = writer.getSchema();
+    tuple = TypesUtils.createTuple(schema);
+
+    inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            tuple.set(k, b + "_" + i + "" + k);
+          } catch (ExecException e) {
+
+          }
+        }
+        inserters[i].insert(new BytesWritable(("key2" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();  
+  }
+  
+  @AfterClass
+  public static void tearDown() throws Exception {
+    removeDir(path1);
+    removeDir(path2);
+    removeDir(path3);
+  }
+ 
+  static class MapClass extends Mapper<BytesWritable, Tuple, BytesWritable, 
Tuple> {    
+    @Override
+    public void map(BytesWritable key, Tuple value, Context context) throws 
IOException, InterruptedException {      
+      System.out.println("key = " + key);
+      System.out.println("value = " + value);
+      
+      context.write(key, value);
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+    }
+  }
+
+  @Test
+  public void test1() throws ParseException, IOException,
+      org.apache.hadoop.zebra.parser.ParseException, Exception {
+
+    Job job = new Job(conf);
+    job.setJobName("Test1");
+    job.setJarByClass(TestBasicTableUnion.class);
+    
+   
+    // input settings
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setMapperClass(MapClass.class);
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(ZebraTuple.class);
+    job.setOutputFormatClass(BasicTableOutputFormat.class);
+    
+    TableInputFormat.setInputPaths(job, path1, path2);
+    TableInputFormat.setProjection(job, "a,b,c");
+    BasicTableOutputFormat.setOutputPath(job, path3);
+
+    BasicTableOutputFormat.setSchema(job, "a:string, b:string, c:string");
+    BasicTableOutputFormat.setStorageHint(job, "[a,b,c]");
+    
+    job.submit();
+    job.waitForCompletion( true );
+    
+    BasicTableOutputFormat.close(job);    
+  }
+  
+  @Test(expected = IOException.class)
+  public void testNegative1() throws ParseException, IOException, 
InterruptedException, ClassNotFoundException {
+    Job job = new Job(conf);
+    job.setJobName("Test1");
+    job.setJarByClass(TestBasicTableUnion.class);
+   
+    // input settings
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setMapperClass(MapClass.class);
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(ZebraTuple.class);
+    job.setOutputFormatClass(BasicTableOutputFormat.class);
+    
+    TableInputFormat.setInputPaths(job, path1, path2);
+    TableInputFormat.setProjection(job, "a,b,c");
+    BasicTableOutputFormat.setOutputPath(job, path3);
+
+    BasicTableOutputFormat.setSchema(job, "a:string, b:string, c:string");
+    BasicTableOutputFormat.setStorageHint(job, "[a,b,c]");
+    
+    job.submit();
+    job.waitForCompletion( true );
+    
+    BasicTableOutputFormat.close(job);        
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    TestBasicTableUnion test = new TestBasicTableUnion();
+    TestBasicTableUnion.setUpOnce();
+    System.out.println("after setup");
+
+    test.test1();
+
+    return 0;
+  }
+
+  
+  public static void main(String[] args) throws Exception {
+    conf = new Configuration();
+    
+    int res = ToolRunner.run(conf, new TestBasicTableUnion(), args);
+    System.out.println("PASS");
+    System.exit(res);
+  }
+}

Modified: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java?rev=937086&r1=937085&r2=937086&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java
 Thu Apr 22 22:17:21 2010
@@ -214,7 +214,7 @@ public class TestMultipleOutputs2 extend
         //
       }
       
-      String argumentsString = 
conf.get(BasicTableOutputFormat.ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS);
+      String argumentsString = 
BasicTableOutputFormat.getOutputPartitionClassArguments(conf);
       
       String[] arguments = argumentsString.split(",");
 


Reply via email to