Author: yanz
Date: Sun Mar 28 12:16:54 2010
New Revision: 928385

URL: http://svn.apache.org/viewvc?rev=928385&view=rev
Log:
PIG-1306 Support of locally sorted input splits (yanz)

Modified:
    hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjection.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveSimple.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveUnion.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveVariableTable.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java
    
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionSourceTableProj.java

Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt Sun Mar 28 
12:16:54 2010
@@ -16,6 +16,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    PIG-1306 Support of locally sorted input splits (yanz)
+
     PIG-1268 Need an ant target that runs all pig-related tests in Zebra 
(xuefuz via yanz)
 
     PIG-1207 Data sanity check should be performed at the end of writing 
instead of later at query time (yanz)

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
 Sun Mar 28 12:16:54 2010
@@ -29,6 +29,8 @@ import org.apache.hadoop.zebra.io.BasicT
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.pig.data.Tuple;
 
 /**
  * Table expression for reading a BasicTable.
@@ -117,6 +119,12 @@ class BasicTableExpr extends TableExpr {
   } 
 
   @Override
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    return new BasicTableScanner(split, projection, conf);
+  }
+  
+  @Override
   public Schema getSchema(Configuration conf) throws IOException {
     return BasicTable.Reader.getSchema(path, conf);
   }
@@ -131,4 +139,75 @@ class BasicTableExpr extends TableExpr {
   {
     BasicTable.dumpInfo(path.toString(), ps, conf, indent);
   }
+  
+  /**
+   * Basic Table Scanner
+   */
+  class BasicTableScanner implements TableScanner {
+    private int tableIndex = -1;
+    private Integer[] virtualColumnIndices = null;
+    private TableScanner scanner = null;
+    
+    BasicTableScanner(RowTableSplit split, String projection,
+        Configuration conf) throws IOException, ParseException, ParseException 
{
+      tableIndex = split.getTableIndex();
+      virtualColumnIndices = Projection.getVirtualColumnIndices(projection);
+      BasicTable.Reader reader =
+        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), 
conf);
+      reader.setProjection(projection);
+      scanner = reader.getScanner(true, split.getSplit());
+    }
+    
+    @Override
+    public boolean advance() throws IOException {
+      return scanner.advance();
+    }
+    
+    @Override
+    public boolean atEnd() throws IOException {
+      return scanner.atEnd();
+    }
+    
+    @Override
+    public Schema getSchema() {
+      return scanner.getSchema();
+    }
+    
+    @Override
+    public void getKey(BytesWritable key) throws IOException {
+      scanner.getKey(key);
+    }
+    
+    @Override
+    public void getValue(Tuple row) throws IOException {
+      scanner.getValue(row);
+      if (virtualColumnIndices != null)
+      {
+        for (int i = 0; i < virtualColumnIndices.length; i++)
+        {
+          row.set(virtualColumnIndices[i], tableIndex);
+        }
+      }
+    }
+    
+    @Override
+    public boolean seekTo(BytesWritable key) throws IOException {
+      return scanner.seekTo(key);
+    }
+    
+    @Override
+    public void seekToEnd() throws IOException {
+      scanner.seekToEnd();
+    }
+    
+    @Override 
+    public void close() throws IOException {
+      scanner.close();
+    }
+    
+    @Override
+    public String getProjection() {
+      return scanner.getProjection();
+    }
+  }
 }

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
 Sun Mar 28 12:16:54 2010
