[ 
https://issues.apache.org/jira/browse/PIG-645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pradeep Kamath updated PIG-645:
-------------------------------

        Fix Version/s: types_branch
    Affects Version/s: types_branch
               Status: Patch Available  (was: Open)

Attached patch to fix the issues with streaming. The root cause of the issue 
was the changes introduced by PIG-629 (PERFORMANCE: Eliminate use of 
TargetedTuple for each input tuple in the map()) caused a race condition on the 
input tuple in the map() between recordReader.next(Tuple value) and the 
streaming binary.

BEFORE PIG-629, the flow of a tuple from record reader to map was as follows:
The recordReader instance gets the *same* TargetedTuple object reference in 
every next(TargetedTuple value) call (this is because Hadoop reuses the value 
object for each recordReader.next(value) call). The recordReader.next(value) 
call inturn calls PigSlice.next(Tuple value) which has the following 
implementation:
{code}
public boolean next(Tuple value) throws IOException {
        Tuple t = loader.getNext();
        if (t == null) {
            return false;
        }
        value.reference(t);
        return true;
    }
{code}
Here value.reference(t) calls the TargetedTuple.reference(Tuple) method which 
simply stores the supplied the tuple in its member Tuple variable "t". 

In PigMapBase.map(), the toTuple() method on the input TargetedTuple is called 
which returns the above store tuple reference "t". This reference is then 
attached to the roots of the map plan. 

The point to note is this final tuple reference which is used by the operators 
in the map plan is the reference to the tuple returned from the loader and not 
the reference to the TargetedTuple which we get from the recordReader and which 
is supplied as an argument to the map() call. The loader creates a new tuple 
reference on each getNext(). This guarantees that the operators in the map plan 
always work with a differnt tuple reference on each map() call though the 
TargetedTuple reference supplied in the map() is the same and reused by Hadoop.

AFTER PIG-629, the flow changed as follows:
TargetedTuple was removed and Tuple was used instead. The PigSlice.next(Tuple 
value) code remained intact. However DefaultTuple.reference(Tuple) call in it 
assigns the internal mFields arraylist to the arraylist of the supplied tuple. 
Note that here the internal member arraylist of the DefaultTuple is changed to 
"refer" to the internal arraylist of the Tuple the loader gives. 
In map(), the tuple which is supplied as input argument to the map() call is 
attached directly to the roots. So in the case of streaming, this tuple is 
finally supplied to the binary by using a storage function (PigStorage by 
default). However this tuple refernce is the same as the one which gets reused 
by hadoop in the next recordReader.next(value) call. So while the storage 
function is in the process of writing the current Tuple's contents (the mFields 
arraylist), it can get changed underneath due to recordReader.next(value) call. 
So unless the storage functions writes to the binary's stdin BEFORE the next 
recordReader.next(value) call, the input sent to the Binary will be garbled.

The fix is the following one line change:
{noformat}
         for (PhysicalOperator root : roots) {
-            root.attachInput(inpTuple);
+            root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
         }

{noformat}

In map(), instead of attaching the inpTuple directly to the roots of the plan, 
a new Tuple is created which refers to the same mFields arrayList as in 
inpTuple. With this change, all operators in the map plan, now work on a 
different Tuple reference from the one which is supplied in the map() argument 
(and which is reused by Hadoop). This reference will refer to the mFields of 
the Tuple returned from the loader which is guaranteed to be a new arraylist 
for each input tuple since the loader creates a new Tuple each time. 

> Streaming is broken with the latest trunk
> -----------------------------------------
>
>                 Key: PIG-645
>                 URL: https://issues.apache.org/jira/browse/PIG-645
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: types_branch
>            Reporter: Olga Natkovich
>            Assignee: Pradeep Kamath
>             Fix For: types_branch
>
>         Attachments: PIG-645.patch
>
>
> Several tests we run are failing now

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to