Author: yanz
Date: Thu Apr 8 16:21:57 2010
New Revision: 931990
URL: http://svn.apache.org/viewvc?rev=931990&view=rev
Log:
PIG-1315 Implementing OrderedLoadFunc interface for Zebra TableLoader (xuefuz
via yanz)
Added:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java
Modified: hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/contrib/zebra/CHANGES.txt Thu Apr 8
16:21:57 2010
@@ -16,6 +16,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+    PIG-1315 Implementing OrderedLoadFunc interface for Zebra TableLoader
(xuefuz via yanz)
+
PIG-1306 Support of locally sorted input splits (yanz)
PIG-1268 Need an ant target that runs all pig-related tests in Zebra
(xuefuz via yanz)
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
Thu Apr 8 16:21:57 2010
@@ -122,26 +122,6 @@ abstract class TableExpr {
* @return A table scanner.
* @throws IOException
*/
- public TableScanner getScanner(UnsortedTableSplit split, String projection,
- Configuration conf) throws IOException, ParseException {
- BasicTable.Reader reader =
- new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf),
conf);
- reader.setProjection(projection);
- return reader.getScanner(split.getSplit(), true);
- }
-
- /**
- * Get a scanner with an unsorted split.
- *
- * @param split
- * The range split.
- * @param projection
- * The projection schema. It should never be null.
- * @param conf
- * The configuration
- * @return A table scanner.
- * @throws IOException
- */
public TableScanner getScanner(RowTableSplit split, String projection,
Configuration conf) throws IOException, ParseException, ParseException {
BasicTable.Reader reader =
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
Thu Apr 8 16:21:57 2010
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.zebra.tfile.RawComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -581,66 +580,6 @@ public class TableInputFormat implements
conf.setLong("table.input.split.minSize", minSize);
}
- private static InputSplit[] getUnsortedSplits(JobConf conf, int numSplits,
- TableExpr expr, List<BasicTable.Reader> readers,
- List<BasicTableStatus> status) throws IOException {
- long totalBytes = 0;
- for (Iterator<BasicTableStatus> it = status.iterator(); it.hasNext();) {
- BasicTableStatus s = it.next();
- totalBytes += s.getSize();
- }
-
- long maxSplits = totalBytes / getMinSplitSize(conf);
-
- if (numSplits > maxSplits) {
- numSplits = -1;
- }
-
- ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
- if (numSplits <= 0) {
- if (totalBytes <= 0) {
- BasicTable.Reader reader = readers.get(0);
- UnsortedTableSplit split =
- new UnsortedTableSplit(reader, null, conf);
- ret.add(split);
- } else {
- for (int i = 0; i < readers.size(); ++i) {
- BasicTable.Reader reader = readers.get(i);
- List<RangeSplit> subSplits = reader.rangeSplit(-1);
- for (Iterator<RangeSplit> it = subSplits.iterator(); it.hasNext();) {
- UnsortedTableSplit split =
- new UnsortedTableSplit(reader, it.next(), conf);
- ret.add(split);
- }
- }
- }
- } else {
- long goalSize = totalBytes / numSplits;
-
- double SPLIT_SLOP = 1.1;
- for (int i = 0; i < readers.size(); ++i) {
- BasicTable.Reader reader = readers.get(i);
- BasicTableStatus s = status.get(i);
- int nSplits =
- (int) ((s.getSize() + goalSize * (2 - SPLIT_SLOP)) / goalSize);
- if (nSplits > 1) {
- List<RangeSplit> subSplits = reader.rangeSplit(nSplits);
- for (Iterator<RangeSplit> it = subSplits.iterator(); it.hasNext();) {
- UnsortedTableSplit split =
- new UnsortedTableSplit(reader, it.next(), conf);
- ret.add(split);
- }
- } else {
- UnsortedTableSplit split = new UnsortedTableSplit(reader, null,
conf);
- ret.add(split);
- }
- }
- }
-
- LOG.info("getSplits : returning " + ret.size() + " file splits.");
- return ret.toArray(new InputSplit[ret.size()]);
- }
-
private static class DummyFileInputFormat extends
FileInputFormat<BytesWritable, Tuple> {
/**
* the next constant and class are copies from FileInputFormat
@@ -1109,85 +1048,6 @@ class SortedTableSplit implements InputS
/**
* Adaptor class for unsorted InputSplit for table.
*/
-class UnsortedTableSplit implements InputSplit {
- String path = null;
- RangeSplit split = null;
- String[] hosts = null;
- long length = 1;
-
- public UnsortedTableSplit(Reader reader, RangeSplit split, JobConf conf)
- throws IOException {
- this.path = reader.getPath();
- this.split = split;
- BlockDistribution dataDist = reader.getBlockDistribution(split);
- if (dataDist != null) {
- length = dataDist.getLength();
- hosts =
- dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation",
5));
- }
- }
-
- public UnsortedTableSplit() {
- // no-op for Writable construction
- }
-
- @Override
- public long getLength() throws IOException {
- return length;
- }
-
- @Override
- public String[] getLocations() throws IOException {
- if (hosts == null)
- {
- String[] tmp = new String[1];
- tmp[0] = "";
- return tmp;
- }
- return hosts;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- path = WritableUtils.readString(in);
- int bool = WritableUtils.readVInt(in);
- if (bool == 1) {
- if (split == null) split = new RangeSplit();
- split.readFields(in);
- }
- else {
- split = null;
- }
- hosts = WritableUtils.readStringArray(in);
- length = WritableUtils.readVLong(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeString(out, path);
- if (split == null) {
- WritableUtils.writeVInt(out, 0);
- }
- else {
- WritableUtils.writeVInt(out, 1);
- split.write(out);
- }
- WritableUtils.writeStringArray(out, hosts);
- WritableUtils.writeVLong(out, length);
- }
-
- public String getPath() {
- return path;
- }
-
- public RangeSplit getSplit() {
- return split;
- }
-}
-
-/**
- * Adaptor class for unsorted InputSplit for table.
- */
class RowTableSplit implements InputSplit {
String path = null;
RowSplit split = null;
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
Thu Apr 8 16:21:57 2010
@@ -18,7 +18,6 @@ package org.apache.hadoop.zebra.mapred;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -51,22 +50,16 @@ public class TableRecordReader implement
* @throws IOException
*/
public TableRecordReader(TableExpr expr, String projection,
- InputSplit split,
- JobConf conf) throws IOException, ParseException {
- if (split != null && split instanceof RowTableSplit) {
- RowTableSplit rowSplit = (RowTableSplit) split;
- if (!expr.sortedSplitRequired() &&
Projection.getVirtualColumnIndices(projection) != null)
- throw new IllegalArgumentException("virtual column requires union of
multiple sorted tables");
- scanner = expr.getScanner(rowSplit, projection, conf);
- } else if (expr.sortedSplitRequired()) {
- SortedTableSplit tblSplit = (SortedTableSplit) split;
- scanner =
- expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
- conf);
- } else {
- UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
- scanner = expr.getScanner(tblSplit, projection, conf);
- }
+ InputSplit split, JobConf conf) throws IOException,
ParseException {
+ if( split instanceof RowTableSplit ) {
+ RowTableSplit rowSplit = (RowTableSplit)split;
+ if( Projection.getVirtualColumnIndices( projection ) != null )
+ throw new IllegalArgumentException("virtual column
requires union of multiple sorted tables");
+ scanner = expr.getScanner(rowSplit, projection, conf);
+ } else {
+ SortedTableSplit tblSplit = (SortedTableSplit)split;
+ scanner = expr.getScanner( tblSplit.getBegin(),
tblSplit.getEnd(), projection, conf );
+ }
}
@Override
Added:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java?rev=931990&view=auto
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java
(added)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/SortedTableSplitComparable.java
Thu Apr 8 16:21:57 2010
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pig.data.DataReaderWriter;
+
+public class SortedTableSplitComparable implements
WritableComparable<SortedTableSplitComparable> {
+ private static final long serialVersionUID = 1L;
+
+ protected Integer index;
+
+ //need a default constructor to be able to de-serialize using just
+ // the Writable interface
+ public SortedTableSplitComparable(){}
+
+ public SortedTableSplitComparable(int index){
+ this.index = index;
+ }
+
+
+ @Override
+ public int compareTo(SortedTableSplitComparable other) {
+ return Integer.signum( index - other.index );
+ }
+
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ index = (Integer)DataReaderWriter.readDatum(in);
+ }
+
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ DataReaderWriter.writeDatum(out, index);
+ }
+
+ @Override
+ public String toString(){
+ return "Index = " + index ;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ return index;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ SortedTableSplitComparable other = (SortedTableSplitComparable) obj;
+ return this.index.intValue() == other.index.intValue();
+ }
+}
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
Thu Apr 8 16:21:57 2010
@@ -111,26 +111,6 @@ abstract class TableExpr {
}
/**
- * Get a scanner with an unsorted split.
- *
- * @param split
- * The range split.
- * @param projection
- * The projection schema. It should never be null.
- * @param conf
- * The configuration
- * @return A table scanner.
- * @throws IOException
- */
- public TableScanner getScanner(UnsortedTableSplit split, String projection,
- Configuration conf) throws IOException, ParseException {
- BasicTable.Reader reader =
- new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf),
conf);
- reader.setProjection(projection);
- return reader.getScanner(split.getSplit(), true);
- }
-
- /**
* Get a scanner with an row split.
*
* @param split
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
Thu Apr 8 16:21:57 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.zebra.tfile.RawComparable;
@@ -162,7 +163,7 @@ public class TableInputFormat extends In
private static final String GLOBALLY_SORTED = "globally_sorted";
private static final String LOCALLY_SORTED = "locally_sorted";
static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
-
+
/**
* Set the paths to the input table.
*
@@ -396,7 +397,6 @@ public class TableInputFormat extends In
* JobContext object.
* @param sortInfo
* ZebraSortInfo object containing sorting information.
- *
*/
public static void requireSortedTable(JobContext jobContext, ZebraSortInfo
sortInfo) throws IOException {
@@ -590,7 +590,7 @@ public class TableInputFormat extends In
bd = BlockDistribution.sum(bd,
reader.getBlockDistribution((RangeSplit) null));
}
- SortedTableSplit split = new SortedTableSplit(null, null, bd, conf);
+ SortedTableSplit split = new SortedTableSplit(0, null, null, bd, conf);
splits.add(split);
return splits;
}
@@ -637,7 +637,7 @@ public class TableInputFormat extends In
beginB = new BytesWritable(begin.buffer());
if (end != null)
endB = new BytesWritable(end.buffer());
- SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf);
+ SortedTableSplit split = new SortedTableSplit(i, beginB, endB, bd, conf);
splits.add(split);
}
LOG.info("getSplits : returning " + splits.size() + " sorted splits.");
@@ -1033,13 +1033,33 @@ public class TableInputFormat extends In
throw new IOException("Projection parsing failed : "+e.getMessage());
}
}
+
+ /**
+ * Get a comparable object from the given InputSplit object.
+ *
+ * @param inputSplit An InputSplit instance. It should be type of
SortedTableSplit.
+ * @return a comparable object of type WritableComparable
+ */
+ public static WritableComparable<?>
getSortedTableSplitComparable(InputSplit inputSplit) {
+ SortedTableSplit split = null;
+ if( inputSplit instanceof SortedTableSplit ) {
+ split = (SortedTableSplit)inputSplit;
+ } else {
+ throw new RuntimeException( "LoadFunc expected split of type [" +
+ SortedTableSplit.class.getCanonicalName() + "]" );
+ }
+ return new SortedTableSplitComparable( split.getIndex() );
+ }
+
}
/**
* Adaptor class for sorted InputSplit for table.
*/
class SortedTableSplit extends InputSplit implements Writable {
-
+ // the order of the split in all splits generated.
+ private int index;
+
BytesWritable begin = null, end = null;
String[] hosts;
@@ -1050,8 +1070,10 @@ class SortedTableSplit extends InputSpli
// no-op for Writable construction
}
- public SortedTableSplit(BytesWritable begin, BytesWritable end,
+ public SortedTableSplit(int index, BytesWritable begin, BytesWritable end,
BlockDistribution bd, Configuration conf) {
+ this.index = index;
+
if (begin != null) {
this.begin = new BytesWritable();
this.begin.set(begin.getBytes(), 0, begin.getLength());
@@ -1068,6 +1090,10 @@ class SortedTableSplit extends InputSpli
}
}
+ public int getIndex() {
+ return index;
+ }
+
@Override
public long getLength() throws IOException {
return length;
@@ -1141,85 +1167,6 @@ class SortedTableSplit extends InputSpli
/**
* Adaptor class for unsorted InputSplit for table.
*/
-class UnsortedTableSplit extends InputSplit implements Writable {
- String path = null;
- RangeSplit split = null;
- String[] hosts = null;
- long length = 1;
-
- public UnsortedTableSplit(Reader reader, RangeSplit split, Configuration
conf)
- throws IOException {
- this.path = reader.getPath();
- this.split = split;
- BlockDistribution dataDist = reader.getBlockDistribution(split);
- if (dataDist != null) {
- length = dataDist.getLength();
- hosts =
- dataDist.getHosts(conf.getInt("mapred.lib.table.input.nlocation",
5));
- }
- }
-
- public UnsortedTableSplit() {
- // no-op for Writable construction
- }
-
- @Override
- public long getLength() throws IOException {
- return length;
- }
-
- @Override
- public String[] getLocations() throws IOException {
- if (hosts == null)
- {
- String[] tmp = new String[1];
- tmp[0] = "";
- return tmp;
- }
- return hosts;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- path = WritableUtils.readString(in);
- int bool = WritableUtils.readVInt(in);
- if (bool == 1) {
- if (split == null) split = new RangeSplit();
- split.readFields(in);
- }
- else {
- split = null;
- }
- hosts = WritableUtils.readStringArray(in);
- length = WritableUtils.readVLong(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeString(out, path);
- if (split == null) {
- WritableUtils.writeVInt(out, 0);
- }
- else {
- WritableUtils.writeVInt(out, 1);
- split.write(out);
- }
- WritableUtils.writeStringArray(out, hosts);
- WritableUtils.writeVLong(out, length);
- }
-
- public String getPath() {
- return path;
- }
-
- public RangeSplit getSplit() {
- return split;
- }
-}
-
-/**
- * Adaptor class for unsorted InputSplit for table.
- */
class RowTableSplit extends InputSplit implements Writable{
/**
*
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableRecordReader.java
Thu Apr 8 16:21:57 2010
@@ -52,20 +52,19 @@ public class TableRecordReader extends R
* @throws IOException
*/
public TableRecordReader(TableExpr expr, String projection,
- InputSplit split, JobContext jobContext) throws IOException,
ParseException {
+ InputSplit split, JobContext jobContext) throws IOException,
ParseException {
Configuration conf = jobContext.getConfiguration();
- if (split instanceof RowTableSplit) {
- RowTableSplit rowSplit = (RowTableSplit) split;
- if ((!expr.sortedSplitRequired() || rowSplit.getTableIndex() == -1) &&
- Projection.getVirtualColumnIndices(projection) != null)
- throw new IllegalArgumentException("virtual column requires union of
multiple sorted tables");
- scanner = expr.getScanner(rowSplit, projection, conf);
+ if( split instanceof RowTableSplit ) {
+ RowTableSplit rowSplit = (RowTableSplit)split;
+ if( Projection.getVirtualColumnIndices( projection ) != null
&&
+ ( !expr.sortedSplitRequired() || rowSplit.getTableIndex()
== -1 ) ) {
+ throw new IllegalArgumentException("virtual column
requires union of multiple sorted tables");
+ }
+ scanner = expr.getScanner(rowSplit, projection, conf);
} else {
- SortedTableSplit tblSplit = (SortedTableSplit) split;
- scanner =
- expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
- conf);
- }
+ SortedTableSplit tblSplit = (SortedTableSplit)split;
+ scanner = expr.getScanner( tblSplit.getBegin(),
tblSplit.getEnd(), projection, conf );
+ }
}
@Override
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
Thu Apr 8 16:21:57 2010
@@ -32,14 +32,15 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.mapreduce.TableInputFormat;
import org.apache.hadoop.zebra.mapreduce.TableRecordReader;
import org.apache.hadoop.zebra.mapreduce.TableInputFormat.SplitMode;
-import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.ColumnType;
import org.apache.hadoop.zebra.schema.Schema;
@@ -50,6 +51,7 @@ import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
+import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
@@ -65,7 +67,7 @@ import org.apache.pig.CollectableLoadFun
* Pig IndexableLoadFunc and Slicer for Zebra Table
*/
public class TableLoader extends LoadFunc implements LoadMetadata,
LoadPushDown,
- IndexableLoadFunc, CollectableLoadFunc {
+ IndexableLoadFunc, CollectableLoadFunc, OrderedLoadFunc {
static final Log LOG = LogFactory.getLog(TableLoader.class);
private static final String UDFCONTEXT_PROJ_STRING =
"zebra.UDFContext.projectionString";
@@ -423,11 +425,17 @@ public class TableLoader extends LoadFun
public void setUDFContextSignature(String signature) {
udfContextSignature = signature;
}
+
+ @Override
+ public WritableComparable<?> getSplitComparable(InputSplit
split)
+ throws IOException {
+ return TableInputFormat.getSortedTableSplitComparable( split );
+ }
- @Override
- public void ensureAllKeyInstancesInSameSplit() throws IOException {
- Properties properties =
UDFContext.getUDFContext().getUDFProperties(this.getClass(),
- new String[] { udfContextSignature } );
- properties.setProperty(UDFCONTEXT_GLOBAL_SORTING, "true");
- }
+ @Override
+ public void ensureAllKeyInstancesInSameSplit() throws
IOException {
+ Properties properties =
UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+ new String[] { udfContextSignature } );
+ properties.setProperty(UDFCONTEXT_GLOBAL_SORTING,
"true");
+ }
}
Modified:
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java?rev=931990&r1=931989&r2=931990&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java
(original)
+++
hadoop/pig/branches/branch-0.7/contrib/zebra/src/test/org/apache/hadoop/zebra/mapreduce/TestTfileSplit.java
Thu Apr 8 16:21:57 2010
@@ -127,4 +127,26 @@ public class TestTfileSplit {
Assert.assertEquals(splits.size(), 0);
}
+
+ @Test
+ public void testSortedSplitOrdering() throws IOException,
ParseException {
+ BasicTable.drop(path, conf);
+ TestBasicTable.createBasicTable(1, 1000000, "a, b, c, d, e, f",
"[a, e, d]", "a", path, true);
+
+ TableInputFormat inputFormat = new TableInputFormat();
+ Job job = new Job(conf);
+ inputFormat.setInputPaths(job, path);
+ inputFormat.setMinSplitSize(job, 100);
+ inputFormat.setProjection(job, "d");
+ inputFormat.requireSortedTable( job, null );
+ List<InputSplit> splits = inputFormat.getSplits(job);
+
+ int index = 0;
+ for( InputSplit is : splits ) {
+ Assert.assertTrue( is instanceof SortedTableSplit );
+ SortedTableSplit split = (SortedTableSplit)is;
+ Assert.assertEquals( index++, split.getIndex() );
+ }
+ }
+
}