@@ -27,7 +27,6 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
-import org.apache.hadoop.zebra.types.Projection;
 import org.apache.hadoop.zebra.schema.Schema;
 
 /**
@@ -132,10 +131,10 @@ abstract class TableExpr {
   }
   
   /**
-   * Get a scanner with an unsorted split.
+   * Get a scanner with a row split.
    * 
    * @param split
-   *          The range split.
+   *          The row split.
    * @param projection
    *          The projection schema. It should never be null.
    * @param conf
@@ -144,11 +143,8 @@ abstract class TableExpr {
    * @throws IOException
    */
   public TableScanner getScanner(RowTableSplit split, String projection,
-      Configuration conf) throws IOException, ParseException, ParseException {
-    BasicTable.Reader reader =
-        new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), 
conf);
-    reader.setProjection(projection);
-    return reader.getScanner(true, split.getSplit());
+      Configuration conf) throws IOException, ParseException {
+    return null;
   }
   
   /**

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
 Sun Mar 28 12:16:54 2010
@@ -144,6 +144,12 @@ import org.apache.pig.data.Tuple;
  * </UL>
  */
 public class TableInputFormat extends InputFormat<BytesWritable, Tuple> {
+  public enum SplitMode {
+    UNSORTED, /* Data is not sorted. Default split mode*/
+    LOCALLY_SORTED, /* Output by each mapper is sorted */
+    GLOBALLY_SORTED /* Output is locally sorted and the key ranges are not 
overlapped, not even on boundary */
+  };
+    
   static Log LOG = LogFactory.getLog(TableInputFormat.class);
   
   private static final String INPUT_EXPR = "mapreduce.lib.table.input.expr";
@@ -151,6 +157,10 @@ public class TableInputFormat extends In
   private static final String INPUT_SORT = "mapreduce.lib.table.input.sort";
   static final String INPUT_FE = "mapreduce.lib.table.input.fe";
   static final String INPUT_DELETED_CGS = 
"mapreduce.lib.table.input.deleted_cgs";
+  private static final String INPUT_SPLIT_MODE = 
"mapreduce.lib.table.input.split_mode";
+  private static final String UNSORTED = "unsorted";
+  private static final String GLOBALLY_SORTED = "globally_sorted";
+  private static final String LOCALLY_SORTED = "locally_sorted";
   static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
 
   /**
@@ -316,16 +326,28 @@ public class TableInputFormat extends In
     return null;
   }
       
-  /**
-   * Set requirement for sorted table
-   *
-   * @param conf
-   *          Configuration object.
-   */
-  private static void setSorted(Configuration conf) {
+  private static boolean globalOrderingRequired(JobContext jobContext)
+  {
+    Configuration conf = jobContext.getConfiguration();
+    String result = conf.get(INPUT_SPLIT_MODE, UNSORTED);
+    return result.equalsIgnoreCase(GLOBALLY_SORTED);
+  }
+
+  private static void setSorted(JobContext jobContext) {
+    Configuration conf = jobContext.getConfiguration();
     conf.setBoolean(INPUT_SORT, true);
   }
   
+  private static void setSorted(JobContext jobContext, SplitMode sm)
+  {
+    setSorted(jobContext);
+    Configuration conf = jobContext.getConfiguration();
+         if (sm == SplitMode.GLOBALLY_SORTED)
+      conf.set(INPUT_SPLIT_MODE, GLOBALLY_SORTED);
+    else if (sm == SplitMode.LOCALLY_SORTED)
+      conf.set(INPUT_SPLIT_MODE, LOCALLY_SORTED);
+  }
+  
   /**
    * Get the SortInfo object regarding a Zebra table
    *
@@ -368,13 +390,33 @@ public class TableInputFormat extends In
   /**
    * Requires sorted table or table union
    * 
+   * @deprecated
+   * 
    * @param jobContext
    *          JobContext object.
    * @param sortInfo
    *          ZebraSortInfo object containing sorting information.
    *        
    */
-  public static void requireSortedTable(JobContext jobContext, ZebraSortInfo 
sortInfo) throws IOException {
+  
+   public static void requireSortedTable(JobContext jobContext, ZebraSortInfo 
sortInfo) throws IOException {
+     setSplitMode(jobContext, SplitMode.GLOBALLY_SORTED, sortInfo);
+   }
+
+  /**
+   * 
+   * @param jobContext
+   *          JobContext object
+   * @param sm
+   *          Split mode: unsorted, globally sorted, locally sorted. Default 
is unsorted
+   * @param sortInfo
+   *          ZebraSortInfo object containing sorting information. Will be 
ignored if
+   *          the split mode is null or unsorted
+   * @throws IOException
+   */
+  public static void setSplitMode(JobContext jobContext, SplitMode sm, 
ZebraSortInfo sortInfo) throws IOException {
+   if (sm == null || sm == SplitMode.UNSORTED)
+    return;
         TableExpr expr = getInputExpr( jobContext );
      Configuration conf = jobContext.getConfiguration();
         String comparatorName = null;
@@ -428,8 +470,7 @@ public class TableInputFormat extends In
        }
                 }
         }
-   // need key range input splits for sorted table union
-        setSorted(conf);
+        setSorted(jobContext, sm);
   }
   
   /**
@@ -599,6 +640,7 @@ public class TableInputFormat extends In
       SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf);
       splits.add(split);
     }
+    LOG.info("getSplits : returning " + splits.size() + " sorted splits.");
     return splits;
   }
   
@@ -716,7 +758,8 @@ public class TableInputFormat extends In
               FileStatus[] fileStatuses = fs.listStatus(globStat.getPath(), 
inputFilter);
               // reorder according to CG index
               BasicTable.Reader reader = readers.get(index);
-              reader.rearrangeFileIndices(fileStatuses);
+              if (fileStatuses.length > 1)
+                reader.rearrangeFileIndices(fileStatuses);
               for(FileStatus stat: fileStatuses) {
                 if (stat != null)
                   result.add(stat);
@@ -758,6 +801,7 @@ public class TableInputFormat extends In
     boolean first = true;
     PathFilter filter = null;
     List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>();
+    int[] realReaderIndices = new int[readers.size()];
 
     for (int i = 0; i < readers.size(); ++i) {
       BasicTable.Reader reader = readers.get(i);
@@ -766,7 +810,8 @@ public class TableInputFormat extends In
       
       /* We can create input splits only if there does exist a valid column 
group for split.
        * Otherwise, we do not create input splits. */
