Author: gates
Date: Tue Dec 1 23:55:24 2009
New Revision: 886015
URL: http://svn.apache.org/viewvc?rev=886015&view=rev
Log:
PIG-1098 Zebra Performance Optimizations.
Modified:
hadoop/pig/trunk/contrib/zebra/CHANGES.txt
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Tue Dec 1 23:55:24 2009
@@ -8,6 +8,8 @@
IMPROVEMENTS
+ PIG-1098 Zebra Performance Optimizations (yanz via gates)
+
    PIG-1074 Zebra store function should allow '::' in column names in output
    schema (yanz via gates)
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Tue Dec 1 23:55:24 2009
@@ -52,6 +52,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRowSplit;
+import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGScanner;
import org.apache.hadoop.zebra.types.CGSchema;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.types.Partition;
@@ -874,7 +875,7 @@
*/
private class BTScanner implements TableScanner {
private Projection schema;
- private TableScanner[] cgScanners;
+ private CGScanner[] cgScanners;
private int opCount = 0;
Random random = new Random(System.nanoTime());
// checking for consistency once every 1000 times.
@@ -936,7 +937,7 @@
}
// Helper function for initialization.
- private TableScanner createCGScanner(int cgIndex, CGRowSplit cgRowSplit,
+ private CGScanner createCGScanner(int cgIndex, CGRowSplit cgRowSplit,
RangeSplit rangeSplit,
BytesWritable beginKey,
BytesWritable endKey)
@@ -972,7 +973,7 @@
try {
schema = partition.getProjection();
- cgScanners = new TableScanner[colGroups.length];
+ cgScanners = new CGScanner[colGroups.length];
for (int i = 0; i < colGroups.length; ++i) {
if (!isCGDeleted(i) && partition.isCGNeeded(i))
{
@@ -1020,7 +1021,7 @@
for (int nx = 0; nx < cgScanners.length; nx++) {
if (cgScanners[nx] != null)
{
- cur = cgScanners[nx].advance();
+ cur = cgScanners[nx].advanceCG();
if (!firstAdvance) {
if (cur != first) {
throw new IOException(
@@ -1038,9 +1039,6 @@
@Override
public boolean atEnd() throws IOException {
- if (cgScanners.length == 0) {
- return true;
- }
boolean ret = true;
int i;
for (i = 0; i < cgScanners.length; i++)
@@ -1077,16 +1075,12 @@
@Override
public void getKey(BytesWritable key) throws IOException {
- if (cgScanners.length == 0) {
- return;
- }
-
int i;
for (i = 0; i < cgScanners.length; i++)
{
if (cgScanners[i] != null)
{
- cgScanners[i].getKey(key);
+ cgScanners[i].getCGKey(key);
break;
}
}
@@ -1104,7 +1098,7 @@
if (cgScanners[index] != null)
{
BytesWritable key2 = new BytesWritable();
- cgScanners[index].getKey(key2);
+ cgScanners[index].getCGKey(key2);
if (key.equals(key2)) {
return;
}
@@ -1129,7 +1123,7 @@
{
if (cgTuples[i] == null)
throw new AssertionError("cgTuples["+i+"] is null");
- cgScanners[i].getValue(cgTuples[i]);
+ cgScanners[i].getCGValue(cgTuples[i]);
}
}
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java Tue Dec 1 23:55:24 2009
@@ -136,8 +136,8 @@
@Override
public int compare(Entry<String, Long> o1, Entry<String, Long> o2) {
long diff = o1.getValue() - o2.getValue();
- if (diff < 0) return -1;
- if (diff > 0) return 1;
+ if (diff < 0) return 1;
+ if (diff > 0) return -1;
return 0;
}
});
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Tue Dec 1 23:55:24 2009
@@ -381,7 +381,7 @@
* @return A scanner object.
* @throws IOException
*/
- public synchronized TableScanner getScanner(BytesWritable beginKey,
+ public synchronized CGScanner getScanner(BytesWritable beginKey,
BytesWritable endKey, boolean closeReader) throws IOException,
ParseException {
if (closed) {
@@ -422,7 +422,7 @@
* @return A scanner object.
* @throws IOException
*/
- public synchronized TableScanner getScanner(CGRangeSplit split,
+ public synchronized CGScanner getScanner(CGRangeSplit split,
boolean closeReader) throws IOException, ParseException {
if (closed) {
throw new EOFException("Reader already closed");
@@ -449,7 +449,7 @@
* @param rowSplit specifies part index, start row, and end row.
* @return A scanner object.
*/
- public synchronized TableScanner getScanner(boolean closeReader,
+ public synchronized CGScanner getScanner(boolean closeReader,
CGRowSplit rowSplit)
throws IOException, ParseException {
if (closed) {
@@ -1013,22 +1013,34 @@
@Override
public void getKey(BytesWritable key) throws IOException {
- if (atEnd()) {
- throw new EOFException("No more key-value to read");
+ if (atEnd()) {
+ throw new EOFException("No more key-value to read");
+ }
+ scanners[current].getKey(key);
+ }
+
+ @Override
+ public void getValue(Tuple row) throws IOException {
+ if (atEnd()) {
+ throw new EOFException("No more key-value to read");
+ }
+ try {
+ scanners[current].getValue(row);
+ } catch (ParseException e) {
+ throw new IOException("Invalid Projection: "+e.getMessage());
+ }
}
+
+ public void getCGKey(BytesWritable key) throws IOException {
scanners[current].getKey(key);
}
- @Override
- public void getValue(Tuple row) throws IOException {
- if (atEnd()) {
- throw new EOFException("No more key-value to read");
- }
+ public void getCGValue(Tuple row) throws IOException {
try {
- scanners[current].getValue(row);
- } catch (ParseException e) {
- throw new IOException("Invalid Projection: "+e.getMessage());
- }
+ scanners[current].getValue(row);
+ } catch (ParseException e) {
+ throw new IOException("Invalid Projection: "+e.getMessage());
+ }
}
@Override
@@ -1042,18 +1054,29 @@
@Override
public boolean advance() throws IOException {
- if (atEnd()) {
- return false;
+ if (atEnd()) {
+ return false;
+ }
+ scanners[current].advance();
+ if (scanners[current].atEnd()) {
+ ++current;
+ if (!atEnd()) {
+ scanners[current].rewind();
+ }
+ }
+ return true;
}
- scanners[current].advance();
- if (scanners[current].atEnd()) {
- ++current;
- if (!atEnd()) {
- scanners[current].rewind();
+
+ public boolean advanceCG() throws IOException {
+ scanners[current].advance();
+ if (scanners[current].atEnd()) {
+ ++current;
+ if (!atEnd()) {
+ scanners[current].rewind();
+ }
}
+ return true;
}
- return true;
- }
@Override
public boolean atEnd() throws IOException {
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Tue Dec 1 23:55:24 2009
@@ -94,7 +94,7 @@
@Override
public float getProgress() throws IOException {
- return (float) ((scanner.atEnd()) ? 1.0 : 0);
+ return (float)((scanner.atEnd()) ? 1.0 : 0);
}
@Override
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java Tue Dec 1 23:55:24 2009
@@ -599,18 +599,21 @@
*/
@Override
public int compareTo(Schema other) {
- if (this.mFields.size() != other.mFields.size()) {
- return this.mFields.size() - other.mFields.size();
+ int mFieldsSize = this.mFields.size();
+ if (mFieldsSize != other.mFields.size()) {
+ return mFieldsSize - other.mFields.size();
}
int ret = 0;
- for (int nx = 0; nx < this.mFields.size(); nx++) {
- if (mFields.get(nx).schema == null
- && other.mFields.get(nx).schema != null) return -1;
- else if (mFields.get(nx).schema != null
- && other.mFields.get(nx).schema == null) return 1;
- else if (mFields.get(nx).schema == null
- && other.mFields.get(nx).schema == null) return 0;
- ret = mFields.get(nx).schema.compareTo(other.mFields.get(nx).schema);
+ for (int nx = 0; nx < mFieldsSize; nx++) {
+ Schema mFieldSchema = mFields.get(nx).schema;
+ Schema otherFieldSchema = other.mFields.get(nx).schema;
+ if (mFieldSchema == null
+ && otherFieldSchema != null) return -1;
+ else if (mFieldSchema != null
+ && otherFieldSchema == null) return 1;
+ else if (mFieldSchema == null
+ && otherFieldSchema == null) return 0;
+ ret = mFieldSchema.compareTo(otherFieldSchema);
if (ret != 0) {
return ret;
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java Tue Dec 1 23:55:24 2009
@@ -444,14 +444,17 @@
}
void insert(final BytesWritable key) throws ExecException {
- for (int i = 0; i < mSize; i++)
- ((Tuple) mTuple).set(mSources.get(i).getProjIndex(), mSources.get(i)
- .getRecord());
+ for (int i = 0; i < mSize; i++) {
+ PartitionedColumn mSource = mSources.get(i);
+ ((Tuple) mTuple).set(mSource.getProjIndex(), mSource.getRecord());
+ }
}
void read() throws ExecException {
- for (int i = 0; i < mSize; i++)
- mSources.get(i).setRecord(mTuple.get(mSources.get(i).getProjIndex()));
+ for (int i = 0; i < mSize; i++) {
+ PartitionedColumn mSource = mSources.get(i);
+ mSource.setRecord(mTuple.get(mSource.getProjIndex()));
+ }
}
void setSource(Tuple tuple) {
@@ -661,6 +664,7 @@
}
private HashMap<Integer, CGEntry> mCGs = null; // involved CGs
+ private CGEntry[] mCGList = new CGEntry[0];
private ArrayList<PartitionedColumn> mExecs = null; // stitches to be
// performed in sequence:
// called by LOAD
@@ -1177,6 +1181,10 @@
{
result = new CGEntry(cgindex);
mCGs.put(cgindex, result);
+
+ // Constructing a collection of mCGs so that
+ // we don't have to do it again [performance]
+ mCGList = (CGEntry[])mCGs.values().toArray(new CGEntry[mCGs.size()]);
}
return result;
}
@@ -1269,20 +1277,23 @@
* read in a tuple based on stitches
*/
public void read(Tuple t) throws AssertionError, IOException, Exception {
- if (mStitchSize == 0 || mCGs == null || mCGs.isEmpty())
+ if (mStitchSize == 0 || mCGs == null || mCGList.length == 0)
return;
// dispatch
mExecs.get(mStitchSize - 1).setRecord(t);
+ //Set<Map.Entry<Integer, CGEntry>> entrySet = mCGs.entrySet();
+ //Iterator<Map.Entry<Integer, CGEntry>> it = entrySet.iterator();
+ //while (it.hasNext())
+ // it.next().getValue().read();
+
// read in CG data
- Set<Map.Entry<Integer, CGEntry>> entrySet = mCGs.entrySet();
- Iterator<Map.Entry<Integer, CGEntry>> it = entrySet.iterator();
- while (it.hasNext())
- it.next().getValue().read();
+ for (int i = 0; i < mCGList.length; i++)
+ mCGList[i].read();
// dispatch
- mExecs.get(mStitchSize - 1).setRecord(t);
+ // mExecs.get(mStitchSize - 1).setRecord(t);
// start the stitch
for (int i = 0; i < mStitchSize; i++)
@@ -1296,7 +1307,7 @@
*/
public void insert(final BytesWritable key, final Tuple t)
throws AssertionError, IOException, Exception {
- if (mSplitSize == 0 || mCGs == null || mCGs.isEmpty())
+ if (mSplitSize == 0 || mCGs == null || mCGList.length == 0)
throw new AssertionError("Empty Column Group List!");
// dispatch
@@ -1308,10 +1319,14 @@
mExecs.get(i).split();
// insert CG data
- Set<Map.Entry<Integer, CGEntry>> entrySet = mCGs.entrySet();
- Iterator<Map.Entry<Integer, CGEntry>> it = entrySet.iterator();
- while (it.hasNext())
- it.next().getValue().insert(key);
+ //Set<Map.Entry<Integer, CGEntry>> entrySet = mCGs.entrySet();
+ //Iterator<Map.Entry<Integer, CGEntry>> it = entrySet.iterator();
+ //while (it.hasNext())
+ // it.next().getValue().insert(key);
+
+ for (int i = 0; i < mCGList.length; i++)
+ mCGList[i].insert(key);
+
return;
}
@@ -1323,12 +1338,16 @@
throw new ParseException(
"Internal Logical Error: Invalid number of column groups");
for (int i = 0; i < tuples.length; i++) {
- if (mCGs.get(i) != null) {
+ CGEntry mCG = mCGs.get(i);
+ if (mCG != null) {
if (tuples[i] == null) {
- mCGs.get(i).cleanup();
+ mCG.cleanup();
mCGs.remove(i);
+ // Constructing a collection of mCGs so that
+ // we don't have to do it again [performance]
+ mCGList = (CGEntry[])mCGs.values().toArray(new CGEntry[mCGs.size()]);
} else {
- mCGs.get(i).setSource(tuples[i]);
+ mCG.setSource(tuples[i]);
}
}
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java Tue Dec 1 23:55:24 2009
@@ -176,13 +176,15 @@
{
int i;
dispatch(dest);
- clearMaps();
- for (i = 0; i < exec.size(); i++)
+ clearMaps();
+ int execSize = exec.size();
+ for (i = 0; i < execSize; i++)
{
- if (exec.get(i) != null)
+ SplitColumn execElement = exec.get(i);
+ if (execElement != null)
{
// split is necessary
- exec.get(i).split();
+ execElement.split();
}
}
}
@@ -289,10 +291,11 @@
{
for (int i = 0; i < size; i++)
{
-          if (children.get(i).projIndex != -1) // a leaf: set projection directly
-            ((Tuple)children.get(i).leaf.field).set(children.get(i).projIndex, ((Tuple) field).get(children.get(i).fieldIndex));
+          SplitColumn child = children.get(i);
+          if (child.projIndex != -1) // a leaf: set projection directly
+            ((Tuple)child.leaf.field).set(child.projIndex, ((Tuple) field).get(child.fieldIndex));
           else
-            children.get(i).field = ((Tuple) field).get(children.get(i).fieldIndex);
+            child.field = ((Tuple) field).get(child.fieldIndex);
}
} else if (st == Partition.SplitType.COLLECTION) {
DataBag srcBag, tgtBag;
@@ -300,17 +303,18 @@
Tuple tuple;
for (int i = 0; i < size; i++)
{
-          if (children.get(i).projIndex != -1) // a leaf: set projection directly
+          SplitColumn child = children.get(i);
+          if (child.projIndex != -1) // a leaf: set projection directly
           {
-            tgtBag = (DataBag)((Tuple)children.get(i).leaf.field).get(children.get(i).projIndex);
+            tgtBag = (DataBag)((Tuple)child.leaf.field).get(child.projIndex);
} else {
- tgtBag = (DataBag) children.get(i).field;
+ tgtBag = (DataBag) child.field;
tgtBag.clear();
}
           for (Iterator<Tuple> it = srcBag.iterator(); it.hasNext(); )
{
tuple = TypesUtils.createTuple(scratchSchema);
- tuple.set(0, it.next().get(children.get(i).fieldIndex));
+ tuple.set(0, it.next().get(child.fieldIndex));
tgtBag.add(tuple);
}
}
@@ -320,7 +324,8 @@
Object value;
for (int i = 0; i < size; i++)
{
-          if (children.get(i).projIndex != -1) // a leaf: set projection directly
+          SplitColumn child = children.get(i);
+          if (child.projIndex != -1) // a leaf: set projection directly
{
for (it = keys.iterator(); it.hasNext(); )
{
@@ -328,13 +333,13 @@
value = ((Map<String, Object>) field).get(key);
if (value == null)
continue;
-              ((Map<String, Object>) (((Tuple)children.get(i).leaf.field).get(children.get(i).projIndex))).put(key, value);
+              ((Map<String, Object>) (((Tuple)child.leaf.field).get(child.projIndex))).put(key, value);
}
} else {
for (it = keys.iterator(); it.hasNext(); )
{
key = it.next();
-              children.get(i).field = ((Map<String, Object>) field).get(key);
+              child.field = ((Map<String, Object>) field).get(key);
}
}
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java?rev=886015&r1=886014&r2=886015&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java Tue Dec 1 23:55:24 2009
@@ -86,7 +86,8 @@
*/
public static void resetTuple(Tuple tuple) {
try {
- for (int i = 0; i < tuple.size(); ++i) {
+ int tupleSize = tuple.size();
+ for (int i = 0; i < tupleSize; ++i) {
tuple.set(i, null);
}
}