Author: yanz
Date: Sun Mar 28 12:15:05 2010
New Revision: 928384

URL: http://svn.apache.org/viewvc?rev=928384&view=rev
Log:
PIG-1306 Support of locally sorted input splits (yanz)
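In short: TableInputFormat gains a SplitMode (UNSORTED, LOCALLY_SORTED, GLOBALLY_SORTED) and a setSplitMode() API; requireSortedTable() is deprecated and now maps to GLOBALLY_SORTED. A minimal driver sketch under those assumptions — the class name, input path, and job wiring below are illustrative and not part of this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
import org.apache.hadoop.zebra.mapreduce.TableInputFormat.SplitMode;

public class LocallySortedReadExample {
  public static Job configure(Configuration conf) throws Exception {
    Job job = new Job(conf);
    job.setInputFormatClass(TableInputFormat.class);
    // Hypothetical path; any Zebra table (or union) written as sorted will do.
    TableInputFormat.setInputPaths(job, new Path("/path/to/sorted/table"));
    // New in this patch: request locally sorted row splits. Each split stays
    // within one sorted source table, so each mapper sees its rows in order.
    // The ZebraSortInfo argument may be null (TableLoader passes null here).
    TableInputFormat.setSplitMode(job, SplitMode.LOCALLY_SORTED, null);
    // SplitMode.GLOBALLY_SORTED keeps the old requireSortedTable() behaviour:
    // non-overlapping key-range (SortedTableSplit) splits across the whole input.
    return job;
  }
}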
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjection.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveSimple.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveUnion.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveVariableTable.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnion.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionMergeJoin.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestSortedTableUnionSourceTableProj.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=928384&r1=928383&r2=928384&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Sun Mar 28 12:15:05 2010
@@ -16,6 +16,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    PIG-1306 Support of locally sorted input splits (yanz)
+
     PIG-1268 Need an ant target that runs all pig-related tests in Zebra (xuefuz via yanz)
 
     PIG-1207 Data sanity check should be performed at the end of writing instead of later at query time (yanz)

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java?rev=928384&r1=928383&r2=928384&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java Sun Mar 28 12:15:05 2010
@@ -29,6 +29,8 @@ import org.apache.hadoop.zebra.io.BasicT
 import org.apache.hadoop.zebra.io.TableScanner;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.pig.data.Tuple;
 
 /**
  * Table expression for reading a BasicTable.
@@ -117,6 +119,12 @@ class BasicTableExpr extends TableExpr { } @Override + public TableScanner getScanner(RowTableSplit split, String projection, + Configuration conf) throws IOException, ParseException { + return new BasicTableScanner(split, projection, conf); + } + + @Override public Schema getSchema(Configuration conf) throws IOException { return BasicTable.Reader.getSchema(path, conf); } @@ -131,4 +139,75 @@ class BasicTableExpr extends TableExpr { { BasicTable.dumpInfo(path.toString(), ps, conf, indent); } + + /** + * Basic Table Scanner + */ + class BasicTableScanner implements TableScanner { + private int tableIndex = -1; + private Integer[] virtualColumnIndices = null; + private TableScanner scanner = null; + + BasicTableScanner(RowTableSplit split, String projection, + Configuration conf) throws IOException, ParseException, ParseException { + tableIndex = split.getTableIndex(); + virtualColumnIndices = Projection.getVirtualColumnIndices(projection); + BasicTable.Reader reader = + new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf); + reader.setProjection(projection); + scanner = reader.getScanner(true, split.getSplit()); + } + + @Override + public boolean advance() throws IOException { + return scanner.advance(); + } + + @Override + public boolean atEnd() throws IOException { + return scanner.atEnd(); + } + + @Override + public Schema getSchema() { + return scanner.getSchema(); + } + + @Override + public void getKey(BytesWritable key) throws IOException { + scanner.getKey(key); + } + + @Override + public void getValue(Tuple row) throws IOException { + scanner.getValue(row); + if (virtualColumnIndices != null) + { + for (int i = 0; i < virtualColumnIndices.length; i++) + { + row.set(virtualColumnIndices[i], tableIndex); + } + } + } + + @Override + public boolean seekTo(BytesWritable key) throws IOException { + return scanner.seekTo(key); + } + + @Override + public void seekToEnd() throws IOException { + scanner.seekToEnd(); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + + @Override + public String getProjection() { + return scanner.getProjection(); + } + } } Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=928384&r1=928383&r2=928384&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java Sun Mar 28 12:15:05 2010 @@ -27,7 +27,6 @@ import org.apache.hadoop.io.BytesWritabl import org.apache.hadoop.zebra.io.BasicTable; import org.apache.hadoop.zebra.io.TableScanner; import org.apache.hadoop.zebra.parser.ParseException; -import org.apache.hadoop.zebra.types.Projection; import org.apache.hadoop.zebra.schema.Schema; /** @@ -132,10 +131,10 @@ abstract class TableExpr { } /** - * Get a scanner with an unsorted split. + * Get a scanner with an row split. * * @param split - * The range split. + * The row split. * @param projection * The projection schema. It should never be null. 
* @param conf @@ -144,11 +143,8 @@ abstract class TableExpr { * @throws IOException */ public TableScanner getScanner(RowTableSplit split, String projection, - Configuration conf) throws IOException, ParseException, ParseException { - BasicTable.Reader reader = - new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf); - reader.setProjection(projection); - return reader.getScanner(true, split.getSplit()); + Configuration conf) throws IOException, ParseException { + return null; } /** Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=928384&r1=928383&r2=928384&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Sun Mar 28 12:15:05 2010 @@ -144,6 +144,12 @@ import org.apache.pig.data.Tuple; * </UL> */ public class TableInputFormat extends InputFormat<BytesWritable, Tuple> { + public enum SplitMode { + UNSORTED, /* Data is not sorted. Default split mode*/ + LOCALLY_SORTED, /* Output by the each mapper is sorted */ + GLOBALLY_SORTED /* Output is locally sorted and the key ranges are not overlapped, not even on boundary */ + }; + static Log LOG = LogFactory.getLog(TableInputFormat.class); private static final String INPUT_EXPR = "mapreduce.lib.table.input.expr"; @@ -151,6 +157,10 @@ public class TableInputFormat extends In private static final String INPUT_SORT = "mapreduce.lib.table.input.sort"; static final String INPUT_FE = "mapreduce.lib.table.input.fe"; static final String INPUT_DELETED_CGS = "mapreduce.lib.table.input.deleted_cgs"; + private static final String INPUT_SPLIT_MODE = "mapreduce.lib.table.input.split_mode"; + private static final String UNSORTED = "unsorted"; + private static final String GLOBALLY_SORTED = "globally_sorted"; + private static final String LOCALLY_SORTED = "locally_sorted"; static final String DELETED_CG_SEPARATOR_PER_UNION = ";"; /** @@ -316,16 +326,28 @@ public class TableInputFormat extends In return null; } - /** - * Set requirement for sorted table - * - *...@param conf - * Configuration object. - */ - private static void setSorted(Configuration conf) { + private static boolean globalOrderingRequired(JobContext jobContext) + { + Configuration conf = jobContext.getConfiguration(); + String result = conf.get(INPUT_SPLIT_MODE, UNSORTED); + return result.equalsIgnoreCase(GLOBALLY_SORTED); + } + + private static void setSorted(JobContext jobContext) { + Configuration conf = jobContext.getConfiguration(); conf.setBoolean(INPUT_SORT, true); } + private static void setSorted(JobContext jobContext, SplitMode sm) + { + setSorted(jobContext); + Configuration conf = jobContext.getConfiguration(); + if (sm == SplitMode.GLOBALLY_SORTED) + conf.set(INPUT_SPLIT_MODE, GLOBALLY_SORTED); + else if (sm == SplitMode.LOCALLY_SORTED) + conf.set(INPUT_SPLIT_MODE, LOCALLY_SORTED); + } + /** * Get the SortInfo object regarding a Zebra table * @@ -368,13 +390,33 @@ public class TableInputFormat extends In /** * Requires sorted table or table union * + * @deprecated + * * @param jobContext * JobContext object. * @param sortInfo * ZebraSortInfo object containing sorting information. 
* */ - public static void requireSortedTable(JobContext jobContext, ZebraSortInfo sortInfo) throws IOException { + + public static void requireSortedTable(JobContext jobContext, ZebraSortInfo sortInfo) throws IOException { + setSplitMode(jobContext, SplitMode.GLOBALLY_SORTED, sortInfo); + } + + /** + * + * @param conf + * JonConf object + * @param sm + * Split mode: unsorted, globally sorted, locally sorted. Default is unsorted + * @param sortInfo + * ZebraSortInfo object containing sorting information. Will be ignored if + * the split mode is null or unsorted + * @throws IOException + */ + public static void setSplitMode(JobContext jobContext, SplitMode sm, ZebraSortInfo sortInfo) throws IOException { + if (sm == null || sm == SplitMode.UNSORTED) + return; TableExpr expr = getInputExpr( jobContext ); Configuration conf = jobContext.getConfiguration(); String comparatorName = null; @@ -428,8 +470,7 @@ public class TableInputFormat extends In } } } - // need key range input splits for sorted table union - setSorted(conf); + setSorted(jobContext, sm); } /** @@ -599,6 +640,7 @@ public class TableInputFormat extends In SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf); splits.add(split); } + LOG.info("getSplits : returning " + splits.size() + " sorted splits."); return splits; } @@ -716,7 +758,8 @@ public class TableInputFormat extends In FileStatus[] fileStatuses = fs.listStatus(globStat.getPath(), inputFilter); // reorder according to CG index BasicTable.Reader reader = readers.get(index); - reader.rearrangeFileIndices(fileStatuses); + if (fileStatuses.length > 1) + reader.rearrangeFileIndices(fileStatuses); for(FileStatus stat: fileStatuses) { if (stat != null) result.add(stat); @@ -758,6 +801,7 @@ public class TableInputFormat extends In boolean first = true; PathFilter filter = null; List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>(); + int[] realReaderIndices = new int[readers.size()]; for (int i = 0; i < readers.size(); ++i) { BasicTable.Reader reader = readers.get(i); @@ -766,7 +810,8 @@ public class TableInputFormat extends In /* We can create input splits only if there does exist a valid column group for split. * Otherwise, we do not create input splits. 
*/ - if (splitCGIndex >= 0) { + if (splitCGIndex >= 0) { + realReaderIndices[realReaders.size()] = i; realReaders.add(reader); if (first) { @@ -862,11 +907,13 @@ public class TableInputFormat extends In if (splitLen > 0) batches[++numBatches] = splitLen; - List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex, batches, numBatches); + List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex, + batches, numBatches); + int realTableIndex = realReaderIndices[tableIndex]; for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) { RowSplit subSplit = it.next(); - RowTableSplit split = new RowTableSplit(reader, subSplit, conf); + RowTableSplit split = new RowTableSplit(reader, subSplit, realTableIndex, conf); ret.add(split); } } @@ -884,7 +931,7 @@ public class TableInputFormat extends In return getSplits( jobContext, false ); } - private List<InputSplit> getSplits(JobContext jobContext, boolean singleSplit) throws IOException { + List<InputSplit> getSplits(JobContext jobContext, boolean singleSplit) throws IOException { Configuration conf = jobContext.getConfiguration(); TableExpr expr = getInputExpr( conf ); if( getSorted(conf) ) { @@ -938,9 +985,20 @@ public class TableInputFormat extends In return new ArrayList<InputSplit>(); } - return sorted ? - singleSplit ? getSortedSplits( conf, 1, expr, readers, status) : getSortedSplits(conf, -1, expr, readers, status) : - getRowSplits( conf, expr, readers, status); + List<InputSplit> result; + if (sorted) + { + if (singleSplit) + result = getSortedSplits( conf, 1, expr, readers, status); + else if (globalOrderingRequired(jobContext)) + result = getSortedSplits(conf, -1, expr, readers, status); + else + result = getRowSplits( conf, expr, readers, status); + } else + result = getRowSplits( conf, expr, readers, status); + + return result; + } catch (ParseException e) { throw new IOException("Projection parsing failed : "+e.getMessage()); } finally { @@ -1166,15 +1224,17 @@ class RowTableSplit extends InputSplit i /** * */ -String path = null; + String path = null; + int tableIndex = 0; RowSplit split = null; String[] hosts = null; long length = 1; - public RowTableSplit(Reader reader, RowSplit split, Configuration conf) + public RowTableSplit(Reader reader, RowSplit split, int tableIndex, Configuration conf) throws IOException { this.path = reader.getPath(); this.split = split; + this.tableIndex = tableIndex; BlockDistribution dataDist = reader.getBlockDistribution(split); if (dataDist != null) { length = dataDist.getLength(); @@ -1199,6 +1259,7 @@ String path = null; @Override public void readFields(DataInput in) throws IOException { + tableIndex = WritableUtils.readVInt(in); path = WritableUtils.readString(in); int bool = WritableUtils.readVInt(in); if (bool == 1) { @@ -1214,6 +1275,7 @@ String path = null; @Override public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, tableIndex); WritableUtils.writeString(out, path); if (split == null) { WritableUtils.writeVInt(out, 0); @@ -1233,4 +1295,8 @@ String path = null; public RowSplit getSplit() { return split; } + + public int getTableIndex() { + return tableIndex; + } } Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=928384&r1=928383&r2=928384&view=diff 
============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java Sun Mar 28 12:15:05 2010 @@ -53,19 +53,18 @@ public class TableRecordReader extends R */ public TableRecordReader(TableExpr expr, String projection, InputSplit split, JobContext jobContext) throws IOException, ParseException { - Configuration conf = jobContext.getConfiguration(); - if (expr.sortedSplitRequired()) { + Configuration conf = jobContext.getConfiguration(); + if (split instanceof RowTableSplit) { + RowTableSplit rowSplit = (RowTableSplit) split; + if ((!expr.sortedSplitRequired() || rowSplit.getTableIndex() == -1) && + Projection.getVirtualColumnIndices(projection) != null) + throw new IllegalArgumentException("virtual column requires union of multiple sorted tables"); + scanner = expr.getScanner(rowSplit, projection, conf); + } else { SortedTableSplit tblSplit = (SortedTableSplit) split; scanner = expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection, conf); - } else if (split != null && split instanceof RowTableSplit) { - RowTableSplit rowSplit = (RowTableSplit) split; - scanner = expr.getScanner(rowSplit, projection, conf); - } - else { - UnsortedTableSplit tblSplit = (UnsortedTableSplit) split; - scanner = expr.getScanner(tblSplit, projection, conf); } } Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java?rev=928384&r1=928383&r2=928384&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java Sun Mar 28 12:15:05 2010 @@ -29,7 +29,6 @@ import java.util.concurrent.PriorityBloc import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.zebra.io.BasicTable; -import org.apache.hadoop.zebra.io.BasicTableStatus; import org.apache.hadoop.zebra.io.TableScanner; import org.apache.hadoop.zebra.parser.ParseException; import org.apache.hadoop.zebra.types.Projection; @@ -157,6 +156,13 @@ class TableUnionExpr extends CompositeTa throw new IllegalArgumentException("virtual column requires union of multiple tables"); return new SortedTableUnionScanner(scanners, Projection.getVirtualColumnIndices(projection)); } + + @Override + public TableScanner getScanner(RowTableSplit split, String projection, + Configuration conf) throws IOException, ParseException { + BasicTableExpr expr = (BasicTableExpr) composite.get(split.getTableIndex()); + return expr.getScanner(split, projection, conf); + } } /** Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=928384&r1=928383&r2=928384&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Sun Mar 28 12:15:05 2010 
@@ -38,6 +38,8 @@ import org.apache.hadoop.mapreduce.lib.i import org.apache.hadoop.zebra.io.BasicTable; import org.apache.hadoop.zebra.mapreduce.TableInputFormat; import org.apache.hadoop.zebra.mapreduce.TableRecordReader; +import org.apache.hadoop.zebra.mapreduce.TableInputFormat.SplitMode; +import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo; import org.apache.hadoop.zebra.parser.ParseException; import org.apache.hadoop.zebra.schema.ColumnType; import org.apache.hadoop.zebra.schema.Schema; @@ -57,15 +59,17 @@ import org.apache.pig.impl.logicalLayer. import org.apache.pig.impl.util.UDFContext; import org.apache.hadoop.zebra.pig.comparator.*; import org.apache.pig.IndexableLoadFunc; +import org.apache.pig.CollectableLoadFunc; /** * Pig IndexableLoadFunc and Slicer for Zebra Table */ public class TableLoader extends LoadFunc implements LoadMetadata, LoadPushDown, - IndexableLoadFunc{ + IndexableLoadFunc, CollectableLoadFunc { static final Log LOG = LogFactory.getLog(TableLoader.class); private static final String UDFCONTEXT_PROJ_STRING = "zebra.UDFContext.projectionString"; + private static final String UDFCONTEXT_GLOBAL_SORTING = "zebra.UDFContext.globalSorting"; private String projectionString; @@ -170,14 +174,19 @@ public class TableLoader extends LoadFun * @throws IOException */ private void setProjection(Job job) throws IOException { + Properties properties = UDFContext.getUDFContext().getUDFProperties( + this.getClass(), new String[]{ udfContextSignature } ); + boolean requireGlobalOrder = "true".equals(properties.getProperty( UDFCONTEXT_GLOBAL_SORTING)); + if (requireGlobalOrder && !sorted) + throw new IOException("Global sorting can be only asked on table loaded as sorted"); if( sorted ) { - TableInputFormat.requireSortedTable( job, null ); + SplitMode splitMode = + requireGlobalOrder ? 
SplitMode.GLOBALLY_SORTED : SplitMode.LOCALLY_SORTED; + TableInputFormat.setSplitMode(job, splitMode, null); sortInfo = TableInputFormat.getSortInfo( job ); } try { - Properties properties = UDFContext.getUDFContext().getUDFProperties( - this.getClass(), new String[]{ udfContextSignature } ); String prunedProjStr = properties.getProperty( UDFCONTEXT_PROJ_STRING ); if( prunedProjStr != null ) { @@ -414,4 +423,11 @@ public class TableLoader extends LoadFun public void setUDFContextSignature(String signature) { udfContextSignature = signature; } + + @Override + public void ensureAllKeyInstancesInSameSplit() throws IOException { + Properties properties = UDFContext.getUDFContext().getUDFProperties(this.getClass(), + new String[] { udfContextSignature } ); + properties.setProperty(UDFCONTEXT_GLOBAL_SORTING, "true"); + } } Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java?rev=928384&r1=928383&r2=928384&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/BaseTestCase.java Sun Mar 28 12:15:05 2010 @@ -20,6 +20,9 @@ package org.apache.hadoop.zebra; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import junit.framework.Assert; @@ -31,6 +34,7 @@ import org.apache.hadoop.conf.Configured import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.data.Tuple; public class BaseTestCase extends Configured { protected static PigServer pigServer = null; @@ -126,6 +130,45 @@ public class BaseTestCase extends Config } } + /** + * Verify union output table with expected results + * + */ + protected int verifyTable(HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable, + int keyColumn, int tblIdxCol, Iterator<Tuple> it) throws IOException { + int numbRows = 0; + int index = 0, rowIndex = -1, rowCount = -1, prevIndex = -1; + Object value; + boolean first = true; + ArrayList<ArrayList<Object>> rows = null; + + while (it.hasNext()) { + Tuple rowValues = it.next(); + + if (first) { + index = (Integer) rowValues.get(tblIdxCol); + Assert.assertNotSame(prevIndex, index); + rows = resultTable.get(index); + rowIndex = 0; + rowCount = rows.size(); + first = false; + } + value = rows.get(rowIndex++).get(keyColumn); + Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : " + + rowValues.get(keyColumn), value, rowValues.get(keyColumn)); + + if (rowIndex == rowCount) + { + // current table is run out; start on a new table for next iteration + first = true; + prevIndex = index; + } + + ++numbRows; + } + return numbRows; + } + public static void checkTableExists(boolean expected, String strDir) throws IOException { File theDir = null; boolean actual = false; Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java?rev=928384&r1=928383&r2=928384&view=diff ============================================================================== --- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/ToolTestComparator.java Sun Mar 28 12:15:05 2010 @@ -37,7 +37,7 @@ public class ToolTestComparator extends final static String TABLE_SCHEMA = "count:int,seed:int,int1:int,int2:int,str1:string,str2:string,byte1:bytes," + "byte2:bytes,float1:float,long1:long,double1:double,m1:map(string),r1:record(f1:string, f2:string)," - + "c1:collection(a:string, b:string)"; + + "c1:collection(record(a:string, b:string))"; final static String TABLE_STORAGE = "[count,seed,int1,int2,str1,str2,byte1,byte2,float1,long1,double1];[m1#{a}];[r1,c1]"; private static Random generator = new Random(); @@ -399,13 +399,15 @@ public class ToolTestComparator extends tuple.set(12, tupRecord1); // insert collection1 - tupColl1.set(0, "c1 a " + seed); - tupColl1.set(1, "c1 a " + i); - bag1.add(tupColl1); // first collection item - - tupColl2.set(0, "c1 b " + seed); - tupColl2.set(1, "c1 b " + i); - bag1.add(tupColl2); // second collection item + // tupColl1.set(0, "c1 a " + seed); + // tupColl1.set(1, "c1 a " + i); + // bag1.add(tupColl1); // first collection item + bag1.add(tupRecord1); // first collection item + bag1.add(tupRecord1); // second collection item + + // tupColl2.set(0, "c1 b " + seed); + // tupColl2.set(1, "c1 b " + i); + // bag1.add(tupColl2); // second collection item tuple.set(13, bag1); Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java?rev=928384&r1=928383&r2=928384&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/ToolTestComparator.java Sun Mar 28 12:15:05 2010 @@ -19,45 +19,32 @@ package org.apache.hadoop.zebra.mapreduce; import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.IOException; import java.io.PrintStream; -import java.io.Serializable; import java.util.HashMap; import java.util.Iterator; import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Random; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.zebra.io.BasicTable; import org.apache.hadoop.zebra.io.TableInserter; import org.apache.hadoop.zebra.io.TableScanner; -import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit; import org.apache.hadoop.zebra.mapreduce.SortedTableSplit; import org.apache.hadoop.zebra.mapreduce.TableInputFormat; import org.apache.hadoop.zebra.parser.ParseException; import org.apache.hadoop.zebra.pig.TableStorer; import org.apache.hadoop.zebra.schema.Schema; import org.apache.hadoop.zebra.types.TypesUtils; -import org.apache.pig.ExecType; -import org.apache.pig.PigServer; +import org.apache.hadoop.zebra.BaseTestCase; import org.apache.pig.backend.executionengine.ExecException; import 
org.apache.pig.backend.executionengine.ExecJob; -import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; -import org.apache.pig.test.MiniCluster; import org.junit.Assert; /** @@ -66,25 +53,15 @@ import org.junit.Assert; * Utility for verifying tables created during Zebra Stress Testing * */ -public class ToolTestComparator { +public class ToolTestComparator extends BaseTestCase { final static String TABLE_SCHEMA = "count:int,seed:int,int1:int,int2:int,str1:string,str2:string,byte1:bytes," + "byte2:bytes,float1:float,long1:long,double1:double,m1:map(string),r1:record(f1:string, f2:string)," - + "c1:collection(a:string, b:string)"; + + "c1:collection(record(a:string, b:string))"; final static String TABLE_STORAGE = "[count,seed,int1,int2,str1,str2,byte1,byte2,float1,long1,double1];[m1#{a}];[r1,c1]"; private static Random generator = new Random(); - private static Configuration conf; - private static FileSystem fs; - - protected static ExecType execType = ExecType.MAPREDUCE; - private static MiniCluster cluster; - protected static PigServer pigServer; protected static ExecJob pigJob; - private static Path path; - - private static String zebraJar; - private static String whichCluster; private static int totalNumbCols; private static long totalNumbVerifiedRows; @@ -93,65 +70,7 @@ public class ToolTestComparator { * Setup and initialize environment */ public static void setUp() throws Exception { - System.out.println("setUp()"); - if (System.getProperty("hadoop.log.dir") == null) { - String base = new File(".").getPath(); // getAbsolutePath(); - System - .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs"); - } - - - if (System.getProperty("whichCluster") == null) { - System.setProperty("whichCluster", "miniCluster"); - whichCluster = System.getProperty("whichCluster"); - } else { - whichCluster = System.getProperty("whichCluster"); - } - - System.out.println("cluster: " + whichCluster); - if (whichCluster.equalsIgnoreCase("realCluster") - && System.getenv("HADOOP_HOME") == null) { - System.out.println("Please set HADOOP_HOME"); - System.exit(0); - } - - conf = new Configuration(); - - if (whichCluster.equalsIgnoreCase("realCluster") - && System.getenv("USER") == null) { - System.out.println("Please set USER"); - System.exit(0); - } - zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar"; - File file = new File(zebraJar); - if (!file.exists() && whichCluster.equalsIgnoreCase("realCulster")) { - System.out.println("Please put zebra.jar at hadoop_home/../jars"); - System.exit(0); - } - - if (whichCluster.equalsIgnoreCase("realCluster")) { - System.out.println("Running realCluster"); - pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil - .toProperties(conf)); - pigServer.registerJar(zebraJar); - path = new Path("/user/" + System.getenv("USER") + "/TestComparator"); - // removeDir(path); - fs = path.getFileSystem(conf); - } - - if (whichCluster.equalsIgnoreCase("miniCluster")) { - System.out.println("Running miniCluster"); - if (execType == ExecType.MAPREDUCE) { - cluster = MiniCluster.buildCluster(); - pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); - fs = cluster.getFileSystem(); - path = new Path(fs.getWorkingDirectory() + "/TestComparator"); - // removeDir(path); - System.out.println("path1 =" + path); - } else { - pigServer = new PigServer(ExecType.LOCAL); - } - } + init(); } /** @@ -389,7 +308,7 @@ public class 
ToolTestComparator { while (it1.hasNext()) { ++numbRows1; // increment row count - Tuple rowValue = it1.next(); + it1.next(); } numbRows.add(numbRows1); numbUnionRows += numbRows1; @@ -499,13 +418,15 @@ public class ToolTestComparator { tuple.set(12, tupRecord1); // insert collection1 - tupColl1.set(0, "c1 a " + seed); - tupColl1.set(1, "c1 a " + i); - bag1.add(tupColl1); // first collection item - - tupColl2.set(0, "c1 b " + seed); - tupColl2.set(1, "c1 b " + i); - bag1.add(tupColl2); // second collection item + // tupColl1.set(0, "c1 a " + seed); + // tupColl1.set(1, "c1 a " + i); + // bag1.add(tupColl1); // first collection item + bag1.add(tupRecord1); // first collection item + bag1.add(tupRecord1); // second collection item + + // tupColl2.set(0, "c1 b " + seed); + // tupColl2.set(1, "c1 b " + i); + // bag1.add(tupColl2); // second collection item tuple.set(13, bag1); @@ -604,7 +525,7 @@ public class ToolTestComparator { TableInputFormat.requireSortedTable(job, null); TableInputFormat tif = new TableInputFormat(); - SortedTableSplit split = (SortedTableSplit) tif.getSplits(job).get(0); + SortedTableSplit split = (SortedTableSplit) tif.getSplits(job, true).get(0); TableScanner scanner = reader.getScanner(split.getBegin(), split.getEnd(), true); BytesWritable key = new BytesWritable(); @@ -731,12 +652,9 @@ public class ToolTestComparator { TableInputFormat.setInputPaths(job, new Path(pathTable1)); TableInputFormat.requireSortedTable(job, null); - TableInputFormat tif = new TableInputFormat(); - TableScanner scanner = reader.getScanner(null, null, true); BytesWritable key = new BytesWritable(); - Tuple rowValue = TypesUtils.createTuple(scanner.getSchema()); while (!scanner.atEnd()) { ++numbRows; @@ -746,41 +664,6 @@ public class ToolTestComparator { System.out.println("\nTable Path : " + pathTable1); System.out.println("Table Row number : " + numbRows); } - /** - * Compare table rows - * - */ - private static boolean compareRow(Tuple rowValues1, Tuple rowValues2) - throws IOException { - boolean result = true; - Assert.assertEquals(rowValues1.size(), rowValues2.size()); - for (int i = 0; i < rowValues1.size(); ++i) { - if (!compareObj(rowValues1.get(i), rowValues2.get(i))) { - System.out.println("DEBUG: " + " RowValue.get(" + i - + ") value compare error : " + rowValues1.get(i) + " : " - + rowValues2.get(i)); - result = false; - break; - } - } - return result; - } - - /** - * Compare table values - * - */ - private static boolean compareObj(Object object1, Object object2) { - if (object1 == null) { - if (object2 == null) - return true; - else - return false; - } else if (object1.equals(object2)) - return true; - else - return false; - } /** * Compares two objects that implement the Comparable interface @@ -837,28 +720,6 @@ public class ToolTestComparator { } /** - * Remove directory - * - */ - public static void removeDir(Path outPath) throws IOException { - String command = null; - if (whichCluster.equalsIgnoreCase("realCluster")) { - command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr " - + outPath.toString(); - } else { - command = "rm -rf " + outPath.toString(); - } - Runtime runtime = Runtime.getRuntime(); - Process proc = runtime.exec(command); - int exitVal = -1; - try { - exitVal = proc.waitFor(); - } catch (InterruptedException e) { - System.err.println(e); - } - } - - /** * Calculate elapsed time * */ @@ -1021,7 +882,6 @@ public class ToolTestComparator { // Verify merge-join table is in sorted order verifyMergeJoin(pathTable1, sortCol, sortString, numbCols, 
rowMod,verifyDataColName); } else if (verifyOption.equals("sorted-union")) { - Object lastVal = null; // Verify sorted-union table is in sorted order verifySortedUnion(unionPaths, pathTable1, sortCol, sortString, @@ -1045,7 +905,6 @@ public class ToolTestComparator { // Create sorted table createsortedtable(pathTable1, pathTable2, sortString, debug); }else if (verifyOption.equals("printrownumber")) { - Object lastVal = null; //print total number of rows of the table printRowNumber(pathTable1,sortString); } Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java?rev=928384&r1=928383&r2=928384&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java Sun Mar 28 12:15:05 2010 @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.Iterator; import java.util.ArrayList; +import java.util.HashMap; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; @@ -159,7 +160,7 @@ public class TestOrderPreserveMultiTable paths += ++fileId + ","; paths = paths.substring(0, paths.lastIndexOf(",")); // remove trailing comma paths += "}"; - + String queryLoad = "records1 = LOAD '" + paths + "' USING org.apache.hadoop.zebra.pig.TableLoader('" + columns + "', 'sorted');"; @@ -185,25 +186,30 @@ public class TestOrderPreserveMultiTable } // Test with input tables and provided output columns - testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1"); + testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1, source_table"); // Create results table for verification - ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>(); + HashMap<Integer, ArrayList<ArrayList<Object>>> resultTable = + new HashMap<Integer, ArrayList<ArrayList<Object>>>(); + + // The ordering from FileInputFormat glob expansion. + int[] tblIndexList = {0, 9, 1, 2, 3, 4, 5, 6, 7, 8}; + for (int i=0; i<NUMB_TABLE; ++i) { + ArrayList<ArrayList<Object>> rows = new ArrayList<ArrayList<Object>>(); for (int j=0; j<NUMB_TABLE_ROWS; ++j) { ArrayList<Object> resultRow = new ArrayList<Object>(); - - resultRow.add(i); // int1 + resultRow.add(tblIndexList[i]); // int1 resultRow.add(new String("string" + j)); // str1 resultRow.add(new DataByteArray("byte" + (NUMB_TABLE_ROWS - j))); // byte1 - - resultTable.add(resultRow); + rows.add(resultRow); } + resultTable.put(i, rows); } // Verify union table Iterator<Tuple> it = pigServer.openIterator("records1"); - int numbRows = verifyTable(resultTable, 0, it); + int numbRows = verifyTable(resultTable, 0, 3, it); Assert.assertEquals(totalTableRows, numbRows); @@ -212,100 +218,6 @@ public class TestOrderPreserveMultiTable } /** - * Verify union output table with expected results - * - */ - private int verifyTable(ArrayList<ArrayList<Object>> resultTable, int keyColumn, Iterator<Tuple> it) throws IOException { - int numbRows = 0; - int index = 0; - Object value = resultTable.get(index).get(keyColumn); // get value of primary key - - while (it.hasNext()) { - Tuple rowValues = it.next(); - - // If last primary sort key does match then search for next matching key - if (! 
compareObj(value, rowValues.get(keyColumn))) { - int subIndex = index + 1; - while (subIndex < resultTable.size()) { - if ( ! compareObj(value, resultTable.get(subIndex).get(keyColumn)) ) { // found new key - index = subIndex; - value = resultTable.get(index).get(keyColumn); - break; - } - ++subIndex; - } - Assert.assertEquals("Table comparison error for row : " + numbRows + " - no key found for : " - + rowValues.get(keyColumn), value, rowValues.get(keyColumn)); - } - // Search for matching row with this primary key - int subIndex = index; - - while (subIndex < resultTable.size()) { - // Compare row - ArrayList<Object> resultRow = resultTable.get(subIndex); - if ( compareRow(rowValues, resultRow) ) - break; // found matching row - ++subIndex; - Assert.assertEquals("Table comparison error for row : " + numbRows + " - no matching row found for : " - + rowValues.get(keyColumn), value, resultTable.get(subIndex).get(keyColumn)); - } - ++numbRows; - } - Assert.assertEquals(resultTable.size(), numbRows); // verify expected row count - return numbRows; - } - - /** - * Compare table rows - * - */ - private boolean compareRow(Tuple rowValues, ArrayList<Object> resultRow) throws IOException { - boolean result = true; - Assert.assertEquals(resultRow.size(), rowValues.size()); - for (int i = 0; i < rowValues.size(); ++i) { - if (! compareObj(rowValues.get(i), resultRow.get(i)) ) { - result = false; - break; - } - } - return result; - } - - /** - * Compare table values - * - */ - private boolean compareObj(Object object1, Object object2) { - if (object1 == null) { - if (object2 == null) - return true; - else - return false; - } else if (object1.equals(object2)) - return true; - else - return false; - } - - /** - * Print Pig Table (for debugging) - * - */ - private int printTable(String tablename) throws IOException { - Iterator<Tuple> it1 = pigServer.openIterator(tablename); - int numbRows = 0; - while (it1.hasNext()) { - Tuple RowValue1 = it1.next(); - ++numbRows; - System.out.println(); - for (int i = 0; i < RowValue1.size(); ++i) - System.out.println("DEBUG: " + tablename + " RowValue.get(" + i + ") = " + RowValue1.get(i)); - } - System.out.println("\nRow count : " + numbRows); - return numbRows; - } - - /** * Return the name of the routine that called getCurrentMethodName * */
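For Pig users, the visible effect is at TableLoader, which now also implements CollectableLoadFunc: loading with the 'sorted' option requests LOCALLY_SORTED splits by default, the virtual source_table column identifies each row's source table in a union, and GLOBALLY_SORTED key-range splits are requested only when Pig calls ensureAllKeyInstancesInSameSplit(), as a map-side ('collected') group does. A minimal sketch of that usage, assuming this patch; the class name, table paths, and column names are illustrative only:

import java.util.Iterator;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class TableLoaderSplitModeExample {
  public static void main(String[] args) throws Exception {
    PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
    // Loading as 'sorted' now yields locally sorted row splits; the virtual
    // source_table column reports which table of the union a row came from.
    pigServer.registerQuery("records = LOAD '/user/me/table{1,2}' USING "
        + "org.apache.hadoop.zebra.pig.TableLoader('int1, str1, source_table', 'sorted');");
    // A map-side ("collected") group requires a CollectableLoadFunc and makes Pig
    // call ensureAllKeyInstancesInSameSplit(), switching the loader to globally
    // sorted key-range splits so all rows with a given key land in one split.
    pigServer.registerQuery("grouped = GROUP records BY int1 USING 'collected';");
    Iterator<Tuple> it = pigServer.openIterator("grouped");
    while (it.hasNext())
      System.out.println(it.next());
  }
}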