-      if (splitCGIndex >= 0) {        
+      if (splitCGIndex >= 0) {
+        realReaderIndices[realReaders.size()] = i;
         realReaders.add(reader);
          if (first)
          {
@@ -862,11 +907,13 @@ public class TableInputFormat extends In
         if (splitLen > 0)
           batches[++numBatches] = splitLen;
         
-        List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, 
splitCGIndex, batches, numBatches);
+        List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, 
splitCGIndex,
+            batches, numBatches);
         
+        int realTableIndex = realReaderIndices[tableIndex];
         for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
           RowSplit subSplit = it.next();
-          RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
+          RowTableSplit split = new RowTableSplit(reader, subSplit, 
realTableIndex, conf);
           ret.add(split);
         }
       }
@@ -884,7 +931,7 @@ public class TableInputFormat extends In
            return getSplits( jobContext, false );
     }
 
-    private List<InputSplit> getSplits(JobContext jobContext, boolean 
singleSplit) throws IOException {
+    List<InputSplit> getSplits(JobContext jobContext, boolean singleSplit) 
throws IOException {
        Configuration conf = jobContext.getConfiguration();
        TableExpr expr = getInputExpr( conf );
        if( getSorted(conf) ) {
@@ -938,9 +985,20 @@ public class TableInputFormat extends In
                        return new ArrayList<InputSplit>();
                }
 
-               return sorted ? 
-                               singleSplit ? getSortedSplits( conf, 1, expr, 
readers, status) : getSortedSplits(conf, -1, expr, readers, status) : 
-                                       getRowSplits( conf, expr, readers, 
status);
+        List<InputSplit> result;
+        if (sorted)
+        {
+          if (singleSplit)
+            result = getSortedSplits( conf, 1, expr, readers, status);
+          else if (globalOrderingRequired(jobContext))
+            result = getSortedSplits(conf, -1, expr, readers, status);
+          else
+            result = getRowSplits( conf, expr, readers, status);
+        } else
+          result = getRowSplits( conf, expr, readers, status);
+
+        return result;
+
        } catch (ParseException e) {
                throw new IOException("Projection parsing failed : 
"+e.getMessage());
        } finally {
@@ -1166,15 +1224,17 @@ class RowTableSplit extends InputSplit i
   /**
      * 
      */
-String path = null;
+  String path = null;
+  int tableIndex = 0;
   RowSplit split = null;
   String[] hosts = null;
   long length = 1;
 
-  public RowTableSplit(Reader reader, RowSplit split, Configuration conf)
+  public RowTableSplit(Reader reader, RowSplit split, int tableIndex, 
Configuration conf)
       throws IOException {
     this.path = reader.getPath();
     this.split = split;
+    this.tableIndex = tableIndex;
     BlockDistribution dataDist = reader.getBlockDistribution(split);
     if (dataDist != null) {
       length = dataDist.getLength();
@@ -1199,6 +1259,7 @@ String path = null;
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    tableIndex = WritableUtils.readVInt(in); 
     path = WritableUtils.readString(in);
     int bool = WritableUtils.readVInt(in);
     if (bool == 1) {
@@ -1214,6 +1275,7 @@ String path = null;
 
   @Override
   public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, tableIndex);
     WritableUtils.writeString(out, path);
     if (split == null) {
       WritableUtils.writeVInt(out, 0);
@@ -1233,4 +1295,8 @@ String path = null;
   public RowSplit getSplit() {
     return split;
   }
+  
+  public int getTableIndex() {
+    return tableIndex;
+  }
 }

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
 Sun Mar 28 12:16:54 2010
@@ -53,19 +53,18 @@ public class TableRecordReader extends R
    */
   public TableRecordReader(TableExpr expr, String projection,
       InputSplit split, JobContext jobContext) throws IOException, 
ParseException {
-       Configuration conf = jobContext.getConfiguration();
-    if (expr.sortedSplitRequired()) {
+         Configuration conf = jobContext.getConfiguration();
+         if (split instanceof RowTableSplit) {
+      RowTableSplit rowSplit = (RowTableSplit) split;
+      if ((!expr.sortedSplitRequired() || rowSplit.getTableIndex() == -1) &&
+          Projection.getVirtualColumnIndices(projection) != null)
+        throw new IllegalArgumentException("virtual column requires union of 
multiple sorted tables");
+      scanner = expr.getScanner(rowSplit, projection, conf);
+         } else {
       SortedTableSplit tblSplit = (SortedTableSplit) split;
       scanner =
           expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
               conf);
-    } else if (split != null && split instanceof RowTableSplit) {
-      RowTableSplit rowSplit = (RowTableSplit) split;
-      scanner = expr.getScanner(rowSplit, projection, conf);
-    }
-    else {
-      UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
-      scanner = expr.getScanner(tblSplit, projection, conf);
     }
   }
   

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
 Sun Mar 28 12:16:54 2010
@@ -29,7 +29,6 @@ import java.util.concurrent.PriorityBloc
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.zebra.io.BasicTable;
-import org.apache.hadoop.zebra.io.BasicTableStatus;
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.types.Projection;
@@ -157,6 +156,13 @@ class TableUnionExpr extends CompositeTa
       throw new IllegalArgumentException("virtual column requires union of 
multiple tables");
     return new SortedTableUnionScanner(scanners, 
Projection.getVirtualColumnIndices(projection));
   }
+  
+  @Override
+  public TableScanner getScanner(RowTableSplit split, String projection,
+      Configuration conf) throws IOException, ParseException {
+    BasicTableExpr expr = (BasicTableExpr) 
composite.get(split.getTableIndex());
+    return expr.getScanner(split, projection, conf);
+  }
 }
 
 /**

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
 Sun Mar 28 12:16:54 2010
@@ -38,6 +38,8 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
 import org.apache.hadoop.zebra.mapreduce.TableRecordReader;
+import org.apache.hadoop.zebra.mapreduce.TableInputFormat.SplitMode;
+import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.schema.ColumnType;
 import org.apache.hadoop.zebra.schema.Schema;
@@ -57,15 +59,17 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.hadoop.zebra.pig.comparator.*;
 import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.CollectableLoadFunc;
 
 /**
  * Pig IndexableLoadFunc and Slicer for Zebra Table
  */
 public class TableLoader extends LoadFunc implements LoadMetadata, 
LoadPushDown,
-        IndexableLoadFunc{
+        IndexableLoadFunc, CollectableLoadFunc {
     static final Log LOG = LogFactory.getLog(TableLoader.class);
 
     private static final String UDFCONTEXT_PROJ_STRING = 
"zebra.UDFContext.projectionString";
+    private static final String UDFCONTEXT_GLOBAL_SORTING = 
"zebra.UDFContext.globalSorting";
 
     private String projectionString;
 
@@ -170,14 +174,19 @@ public class TableLoader extends LoadFun
      * @throws IOException
      */
     private void setProjection(Job job) throws IOException {
+      Properties properties = UDFContext.getUDFContext().getUDFProperties( 
+          this.getClass(), new String[]{ udfContextSignature } );
+      boolean requireGlobalOrder = "true".equals(properties.getProperty( 
UDFCONTEXT_GLOBAL_SORTING));
+      if (requireGlobalOrder && !sorted)
+        throw new IOException("Global sorting can be only asked on table 
loaded as sorted");
         if( sorted ) {
-            TableInputFormat.requireSortedTable( job, null );
+            SplitMode splitMode = 
+              requireGlobalOrder ? SplitMode.GLOBALLY_SORTED : 
SplitMode.LOCALLY_SORTED;
+            TableInputFormat.setSplitMode(job, splitMode, null);
             sortInfo = TableInputFormat.getSortInfo( job );
         }
         
         try {
-            Properties properties = 
UDFContext.getUDFContext().getUDFProperties( 
-                    this.getClass(), new String[]{ udfContextSignature } );
             String prunedProjStr = properties.getProperty( 
UDFCONTEXT_PROJ_STRING );
             
             if( prunedProjStr != null ) {
@@ -414,4 +423,11 @@ public class TableLoader extends LoadFun
         public void setUDFContextSignature(String signature) {
             udfContextSignature = signature;
         }
+        
+    @Override
+    public void ensureAllKeyInstancesInSameSplit() throws IOException {
+      Properties properties = 
UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+          new String[] { udfContextSignature } );
+      properties.setProperty(UDFCONTEXT_GLOBAL_SORTING, "true");
+    }
 }

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java
 Sun Mar 28 12:16:54 2010
@@ -20,6 +20,9 @@ package org.apache.hadoop.zebra;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 
 import junit.framework.Assert;
 
@@ -31,6 +34,7 @@ import org.apache.hadoop.conf.Configured
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.Tuple;
 
 public class BaseTestCase extends Configured {
   protected static PigServer pigServer = null;
@@ -126,6 +130,45 @@ public class BaseTestCase extends Config
     }
   }
   
+  /**
+   * Verify union output table with expected results
+   * 
+   */
+  protected int verifyTable(HashMap<Integer, ArrayList<ArrayList<Object>>> 
resultTable,
+      int keyColumn, int tblIdxCol, Iterator<Tuple> it) throws IOException {
+    int numbRows = 0;
+    int index = 0, rowIndex = -1, rowCount = -1, prevIndex = -1;
+    Object value;
+    boolean first = true;
+    ArrayList<ArrayList<Object>> rows = null;
+    
+    while (it.hasNext()) {
+      Tuple rowValues = it.next();
+      
+      if (first) {
+        index = (Integer) rowValues.get(tblIdxCol);
+        Assert.assertNotSame(prevIndex, index);
+        rows = resultTable.get(index);
+        rowIndex = 0;
+        rowCount = rows.size();
+        first = false;
+      }
+      value = rows.get(rowIndex++).get(keyColumn);
+      Assert.assertEquals("Table comparison error for row : " + numbRows + " - 
no key found for : "
+          + rowValues.get(keyColumn), value, rowValues.get(keyColumn));
+      
+      if (rowIndex == rowCount)
+      {
+        // current table is run out; start on a new table for next iteration
+        first = true;
+        prevIndex = index;
+      }
+      
+      ++numbRows;
+    }
+    return numbRows;
+  }
+  
   public static void checkTableExists(boolean expected, String strDir) throws 
IOException {  
     File theDir = null; 
     boolean actual = false;

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java
 Sun Mar 28 12:16:54 2010
@@ -37,7 +37,7 @@ public class ToolTestComparator extends 
 
   final static String TABLE_SCHEMA = 
"count:int,seed:int,int1:int,int2:int,str1:string,str2:string,byte1:bytes,"
       + 
"byte2:bytes,float1:float,long1:long,double1:double,m1:map(string),r1:record(f1:string,
 f2:string),"
-      + "c1:collection(a:string, b:string)";
+      + "c1:collection(record(a:string, b:string))";
   final static String TABLE_STORAGE = 
"[count,seed,int1,int2,str1,str2,byte1,byte2,float1,long1,double1];[m1#{a}];[r1,c1]";
 
   private static Random generator = new Random();
@@ -399,13 +399,15 @@ public class ToolTestComparator extends 
       tuple.set(12, tupRecord1);
 
       // insert collection1
-      tupColl1.set(0, "c1 a " + seed);
-      tupColl1.set(1, "c1 a " + i);
-      bag1.add(tupColl1); // first collection item
-
-      tupColl2.set(0, "c1 b " + seed);
-      tupColl2.set(1, "c1 b " + i);
-      bag1.add(tupColl2); // second collection item
+      // tupColl1.set(0, "c1 a " + seed);
+      // tupColl1.set(1, "c1 a " + i);
+      // bag1.add(tupColl1); // first collection item
+      bag1.add(tupRecord1); // first collection item
+      bag1.add(tupRecord1); // second collection item
+
+      // tupColl2.set(0, "c1 b " + seed);
+      // tupColl2.set(1, "c1 b " + i);
+      // bag1.add(tupColl2); // second collection item
 
       tuple.set(13, bag1);
 

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
 (original)
+++ 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
 Sun Mar 28 12:16:54 2010
@@ -19,45 +19,32 @@
 package org.apache.hadoop.zebra.mapreduce;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.io.TableInserter;
 import org.apache.hadoop.zebra.io.TableScanner;
-import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
 import org.apache.hadoop.zebra.mapreduce.SortedTableSplit;
 import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.pig.TableStorer;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.types.TypesUtils;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
+import org.apache.hadoop.zebra.BaseTestCase;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.test.MiniCluster;
 import org.junit.Assert;
 
 /**
@@ -66,25 +53,15 @@ import org.junit.Assert;
  * Utility for verifying tables created during Zebra Stress Testing
  * 
  */
-public class ToolTestComparator {
+public class ToolTestComparator extends BaseTestCase {
        final static String TABLE_SCHEMA = 
"count:int,seed:int,int1:int,int2:int,str1:string,str2:string,byte1:bytes,"
                + 
"byte2:bytes,float1:float,long1:long,double1:double,m1:map(string),r1:record(f1:string,
 f2:string),"
-               + "c1:collection(a:string, b:string)";
+               + "c1:collection(record(a:string, b:string))";
        final static String TABLE_STORAGE = 
"[count,seed,int1,int2,str1,str2,byte1,byte2,float1,long1,double1];[m1#{a}];[r1,c1]";
 
        private static Random generator = new Random();
 
-       private static Configuration conf;
-       private static FileSystem fs;
-
-       protected static ExecType execType = ExecType.MAPREDUCE;
-       private static MiniCluster cluster;
-       protected static PigServer pigServer;
        protected static ExecJob pigJob;
-       private static Path path;
-
-       private static String zebraJar;
-       private static String whichCluster;
 
        private static int totalNumbCols;
        private static long totalNumbVerifiedRows;
@@ -93,65 +70,7 @@ public class ToolTestComparator {
         * Setup and initialize environment
         */
        public static void setUp() throws Exception {
-               System.out.println("setUp()");
-               if (System.getProperty("hadoop.log.dir") == null) {
-                       String base = new File(".").getPath(); // 
getAbsolutePath();
-                       System
-                       .setProperty("hadoop.log.dir", new 
Path(base).toString() + "./logs");
-               }
-
-
-               if (System.getProperty("whichCluster") == null) {
-                       System.setProperty("whichCluster", "miniCluster");
-                       whichCluster = System.getProperty("whichCluster");
-               } else {
-                       whichCluster = System.getProperty("whichCluster");
-               }
-
-               System.out.println("cluster: " + whichCluster);
-               if (whichCluster.equalsIgnoreCase("realCluster")
-                               && System.getenv("HADOOP_HOME") == null) {
-                       System.out.println("Please set HADOOP_HOME");
-                       System.exit(0);
-               }
-
-               conf = new Configuration();
-
-               if (whichCluster.equalsIgnoreCase("realCluster")
-                               && System.getenv("USER") == null) {
-                       System.out.println("Please set USER");
-                       System.exit(0);
-               }
-               zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
-               File file = new File(zebraJar);
-               if (!file.exists() && 
whichCluster.equalsIgnoreCase("realCulster")) {
-                       System.out.println("Please put zebra.jar at 
hadoop_home/../jars");
-                       System.exit(0);
-               }
-
-               if (whichCluster.equalsIgnoreCase("realCluster")) {
-                       System.out.println("Running realCluster");
-                       pigServer = new PigServer(ExecType.MAPREDUCE, 
ConfigurationUtil
-                                       .toProperties(conf));
-                       pigServer.registerJar(zebraJar);
-                       path = new Path("/user/" + System.getenv("USER") + 
"/TestComparator");
-                       // removeDir(path);
-                       fs = path.getFileSystem(conf);
-               }
-
-               if (whichCluster.equalsIgnoreCase("miniCluster")) {
-                       System.out.println("Running miniCluster");
-                       if (execType == ExecType.MAPREDUCE) {
-                               cluster = MiniCluster.buildCluster();
-                               pigServer = new PigServer(ExecType.MAPREDUCE, 
cluster.getProperties());
-                               fs = cluster.getFileSystem();
-                               path = new Path(fs.getWorkingDirectory() + 
"/TestComparator");
-                               // removeDir(path);
-                               System.out.println("path1 =" + path);
-                       } else {
-                               pigServer = new PigServer(ExecType.LOCAL);
-                       }
-               }
+               init();
        }
 
        /**
@@ -389,7 +308,7 @@ public class ToolTestComparator {
 
                        while (it1.hasNext()) {
                                ++numbRows1; // increment row count
-                               Tuple rowValue = it1.next();
+                               it1.next();
                        }
                        numbRows.add(numbRows1);
                        numbUnionRows += numbRows1;
@@ -499,13 +418,15 @@ public class ToolTestComparator {
                        tuple.set(12, tupRecord1);
 
                        // insert collection1
-                       tupColl1.set(0, "c1 a " + seed);
-                       tupColl1.set(1, "c1 a " + i);
-                       bag1.add(tupColl1); // first collection item
-
-                       tupColl2.set(0, "c1 b " + seed);
-                       tupColl2.set(1, "c1 b " + i);
-                       bag1.add(tupColl2); // second collection item
+                       // tupColl1.set(0, "c1 a " + seed);
+                       // tupColl1.set(1, "c1 a " + i);
+                       // bag1.add(tupColl1); // first collection item
+      bag1.add(tupRecord1); // first collection item
+      bag1.add(tupRecord1); // second collection item
+
+                       // tupColl2.set(0, "c1 b " + seed);
+                       // tupColl2.set(1, "c1 b " + i);
+                       // bag1.add(tupColl2); // second collection item
 
                        tuple.set(13, bag1);
 
@@ -604,7 +525,7 @@ public class ToolTestComparator {
                TableInputFormat.requireSortedTable(job, null);
                TableInputFormat tif = new TableInputFormat();
 
-               SortedTableSplit split = (SortedTableSplit) 
tif.getSplits(job).get(0);
+               SortedTableSplit split = (SortedTableSplit) tif.getSplits(job, true).get(0);
 
                TableScanner scanner = reader.getScanner(split.getBegin(), 
split.getEnd(), true);
                BytesWritable key = new BytesWritable();
@@ -731,12 +652,9 @@ public class ToolTestComparator {
                TableInputFormat.setInputPaths(job, new Path(pathTable1));
 
                TableInputFormat.requireSortedTable(job, null);
-               TableInputFormat tif = new TableInputFormat();
-
 
                TableScanner scanner = reader.getScanner(null, null, true);
                BytesWritable key = new BytesWritable();
-               Tuple rowValue = TypesUtils.createTuple(scanner.getSchema());
 
                while (!scanner.atEnd()) {
                        ++numbRows;
@@ -746,41 +664,6 @@ public class ToolTestComparator {
                System.out.println("\nTable Path : " + pathTable1);
                System.out.println("Table Row number : " + numbRows);
        }
-       /**
-        * Compare table rows
-        * 
-        */
-       private static boolean compareRow(Tuple rowValues1, Tuple rowValues2)
-       throws IOException {
-               boolean result = true;
-               Assert.assertEquals(rowValues1.size(), rowValues2.size());
-               for (int i = 0; i < rowValues1.size(); ++i) {
-                       if (!compareObj(rowValues1.get(i), rowValues2.get(i))) {
-                               System.out.println("DEBUG: " + " RowValue.get(" 
+ i
-                                               + ") value compare error : " + 
rowValues1.get(i) + " : "
-                                               + rowValues2.get(i));
-                               result = false;
-                               break;
-                       }
-               }
-               return result;
-       }
-
-       /**
-        * Compare table values
-        * 
-        */
-       private static boolean compareObj(Object object1, Object object2) {
-               if (object1 == null) {
-                       if (object2 == null)
-                               return true;
-                       else
-                               return false;
-               } else if (object1.equals(object2))
-                       return true;
-               else
-                       return false;
-       }
 
        /**
         * Compares two objects that implement the Comparable interface
@@ -837,28 +720,6 @@ public class ToolTestComparator {
        }
 
        /**
-        * Remove directory
-        * 
-        */
-       public static void removeDir(Path outPath) throws IOException {
-               String command = null;
-               if (whichCluster.equalsIgnoreCase("realCluster")) {
-                       command = System.getenv("HADOOP_HOME") + "/bin/hadoop 
fs -rmr "
-                       + outPath.toString();
-               } else {
-                       command = "rm -rf " + outPath.toString();
-               }
-               Runtime runtime = Runtime.getRuntime();
-               Process proc = runtime.exec(command);
-               int exitVal = -1;
-               try {
-                       exitVal = proc.waitFor();
-               } catch (InterruptedException e) {
-                       System.err.println(e);
-               }
-       }
-
-       /**
         * Calculate elapsed time
         * 
         */
@@ -1021,7 +882,6 @@ public class ToolTestComparator {
                                // Verify merge-join table is in sorted order
                                verifyMergeJoin(pathTable1, sortCol, 
sortString, numbCols, rowMod,verifyDataColName);
                        } else if (verifyOption.equals("sorted-union")) {
-                               Object lastVal = null;
 
                                // Verify sorted-union table is in sorted order
                                verifySortedUnion(unionPaths, pathTable1, 
sortCol, sortString,
@@ -1045,7 +905,6 @@ public class ToolTestComparator {
                                // Create sorted table
                                createsortedtable(pathTable1, pathTable2, 
sortString, debug);
                        }else if (verifyOption.equals("printrownumber")) {
-                               Object lastVal = null;
                                //print total number of rows of the table
                                printRowNumber(pathTable1,sortString);
                        }

Modified: 
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java?rev=928385&r1=928384&r2=928385&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java Sun Mar 28 12:16:54 2010
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Iterator;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.Path;
@@ -159,7 +160,7 @@ public class TestOrderPreserveMultiTable
                        paths += ++fileId + ",";
                paths = paths.substring(0, paths.lastIndexOf(","));  // remove 
trailing comma
     paths += "}";
-               
+    
                String queryLoad = "records1 = LOAD '"
                + paths
                +       "' USING org.apache.hadoop.zebra.pig.TableLoader('" + 
columns + "', 'sorted');";
@@ -185,25 +186,30 @@ public class TestOrderPreserveMultiTable
                }
                
                // Test with input tables and provided output columns
-               testOrderPreserveUnion(inputTables, "int1", "int1, str1, 
byte1");
+               testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1, source_table");
                
                // Create results table for verification
-               ArrayList<ArrayList<Object>> resultTable = new 
ArrayList<ArrayList<Object>>();
+               HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable = 
+                 new HashMap<Integer, ArrayList<ArrayList<Object>>>();
+               
+               // The ordering from FileInputFormat glob expansion.
+               int[] tblIndexList = {0, 9, 1, 2, 3, 4, 5, 6, 7, 8};
+               
                for (int i=0; i<NUMB_TABLE; ++i) {
+                 ArrayList<ArrayList<Object>> rows = new ArrayList<ArrayList<Object>>();
                        for (int j=0; j<NUMB_TABLE_ROWS; ++j) {
                                ArrayList<Object> resultRow = new 
ArrayList<Object>();
-                               
-                               resultRow.add(i);       // int1
+                               resultRow.add(tblIndexList[i]); // int1
                                resultRow.add(new String("string" + j));        
// str1
                                resultRow.add(new DataByteArray("byte" + 
(NUMB_TABLE_ROWS - j)));       // byte1
-                               
-                               resultTable.add(resultRow);
+                               rows.add(resultRow);
                        }
+                       resultTable.put(i, rows);
                }
                
                // Verify union table
                Iterator<Tuple> it = pigServer.openIterator("records1");
-               int numbRows = verifyTable(resultTable, 0, it);
+               int numbRows = verifyTable(resultTable, 0, 3, it);
                
                Assert.assertEquals(totalTableRows, numbRows);
                
@@ -212,100 +218,6 @@ public class TestOrderPreserveMultiTable
        }
        
        /**
-        * Verify union output table with expected results
-        * 
-        */
-       private int verifyTable(ArrayList<ArrayList<Object>> resultTable, int 
keyColumn, Iterator<Tuple> it) throws IOException {
-               int numbRows = 0;
-               int index = 0;
-               Object value = resultTable.get(index).get(keyColumn);  // get 
value of primary key
-               
-               while (it.hasNext()) {
-                       Tuple rowValues = it.next();
-                       
-                       // If last primary sort key does match then search for 
next matching key
-                       if (! compareObj(value, rowValues.get(keyColumn))) {
-                               int subIndex = index + 1;
-                               while (subIndex < resultTable.size()) {
-                                       if ( ! compareObj(value, 
resultTable.get(subIndex).get(keyColumn)) ) {  // found new key
-                                               index = subIndex;
-                                               value = 
resultTable.get(index).get(keyColumn);
-                                               break;
-                                       }
-                                       ++subIndex;
-                               }
-                               Assert.assertEquals("Table comparison error for 
row : " + numbRows + " - no key found for : "
-                                       + rowValues.get(keyColumn), value, 
rowValues.get(keyColumn));
-                       }
-                       // Search for matching row with this primary key
-                       int subIndex = index;
-                       
-                       while (subIndex < resultTable.size()) {
-                               // Compare row
-                               ArrayList<Object> resultRow = 
resultTable.get(subIndex);
-                               if ( compareRow(rowValues, resultRow) )
-                                       break; // found matching row
-                               ++subIndex;
-                               Assert.assertEquals("Table comparison error for 
row : " + numbRows + " - no matching row found for : "
-                                       + rowValues.get(keyColumn), value, 
resultTable.get(subIndex).get(keyColumn));
-                       }
-                       ++numbRows;
-               }
-               Assert.assertEquals(resultTable.size(), numbRows);  // verify 
expected row count
-               return numbRows;
-       }
-       
-       /**
-        * Compare table rows
-        * 
-        */
-       private boolean compareRow(Tuple rowValues, ArrayList<Object> 
resultRow) throws IOException {
-               boolean result = true;
-               Assert.assertEquals(resultRow.size(), rowValues.size());
-               for (int i = 0; i < rowValues.size(); ++i) {
-                       if (! compareObj(rowValues.get(i), resultRow.get(i)) ) {
-                               result = false;
-                               break;
-                       }
-               }
-               return result;
-       }
-       
-       /**
-        * Compare table values
-        * 
-        */
-       private boolean compareObj(Object object1, Object object2) {
-               if (object1 == null) {
-                       if (object2 == null)
-                               return true;
-                       else
-                               return false;
-               } else if (object1.equals(object2))
-                       return true;
-               else
-                       return false;
-       }
-       
-       /**
-        * Print Pig Table (for debugging)
-        * 
-        */
-       private int printTable(String tablename) throws IOException {
-               Iterator<Tuple> it1 = pigServer.openIterator(tablename);
-               int numbRows = 0;
-               while (it1.hasNext()) {
-                       Tuple RowValue1 = it1.next();
-                       ++numbRows;
-                       System.out.println();
-                       for (int i = 0; i < RowValue1.size(); ++i)
-                               System.out.println("DEBUG: " + tablename + " 
RowValue.get(" + i + ") = " + RowValue1.get(i));
-               }
-               System.out.println("\nRow count : " + numbRows);
-               return numbRows;
-       }
-       
-       /**
         * Return the name of the routine that called getCurrentMethodName
         * 
         */


Reply via email to