Author: yanz Date: Thu Apr 8 03:10:24 2010 New Revision: 931764 URL: http://svn.apache.org/viewvc?rev=931764&view=rev Log: PIG-1315 Implementing OrderedLoadFunc interface for Zebra TableLoader (xuefuz via yanz)
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapSideCoGroup.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=931764&r1=931763&r2=931764&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original) +++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Thu Apr 8 03:10:24 2010 @@ -16,6 +16,8 @@ Trunk (unreleased changes) IMPROVEMENTS + PIG-1315 Implementing OrderedLoadFunc interface for Zebra TableLoader (xuefux via yanz) + PIG-1306 Support of locally sorted input splits (yanz) PIG-1268 Need an ant target that runs all pig-related tests in Zebra (xuefuz via yanz) Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=931764&r1=931763&r2=931764&view=diff ============================================================================== 
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Thu Apr 8 03:10:24 2010 @@ -122,26 +122,6 @@ abstract class TableExpr { * @return A table scanner. * @throws IOException */ - public TableScanner getScanner(UnsortedTableSplit split, String projection, - Configuration conf) throws IOException, ParseException { - BasicTable.Reader reader = - new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf); - reader.setProjection(projection); - return reader.getScanner(split.getSplit(), true); - } - - /** - * Get a scanner with an unsorted split. - * - * @param split - * The range split. - * @param projection - * The projection schema. It should never be null. - * @param conf - * The configuration - * @return A table scanner. - * @throws IOException - */ public TableScanner getScanner(RowTableSplit split, String projection, Configuration conf) throws IOException, ParseException, ParseException { BasicTable.Reader reader = Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=931764&r1=931763&r2=931764&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Thu Apr 8 03:10:24 2010 @@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.zebra.tfile.RawComparable; import org.apache.hadoop.mapred.FileInputFormat; import 
org.apache.hadoop.mapred.FileSplit; @@ -581,66 +580,6 @@ public class TableInputFormat implements conf.setLong("table.input.split.minSize", minSize); } - private static InputSplit[] getUnsortedSplits(JobConf conf, int numSplits, - TableExpr expr, List<BasicTable.Reader> readers, - List<BasicTableStatus> status) throws IOException { - long totalBytes = 0; - for (Iterator<BasicTableStatus> it = status.iterator(); it.hasNext();) { - BasicTableStatus s = it.next(); - totalBytes += s.getSize(); - } - - long maxSplits = totalBytes / getMinSplitSize(conf); - - if (numSplits > maxSplits) { - numSplits = -1; - } - - ArrayList<InputSplit> ret = new ArrayList<InputSplit>(); - if (numSplits <= 0) { - if (totalBytes <= 0) { - BasicTable.Reader reader = readers.get(0); - UnsortedTableSplit split = - new UnsortedTableSplit(reader, null, conf); - ret.add(split); - } else { - for (int i = 0; i < readers.size(); ++i) { - BasicTable.Reader reader = readers.get(i); - List<RangeSplit> subSplits = reader.rangeSplit(-1); - for (Iterator<RangeSplit> it = subSplits.iterator(); it.hasNext();) { - UnsortedTableSplit split = - new UnsortedTableSplit(reader, it.next(), conf); - ret.add(split); - } - } - } - } else { - long goalSize = totalBytes / numSplits; - - double SPLIT_SLOP = 1.1; - for (int i = 0; i < readers.size(); ++i) { - BasicTable.Reader reader = readers.get(i); - BasicTableStatus s = status.get(i); - int nSplits = - (int) ((s.getSize() + goalSize * (2 - SPLIT_SLOP)) / goalSize); - if (nSplits > 1) { - List<RangeSplit> subSplits = reader.rangeSplit(nSplits); - for (Iterator<RangeSplit> it = subSplits.iterator(); it.hasNext();) { - UnsortedTableSplit split = - new UnsortedTableSplit(reader, it.next(), conf); - ret.add(split); - } - } else { - UnsortedTableSplit split = new UnsortedTableSplit(reader, null, conf); - ret.add(split); - } - } - } - - LOG.info("getSplits : returning " + ret.size() + " file splits."); - return ret.toArray(new InputSplit[ret.size()]); - } - private static 
class DummyFileInputFormat extends FileInputFormat<BytesWritable, Tuple> { /** * the next constant and class are copies from FileInputFormat @@ -1109,85 +1048,6 @@ class SortedTableSplit implements InputS /** * Adaptor class for unsorted InputSplit for table. */ -class UnsortedTableSplit implements InputSplit { - String path = null; - RangeSplit split = null; - String[] hosts = null; - long length = 1; - - public UnsortedTableSplit(Reader reader, RangeSplit split, JobConf conf) - throws IOException { - this.path = reader.getPath(); - this.split = split; - BlockDistribution dataDist = reader.getBlockDistribution(split); - if (dataDist != null) { - length = dataDist.getLength(); - hosts = - dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation", 5)); - } - } - - public UnsortedTableSplit() { - // no-op for Writable construction - } - - @Override - public long getLength() throws IOException { - return length; - } - - @Override - public String[] getLocations() throws IOException { - if (hosts == null) - { - String[] tmp = new String[1]; - tmp[0] = ""; - return tmp; - } - return hosts; - } - - @Override - public void readFields(DataInput in) throws IOException { - path = WritableUtils.readString(in); - int bool = WritableUtils.readVInt(in); - if (bool == 1) { - if (split == null) split = new RangeSplit(); - split.readFields(in); - } - else { - split = null; - } - hosts = WritableUtils.readStringArray(in); - length = WritableUtils.readVLong(in); - } - - @Override - public void write(DataOutput out) throws IOException { - WritableUtils.writeString(out, path); - if (split == null) { - WritableUtils.writeVInt(out, 0); - } - else { - WritableUtils.writeVInt(out, 1); - split.write(out); - } - WritableUtils.writeStringArray(out, hosts); - WritableUtils.writeVLong(out, length); - } - - public String getPath() { - return path; - } - - public RangeSplit getSplit() { - return split; - } -} - -/** - * Adaptor class for unsorted InputSplit for table. 
- */ class RowTableSplit implements InputSplit { String path = null; RowSplit split = null; Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=931764&r1=931763&r2=931764&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Thu Apr 8 03:10:24 2010 @@ -18,7 +18,6 @@ package org.apache.hadoop.zebra.mapred; import java.io.IOException; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -51,22 +50,16 @@ public class TableRecordReader implement * @throws IOException */ public TableRecordReader(TableExpr expr, String projection, - InputSplit split, - JobConf conf) throws IOException, ParseException { - if (split != null && split instanceof RowTableSplit) { - RowTableSplit rowSplit = (RowTableSplit) split; - if (!expr.sortedSplitRequired() && Projection.getVirtualColumnIndices(projection) != null) - throw new IllegalArgumentException("virtual column requires union of multiple sorted tables"); - scanner = expr.getScanner(rowSplit, projection, conf); - } else if (expr.sortedSplitRequired()) { - SortedTableSplit tblSplit = (SortedTableSplit) split; - scanner = - expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection, - conf); - } else { - UnsortedTableSplit tblSplit = (UnsortedTableSplit) split; - scanner = expr.getScanner(tblSplit, projection, conf); - } + InputSplit split, JobConf conf) throws IOException, ParseException { + if( split instanceof RowTableSplit ) { + 
RowTableSplit rowSplit = (RowTableSplit)split; + if( Projection.getVirtualColumnIndices( projection ) != null ) + throw new IllegalArgumentException("virtual column requires union of multiple sorted tables"); + scanner = expr.getScanner(rowSplit, projection, conf); + } else { + SortedTableSplit tblSplit = (SortedTableSplit)split; + scanner = expr.getScanner( tblSplit.getBegin(), tblSplit.getEnd(), projection, conf ); + } } @Override Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java?rev=931764&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java (added) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java Thu Apr 8 03:10:24 2010 @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.zebra.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.pig.data.DataReaderWriter; + +public class SortedTableSplitComparable implements WritableComparable<SortedTableSplitComparable> { + private static final long serialVersionUID = 1L; + + protected Integer index; + + //need a default constructor to be able to de-serialize using just + // the Writable interface + public SortedTableSplitComparable(){} + + public SortedTableSplitComparable(int index){ + this.index = index; + } + + + @Override + public int compareTo(SortedTableSplitComparable other) { + return Integer.signum( index - other.index ); + } + + + @Override + public void readFields(DataInput in) throws IOException { + index = (Integer)DataReaderWriter.readDatum(in); + } + + + @Override + public void write(DataOutput out) throws IOException { + DataReaderWriter.writeDatum(out, index); + } + + @Override + public String toString(){ + return "Index = " + index ; + } + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return index; + } + + /* (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SortedTableSplitComparable other = (SortedTableSplitComparable) obj; + return this.index.intValue() == other.index.intValue(); + } +} Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=931764&r1=931763&r2=931764&view=diff ============================================================================== --- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java Thu Apr 8 03:10:24 2010 @@ -111,26 +111,6 @@ abstract class TableExpr { } /** - * Get a scanner with an unsorted split. - * - * @param split - * The range split. - * @param projection - * The projection schema. It should never be null. - * @param conf - * The configuration - * @return A table scanner. - * @throws IOException - */ - public TableScanner getScanner(UnsortedTableSplit split, String projection, - Configuration conf) throws IOException, ParseException { - BasicTable.Reader reader = - new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf); - reader.setProjection(projection); - return reader.getScanner(split.getSplit(), true); - } - - /** * Get a scanner with an row split. * * @param split Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=931764&r1=931763&r2=931764&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Thu Apr 8 03:10:24 2010 @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.zebra.tfile.RawComparable; @@ -162,7 +163,7 @@ public class TableInputFormat extends In private static final String GLOBALLY_SORTED = "globally_sorted"; 
private static final String LOCALLY_SORTED = "locally_sorted"; static final String DELETED_CG_SEPARATOR_PER_UNION = ";"; - + /** * Set the paths to the input table. * @@ -396,7 +397,6 @@ public class TableInputFormat extends In * JobContext object. * @param sortInfo * ZebraSortInfo object containing sorting information. - * */ public static void requireSortedTable(JobContext jobContext, ZebraSortInfo sortInfo) throws IOException { @@ -590,7 +590,7 @@ public class TableInputFormat extends In bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit) null)); } - SortedTableSplit split = new SortedTableSplit(null, null, bd, conf); + SortedTableSplit split = new SortedTableSplit(0, null, null, bd, conf); splits.add(split); return splits; } @@ -637,7 +637,7 @@ public class TableInputFormat extends In beginB = new BytesWritable(begin.buffer()); if (end != null) endB = new BytesWritable(end.buffer()); - SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf); + SortedTableSplit split = new SortedTableSplit(i, beginB, endB, bd, conf); splits.add(split); } LOG.info("getSplits : returning " + splits.size() + " sorted splits."); @@ -1033,13 +1033,33 @@ public class TableInputFormat extends In throw new IOException("Projection parsing failed : "+e.getMessage()); } } + + /** + * Get a comparable object from the given InputSplit object. + * + * @param inputSplit An InputSplit instance. It should be type of SortedTableSplit. + * @return a comparable object of type WritableComparable + */ + public static WritableComparable<?> getSortedTableSplitComparable(InputSplit inputSplit) { + SortedTableSplit split = null; + if( inputSplit instanceof SortedTableSplit ) { + split = (SortedTableSplit)inputSplit; + } else { + throw new RuntimeException( "LoadFunc expected split of type [" + + SortedTableSplit.class.getCanonicalName() + "]" ); + } + return new SortedTableSplitComparable( split.getIndex() ); + } + } /** * Adaptor class for sorted InputSplit for table. 
*/ class SortedTableSplit extends InputSplit implements Writable { - + // the order of the split in all splits generated. + private int index; + BytesWritable begin = null, end = null; String[] hosts; @@ -1050,8 +1070,10 @@ class SortedTableSplit extends InputSpli // no-op for Writable construction } - public SortedTableSplit(BytesWritable begin, BytesWritable end, + public SortedTableSplit(int index, BytesWritable begin, BytesWritable end, BlockDistribution bd, Configuration conf) { + this.index = index; + if (begin != null) { this.begin = new BytesWritable(); this.begin.set(begin.getBytes(), 0, begin.getLength()); @@ -1068,6 +1090,10 @@ class SortedTableSplit extends InputSpli } } + public int getIndex() { + return index; + } + @Override public long getLength() throws IOException { return length; @@ -1141,85 +1167,6 @@ class SortedTableSplit extends InputSpli /** * Adaptor class for unsorted InputSplit for table. */ -class UnsortedTableSplit extends InputSplit implements Writable { - String path = null; - RangeSplit split = null; - String[] hosts = null; - long length = 1; - - public UnsortedTableSplit(Reader reader, RangeSplit split, Configuration conf) - throws IOException { - this.path = reader.getPath(); - this.split = split; - BlockDistribution dataDist = reader.getBlockDistribution(split); - if (dataDist != null) { - length = dataDist.getLength(); - hosts = - dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation", 5)); - } - } - - public UnsortedTableSplit() { - // no-op for Writable construction - } - - @Override - public long getLength() throws IOException { - return length; - } - - @Override - public String[] getLocations() throws IOException { - if (hosts == null) - { - String[] tmp = new String[1]; - tmp[0] = ""; - return tmp; - } - return hosts; - } - - @Override - public void readFields(DataInput in) throws IOException { - path = WritableUtils.readString(in); - int bool = WritableUtils.readVInt(in); - if (bool == 1) { - if (split == null) 
split = new RangeSplit(); - split.readFields(in); - } - else { - split = null; - } - hosts = WritableUtils.readStringArray(in); - length = WritableUtils.readVLong(in); - } - - @Override - public void write(DataOutput out) throws IOException { - WritableUtils.writeString(out, path); - if (split == null) { - WritableUtils.writeVInt(out, 0); - } - else { - WritableUtils.writeVInt(out, 1); - split.write(out); - } - WritableUtils.writeStringArray(out, hosts); - WritableUtils.writeVLong(out, length); - } - - public String getPath() { - return path; - } - - public RangeSplit getSplit() { - return split; - } -} - -/** - * Adaptor class for unsorted InputSplit for table. - */ class RowTableSplit extends InputSplit implements Writable{ /** * Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=931764&r1=931763&r2=931764&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java Thu Apr 8 03:10:24 2010 @@ -52,20 +52,19 @@ public class TableRecordReader extends R * @throws IOException */ public TableRecordReader(TableExpr expr, String projection, - InputSplit split, JobContext jobContext) throws IOException, ParseException { + InputSplit split, JobContext jobContext) throws IOException, ParseException { Configuration conf = jobContext.getConfiguration(); - if (split instanceof RowTableSplit) { - RowTableSplit rowSplit = (RowTableSplit) split; - if ((!expr.sortedSplitRequired() || rowSplit.getTableIndex() == -1) && - Projection.getVirtualColumnIndices(projection) != null) - throw new IllegalArgumentException("virtual column requires union of multiple 
sorted tables"); - scanner = expr.getScanner(rowSplit, projection, conf); + if( split instanceof RowTableSplit ) { + RowTableSplit rowSplit = (RowTableSplit)split; + if( Projection.getVirtualColumnIndices( projection ) != null && + ( !expr.sortedSplitRequired() || rowSplit.getTableIndex() == -1 ) ) { + throw new IllegalArgumentException("virtual column requires union of multiple sorted tables"); + } + scanner = expr.getScanner(rowSplit, projection, conf); } else { - SortedTableSplit tblSplit = (SortedTableSplit) split; - scanner = - expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection, - conf); - } + SortedTableSplit tblSplit = (SortedTableSplit)split; + scanner = expr.getScanner( tblSplit.getBegin(), tblSplit.getEnd(), projection, conf ); + } } @Override Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=931764&r1=931763&r2=931764&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Thu Apr 8 03:10:24 2010 @@ -32,14 +32,15 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.zebra.io.BasicTable; import org.apache.hadoop.zebra.mapreduce.TableInputFormat; import org.apache.hadoop.zebra.mapreduce.TableRecordReader; import org.apache.hadoop.zebra.mapreduce.TableInputFormat.SplitMode; -import 
org.apache.hadoop.zebra.mapreduce.ZebraSortInfo; import org.apache.hadoop.zebra.parser.ParseException; import org.apache.hadoop.zebra.schema.ColumnType; import org.apache.hadoop.zebra.schema.Schema; @@ -50,6 +51,7 @@ import org.apache.pig.Expression; import org.apache.pig.LoadFunc; import org.apache.pig.LoadMetadata; import org.apache.pig.LoadPushDown; +import org.apache.pig.OrderedLoadFunc; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceStatistics; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; @@ -65,7 +67,7 @@ import org.apache.pig.CollectableLoadFun * Pig IndexableLoadFunc and Slicer for Zebra Table */ public class TableLoader extends LoadFunc implements LoadMetadata, LoadPushDown, - IndexableLoadFunc, CollectableLoadFunc { + IndexableLoadFunc, CollectableLoadFunc, OrderedLoadFunc { static final Log LOG = LogFactory.getLog(TableLoader.class); private static final String UDFCONTEXT_PROJ_STRING = "zebra.UDFContext.projectionString"; @@ -423,11 +425,17 @@ public class TableLoader extends LoadFun public void setUDFContextSignature(String signature) { udfContextSignature = signature; } + + @Override + public WritableComparable<?> getSplitComparable(InputSplit split) + throws IOException { + return TableInputFormat.getSortedTableSplitComparable( split ); + } - @Override - public void ensureAllKeyInstancesInSameSplit() throws IOException { - Properties properties = UDFContext.getUDFContext().getUDFProperties(this.getClass(), - new String[] { udfContextSignature } ); - properties.setProperty(UDFCONTEXT_GLOBAL_SORTING, "true"); - } + @Override + public void ensureAllKeyInstancesInSameSplit() throws IOException { + Properties properties = UDFContext.getUDFContext().getUDFProperties(this.getClass(), + new String[] { udfContextSignature } ); + properties.setProperty(UDFCONTEXT_GLOBAL_SORTING, "true"); + } } Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java?rev=931764&r1=931763&r2=931764&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java Thu Apr 8 03:10:24 2010 @@ -127,4 +127,26 @@ public class TestTfileSplit { Assert.assertEquals(splits.size(), 0); } + + @Test + public void testSortedSplitOrdering() throws IOException, ParseException { + BasicTable.drop(path, conf); + TestBasicTable.createBasicTable(1, 1000000, "a, b, c, d, e, f", "[a, e, d]", "a", path, true); + + TableInputFormat inputFormat = new TableInputFormat(); + Job job = new Job(conf); + inputFormat.setInputPaths(job, path); + inputFormat.setMinSplitSize(job, 100); + inputFormat.setProjection(job, "d"); + inputFormat.requireSortedTable( job, null ); + List<InputSplit> splits = inputFormat.getSplits(job); + + int index = 0; + for( InputSplit is : splits ) { + Assert.assertTrue( is instanceof SortedTableSplit ); + SortedTableSplit split = (SortedTableSplit)is; + Assert.assertEquals( index++, split.getIndex() ); + } + } + } Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapSideCoGroup.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapSideCoGroup.java?rev=931764&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapSideCoGroup.java (added) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapSideCoGroup.java Thu Apr 8 03:10:24 2010 @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.zebra.pig; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.zebra.BaseTestCase; +import org.apache.hadoop.zebra.io.BasicTable; +import org.apache.hadoop.zebra.io.TableInserter; +import org.apache.hadoop.zebra.io.TestBasicTable; +import org.apache.hadoop.zebra.schema.Schema; +import org.apache.hadoop.zebra.types.TypesUtils; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.DataBag; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestMapSideCoGroup extends BaseTestCase { + private static Path table1, table2; + private static Configuration conf; + + @BeforeClass + public static void setUp() throws Exception { + init(); + TestBasicTable.setUpOnce(); + conf = TestBasicTable.conf; + table1 = getTableFullPath( "TestMapSideCoGroup1" ); + removeDir( table1 ); + table2 = getTableFullPath( "TestMapSideCoGroup2" ); + removeDir( table2 ); + } + + @AfterClass + public static void tearDown() throws Exception { + pigServer.shutdown(); + } + + @Test + 
public void test() throws IOException { + int table1RowCount = 100000; + int table2RowCount = 200000; + int table1DupFactor = 15; + int table2DupFactor = 125; + createTable( table1RowCount, table1DupFactor, "a:int, b:string, c:string", "[a, b, c]", "a", table1 ); + createTable( table2RowCount, table2DupFactor, "a:int, d:string", "[a, d]", "a", table2 ); + + String qs1 = "T1 = load '" + table1.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader('a, b, c', 'sorted');"; + System.out.println( "qs1: " + qs1 ); + String qs2 = "T2 = load '" + table2.toString() + "' USING org.apache.hadoop.zebra.pig.TableLoader('a, d', 'sorted');"; + System.out.println( "qs2: " + qs2 ); + + pigServer.registerQuery( qs1 ); + pigServer.registerQuery( qs2 ); + + String qs3 = "T3 = cogroup T1 by a, T2 by a USING 'merge';"; + pigServer.registerQuery( qs3 ); + + org.apache.pig.impl.logicalLayer.schema.Schema schema = pigServer.dumpSchema( "T3" ); + Assert.assertEquals( "{group: int,T1: {a: int,b: chararray,c: chararray},T2: {a: int,d: chararray}}", + schema.toString() ); + Iterator<Tuple> it = pigServer.openIterator( "T3" ); + int count = 0; + int expectedCount = Math.max( table1RowCount/table1DupFactor, table2RowCount/table2DupFactor) + 1; + int totalRowsInBag1 = 0; + int totalRowsInBag2 = 0; + while( it.hasNext() ) { + Tuple result = it.next(); + totalRowsInBag1 += ( (DataBag)result.get( 1 ) ).size(); + totalRowsInBag2 += ( (DataBag)result.get( 2 ) ).size(); +// System.out.println( "tuple = " + result.toDelimitedString( "," ) ); + count++; + } + + Assert.assertEquals( expectedCount, count ); + Assert.assertEquals(table1RowCount, totalRowsInBag1 ); + Assert.assertEquals(table2RowCount, totalRowsInBag2 ); + } + + public static void createTable(int rows, int step, String strSchema, String storage, String sortColumns, Path path) + throws IOException { + if( fs.exists(path) ) { + BasicTable.drop(path, conf); + } + + BasicTable.Writer writer = new BasicTable.Writer(path, strSchema, 
storage, sortColumns, null, conf); + writer.finish(); + + Schema schema = writer.getSchema(); + String colNames[] = schema.getColumns(); + Tuple tuple = TypesUtils.createTuple(schema); + + writer = new BasicTable.Writer(path, conf); + TableInserter inserter = writer.getInserter( String.format("part-%06d", 1), true ); + for( int i = 1; i <= rows; ++i ) { + BytesWritable key = new BytesWritable( String.format( "key%09d", i/step ).getBytes() ); + TypesUtils.resetTuple(tuple); + tuple.set( 0, i / step ); + for( int k = 1; k < tuple.size(); ++k ) { + try { + tuple.set( k, new String( "col-" + colNames[k] + i * 10 ) ); + } catch (ExecException e) { + e.printStackTrace(); + } + } + inserter.insert(key, tuple); + } + inserter.close(); + + writer = new BasicTable.Writer(path, conf); + writer.close(); + } + +}