[ https://issues.apache.org/jira/browse/PIG-934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12748070#action_12748070 ]

Pradeep Kamath commented on PIG-934:
------------------------------------

To get an idea of how seeking works for regular loads in map tasks, I looked at
PigSlice.java. There the seek happens in init(), before bindTo() is called:
{code}
public void init(DataStorage base) throws IOException {
    ..
    fsis = base.asElement(base.getActiveContainer(), file).sopen();
    // Position the raw stream at this slice's start offset before any
    // decompressing or buffering wrapper is put around it.
    fsis.seek(start, FLAGS.SEEK_CUR);
    end = start + getLength();

    if (file.endsWith(".bz") || file.endsWith(".bz2")) {
        is = new CBZip2InputStream(fsis, 9);
    } else if (file.endsWith(".gz")) {
        is = new GZIPInputStream(fsis);
        // We can't tell how much of the underlying stream GZIPInputStream
        // has actually consumed
        end = Long.MAX_VALUE;
    } else {
        is = fsis;
    }

    loader.bindTo(file.toString(), new BufferedPositionedInputStream(is, start),
            start, end);
}
{code}
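The `.gz` branch sets `end` to `Long.MAX_VALUE` because `GZIPInputStream` reads ahead: it fills an internal buffer from the raw stream, so the raw stream's position does not tell you how many compressed bytes back the data returned so far. The self-contained sketch below makes that visible by counting the compressed bytes pulled from the raw stream when only one decompressed byte is read. `GzipReadAhead` and `CountingInputStream` are illustrative names, not Pig classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipReadAhead {

    /** Counts how many bytes have been pulled from the wrapped (raw) stream. */
    static final class CountingInputStream extends FilterInputStream {
        long count = 0;

        CountingInputStream(InputStream in) { super(in); }

        @Override public int read() throws IOException {
            int b = super.read();
            if (b >= 0) count++;
            return b;
        }

        @Override public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) count += n;
            return n;
        }

        @Override public long skip(long n) throws IOException {
            long skipped = super.skip(n);
            count += skipped;
            return skipped;
        }
    }

    /**
     * Gzips `plain`, then reads a single decompressed byte.
     * Returns {decompressed bytes returned, compressed bytes consumed from the raw stream}.
     */
    public static long[] readOneByte(byte[] plain) throws IOException {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(compressed)) {
            gz.write(plain);
        }
        CountingInputStream raw =
            new CountingInputStream(new ByteArrayInputStream(compressed.toByteArray()));
        try (GZIPInputStream in = new GZIPInputStream(raw)) {
            int b = in.read();
            // raw.count is read before the stream is closed
            return new long[] { b >= 0 ? 1 : 0, raw.count };
        }
    }
}
```

Calling `readOneByte(new byte[10000])` returns one decompressed byte, while the raw stream has already given up more than one compressed byte (the gzip header alone is 10 bytes), which is why the raw position cannot serve as a record boundary.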
    
I think we need a FileLocalizer.sOpenSingleFile() method that returns a
SeekableInputStream, which we can then use in setUp() in POLoad to seek to the
offset before calling bindTo(). Something along the lines of:
{code}
static public SeekableInputStream sOpenSingleFile(String fileSpec, PigContext pigContext)
        throws IOException {
    fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
    if (!fileSpec.startsWith(LOCAL_PREFIX)) {
        init(pigContext);
        ElementDescriptor elem =
            pigContext.getDfs().asElement(fullPath(fileSpec, pigContext));
        return elem.sopen();
    } else {
        fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
        // buffering because we only want buffered streams to be passed to load functions.
        /*return new BufferedInputStream(new FileInputStream(fileSpec));*/
        init(pigContext);
        ElementDescriptor elem =
            pigContext.getLfs().asElement(fullPath(fileSpec, pigContext));
        return elem.sopen();
    }
}
{code}
The above code works only with single files, not directories, which should be
OK for merge join. We should probably set this up with a new constructor in
POLoad that also indicates that a single file is being processed.
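Outside Pig, the same idea (open exactly one file, reject directories, and position the stream before anything buffers it) can be sketched with java.nio. This is a minimal analogue under stated assumptions: `FileChannel` stands in for Pig's SeekableInputStream, and `SingleFileOpener.openSingleFileAt` is a hypothetical name, not part of FileLocalizer.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SingleFileOpener {

    /**
     * Hypothetical analogue of the proposed sOpenSingleFile(): opens exactly one
     * regular file (directories are rejected, since merge join only needs a
     * single file) and positions the stream at `offset` before returning it.
     */
    public static InputStream openSingleFileAt(Path file, long offset) throws IOException {
        if (!Files.isRegularFile(file)) {
            throw new IOException("Expected a single file, got: " + file);
        }
        if (offset < 0 || offset > Files.size(file)) {
            throw new IOException("Offset " + offset + " is outside " + file);
        }
        FileChannel ch = FileChannel.open(file, StandardOpenOption.READ);
        ch.position(offset);                // seek BEFORE any buffering wrapper is added
        return Channels.newInputStream(ch); // closing this stream also closes the channel
    }
}
```

The caller would then wrap the returned stream (in Pig, a BufferedPositionedInputStream) and call bindTo() with the same offset, so the position the loader assumes and the actual stream position agree.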



> Merge join implementation currently does not seek to right point on the right side input based on the offset provided by the index
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: PIG-934
>                 URL: https://issues.apache.org/jira/browse/PIG-934
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.3.1
>            Reporter: Pradeep Kamath
>
> We use POLoad to seek into the right-side file; POLoad has the following code:
> {noformat}
>    public void setUp() throws IOException{
>         String filename = lFile.getFileName();
>         loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
>         is = FileLocalizer.open(filename, pc);
>         loader.bindTo(filename, new BufferedPositionedInputStream(is), this.offset, Long.MAX_VALUE);
>     }
> {noformat}
> Between opening the stream and bindTo we do not seek to the right offset. 
> bindTo itself does not perform any seek.
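The effect of the missing seek can be reproduced outside Pig. In this minimal sketch, java.nio's `FileChannel` stands in for the seekable stream, and `Loader` is a hypothetical stand-in for a LoadFunc whose `bindTo()` only records the offset, as described above; `firstRecord` mimics POLoad.setUp() with a flag for the missing step.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MissingSeekDemo {

    /** Hypothetical loader: like bindTo() in the report, it records the offset but never seeks. */
    static final class Loader {
        private InputStream in;
        private long offset;

        void bindTo(InputStream in, long offset, long end) {
            this.in = in;
            this.offset = offset; // stored only; the stream position is left untouched
        }

        /** Reads one '\n'-terminated record from wherever the stream currently is. */
        String nextRecord() throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1 && b != '\n') {
                buf.write(b);
            }
            return buf.toString(StandardCharsets.UTF_8.name());
        }
    }

    /** Mimics POLoad.setUp(); `seekFirst` toggles the seek between open and bindTo. */
    public static String firstRecord(Path file, long offset, boolean seekFirst) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            if (seekFirst) {
                ch.position(offset); // the step the current setUp() lacks
            }
            Loader loader = new Loader();
            loader.bindTo(Channels.newInputStream(ch), offset, Long.MAX_VALUE);
            return loader.nextRecord();
        }
    }
}
```

For a file containing the records `a\t1`, `b\t2`, `c\t3` (one per line), the index offset of the second record is 4. `firstRecord(f, 4, false)` returns `a\t1`, i.e. the loader starts at byte 0 despite being told offset 4, while `firstRecord(f, 4, true)` returns `b\t2`.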

-- 
This message is automatically generated by JIRA.