Author: pradeepkth
Date: Wed Sep 2 01:13:04 2009
New Revision: 810327
URL: http://svn.apache.org/viewvc?rev=810327&view=rev
Log:
PIG-934: Merge join implementation currently does not seek to the correct
position in the right-side input based on the offset provided by the index
(ashutoshc via pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=810327&r1=810326&r2=810327&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Sep 2 01:13:04 2009
@@ -59,7 +59,11 @@
PIG-792: skew join implementation (sriranjan via olgan)
BUG FIXES
-
+
+ PIG-934: Merge join implementation currently does not seek to right point
+ on the right side input based on the offset provided by the index
+ (ashutoshc via pradeepkth)
+
PIG-925: Fix join in local mode (daijy)
PIG-913: Error in Pig script when grouping on chararray column (daijy)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=810327&r1=810326&r2=810327&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
Wed Sep 2 01:13:04 2009
@@ -100,7 +100,7 @@
String filename = lFile.getFileName();
loader =
(LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
- is = FileLocalizer.open(filename, pc);
+ is = (this.offset == 0) ? FileLocalizer.open(filename, pc) :
FileLocalizer.open(filename, this.offset,pc);
loader.bindTo(filename , new BufferedPositionedInputStream(is),
this.offset, Long.MAX_VALUE);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=810327&r1=810326&r2=810327&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Wed Sep 2
01:13:04 2009
@@ -41,6 +41,8 @@
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -346,6 +348,48 @@
}
}
+ /**
+ * @param fileSpec
+ * @param offset
+ * @param pigContext
+ * @return SeekableInputStream
+ * @throws IOException
+ *
+ * This is an overloaded version of open where there is a need to seek in
stream. Currently seek is supported
+ * only in file, not in directory or glob.
+ */
+ static public SeekableInputStream open(String fileSpec, long offset,
PigContext pigContext) throws IOException {
+
+ init(pigContext);
+ fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
+
+ ElementDescriptor elem;
+ if (!fileSpec.startsWith(LOCAL_PREFIX))
+ elem = pigContext.getDfs().asElement(fullPath(fileSpec,
pigContext));
+
+ else{
+ fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
+ elem = pigContext.getLfs().asElement(fullPath(fileSpec,
pigContext));
+ }
+
+ if (elem.exists() &&
(!elem.getDataStorage().isContainer(elem.toString()))) {
+ try {
+ if (elem.systemElement())
+ throw new IOException ("Attempt is made to open system
file " + elem.toString());
+
+ SeekableInputStream sis = elem.sopen();
+ sis.seek(offset, FLAGS.SEEK_SET);
+ return sis;
+ }
+ catch (DataStorageException e) {
+ throw WrappedIOException.wrap("Failed to determine if elem=" +
elem + " is container", e);
+ }
+ }
+ // Either a directory or a glob.
+ else
+ throw new IOException("Currently seek is supported only in a file,
not in glob or directory.");
+ }
+
/**
 * Convenience overload: delegates to create(fileSpec, false, pigContext).
 * NOTE(review): the meaning of the boolean argument is defined by the
 * three-argument overload (not shown in this diff) — presumably an "append"
 * flag; confirm against that overload.
 */
static public OutputStream create(String fileSpec, PigContext pigContext)
throws IOException{
return create(fileSpec,false,pigContext);
}