Author: pradeepkth
Date: Wed Sep  2 01:13:04 2009
New Revision: 810327

URL: http://svn.apache.org/viewvc?rev=810327&view=rev
Log:
PIG-934: Merge join implementation currently does not seek to right point on 
the right side input based on the offset provided by the index (ashutoshc via 
pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=810327&r1=810326&r2=810327&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Sep  2 01:13:04 2009
@@ -59,7 +59,11 @@
 PIG-792: skew join implementation (sriranjan via olgan)
 
 BUG FIXES
-    
+
+    PIG-934: Merge join implementation currently does not seek to right point
+    on the right side input based on the offset provided by the index
+    (ashutoshc via pradeepkth)
+
     PIG-925: Fix join in local mode (daijy)
 
     PIG-913: Error in Pig script when grouping on chararray column (daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=810327&r1=810326&r2=810327&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Wed Sep  2 01:13:04 2009
@@ -100,7 +100,7 @@
         String filename = lFile.getFileName();
         loader = 
(LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
         
-        is = FileLocalizer.open(filename, pc);
+        is = (this.offset == 0) ? FileLocalizer.open(filename, pc) : 
FileLocalizer.open(filename, this.offset,pc);
         
         loader.bindTo(filename , new BufferedPositionedInputStream(is), 
this.offset, Long.MAX_VALUE);
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=810327&r1=810326&r2=810327&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Wed Sep  2 01:13:04 2009
@@ -41,6 +41,8 @@
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.DataStorageException;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -346,6 +348,48 @@
         }
     }
     
+    /**
+     * @param fileSpec
+     * @param offset
+     * @param pigContext
+     * @return SeekableInputStream
+     * @throws IOException
+     * 
+     * This is an overloaded version of open where there is a need to seek in 
stream. Currently seek is supported
+     * only in file, not in directory or glob.
+     */
+    static public SeekableInputStream open(String fileSpec, long offset, 
PigContext pigContext) throws IOException {
+        
+        init(pigContext);
+        fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
+        
+        ElementDescriptor elem;
+        if (!fileSpec.startsWith(LOCAL_PREFIX)) 
+            elem = pigContext.getDfs().asElement(fullPath(fileSpec, 
pigContext));
+                
+        else{
+            fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
+            elem = pigContext.getLfs().asElement(fullPath(fileSpec, 
pigContext));            
+        }
+        
+        if (elem.exists() && 
(!elem.getDataStorage().isContainer(elem.toString()))) {
+            try {
+                if (elem.systemElement())
+                    throw new IOException ("Attempt is made to open system 
file " + elem.toString());
+                
+                SeekableInputStream sis = elem.sopen();
+                sis.seek(offset, FLAGS.SEEK_SET);
+                return sis;
+            }
+            catch (DataStorageException e) {
+                throw WrappedIOException.wrap("Failed to determine if elem=" + 
elem + " is container", e);
+            }
+        }
+        // Either a directory or a glob.
+        else
+            throw new IOException("Currently seek is supported only in a file, 
not in glob or directory.");
+    }
+    
     static public OutputStream create(String fileSpec, PigContext pigContext) 
throws IOException{
         return create(fileSpec,false,pigContext);
     }


Reply via email to