[
https://issues.apache.org/jira/browse/PIG-645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pradeep Kamath updated PIG-645:
-------------------------------
Fix Version/s: types_branch
Affects Version/s: types_branch
Status: Patch Available (was: Open)
Attached patch to fix the issues with streaming. The root cause is that the
changes introduced by PIG-629 (PERFORMANCE: Eliminate use of TargetedTuple for
each input tuple in the map()) caused a race condition on the input tuple in
the map() between recordReader.next(Tuple value) and the streaming binary.
BEFORE PIG-629, the flow of a tuple from record reader to map was as follows:
The recordReader instance gets the *same* TargetedTuple object reference in
every next(TargetedTuple value) call (this is because Hadoop reuses the value
object for each recordReader.next(value) call). The recordReader.next(value)
call in turn calls PigSlice.next(Tuple value), which has the following
implementation:
{code}
public boolean next(Tuple value) throws IOException {
    Tuple t = loader.getNext();
    if (t == null) {
        return false;
    }
    value.reference(t);
    return true;
}
{code}
Here value.reference(t) calls the TargetedTuple.reference(Tuple) method, which
simply stores the supplied tuple in its member Tuple variable "t".
In PigMapBase.map(), the toTuple() method is called on the input TargetedTuple,
which returns the stored tuple reference "t". This reference is then attached
to the roots of the map plan.
The point to note is this final tuple reference which is used by the operators
in the map plan is the reference to the tuple returned from the loader and not
the reference to the TargetedTuple which we get from the recordReader and which
is supplied as an argument to the map() call. The loader creates a new tuple
reference on each getNext(). This guarantees that the operators in the map plan
always work with a different tuple reference on each map() call, even though
the TargetedTuple reference supplied in the map() is the same and reused by
Hadoop.
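The pre-PIG-629 flow described above can be sketched in isolation. This is a
minimal illustration only: SimpleTuple and Holder are hypothetical stand-ins
(Holder plays the role of TargetedTuple), not Pig's real classes, and the
"loader" is just a constructor call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins; these are NOT Pig's real classes.
class WrapperReuseSketch {
    /** Stand-in for a tuple produced by the loader: just a list of fields. */
    static final class SimpleTuple {
        final List<Object> fields;
        SimpleTuple(Object... values) {
            fields = new ArrayList<>(Arrays.asList(values));
        }
    }

    /** Stand-in for TargetedTuple: a reusable holder around the loader's tuple. */
    static final class Holder {
        private SimpleTuple t;
        void reference(SimpleTuple tuple) { t = tuple; }
        SimpleTuple toTuple() { return t; }
    }

    /** Pushes two records through ONE reused holder; returns what the plan sees. */
    static List<SimpleTuple> run() {
        Holder reused = new Holder();              // the value object Hadoop reuses
        List<SimpleTuple> seenByPlan = new ArrayList<>();
        for (String record : new String[] {"a", "b"}) {
            reused.reference(new SimpleTuple(record)); // loader creates a fresh tuple
            seenByPlan.add(reused.toTuple());          // plan receives the inner tuple
        }
        return seenByPlan;
    }

    public static void main(String[] args) {
        List<SimpleTuple> seen = run();
        // The holder was reused, but the plan saw two distinct tuple objects.
        System.out.println(seen.get(0) != seen.get(1));
        System.out.println(seen.get(0).fields + " " + seen.get(1).fields);
    }
}
```

Because the plan only ever holds the inner tuple, reusing the holder for the
next record cannot disturb a tuple the plan is still working on.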
AFTER PIG-629, the flow changed as follows:
TargetedTuple was removed and Tuple was used instead. The PigSlice.next(Tuple
value) code remained intact. However, the DefaultTuple.reference(Tuple) call in
it assigns the internal mFields arraylist to the arraylist of the supplied
tuple. Note that here the internal member arraylist of the DefaultTuple is
changed to "refer" to the internal arraylist of the Tuple the loader gives.
In map(), the tuple which is supplied as the input argument to the map() call
is attached directly to the roots. So in the case of streaming, this tuple is
finally supplied to the binary by using a storage function (PigStorage by
default). However, this tuple reference is the same as the one which gets
reused by Hadoop in the next recordReader.next(value) call. So while the
storage function is in the process of writing the current Tuple's contents (the
mFields arraylist), they can get changed underneath it by the next
recordReader.next(value) call. So unless the storage function writes to the
binary's stdin BEFORE the next recordReader.next(value) call, the input sent to
the binary will be garbled.
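The aliasing behind this race can be reproduced deterministically, without
threads, in a small sketch. SimpleTuple here is a hypothetical stand-in whose
reference() mimics the mFields reassignment described above; it is not Pig's
DefaultTuple, and the "pending write" is simulated by simply reading the tuple
late.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in; this is NOT Pig's DefaultTuple.
class SharedTupleSketch {
    static final class SimpleTuple {
        List<Object> fields;
        SimpleTuple(Object... values) {
            fields = new ArrayList<>(Arrays.asList(values));
        }
        /** Mimics the described behaviour: point our field list at the other tuple's. */
        void reference(SimpleTuple other) { fields = other.fields; }
    }

    /** Returns what a slow writer would emit for the FIRST record. */
    static String run() {
        SimpleTuple reused = new SimpleTuple();       // the value object Hadoop reuses
        reused.reference(new SimpleTuple("first"));   // record 1 arrives

        // The plan attaches the reused object itself; assume the writer to the
        // streaming binary has not run yet.
        SimpleTuple pendingForBinary = reused;

        reused.reference(new SimpleTuple("second"));  // record 2 arrives too early

        // The writer finally runs and sees record 2 where it expected record 1.
        return String.valueOf(pendingForBinary.fields.get(0));
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "second"
    }
}
```

In the real job the overwrite happens on another thread at an unpredictable
moment, so the binary may see a mix of old and new fields rather than a clean
swap.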
The fix is the following one line change:
{noformat}
 for (PhysicalOperator root : roots) {
-    root.attachInput(inpTuple);
+    root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
 }
{noformat}
In map(), instead of attaching the inpTuple directly to the roots of the plan,
a new Tuple is created which refers to the same mFields arraylist as in
inpTuple. With this change, all operators in the map plan now work on a
different Tuple reference from the one which is supplied in the map() argument
(and which is reused by Hadoop). This reference will refer to the mFields of
the Tuple returned from the loader, which is guaranteed to be a new arraylist
for each input tuple since the loader creates a new Tuple each time.
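The effect of the fix can be sketched the same way. SimpleTuple and the
loaded() helper are hypothetical stand-ins, and the constructor that wraps an
existing list only approximates what newTupleNoCopy is described as doing here
(a new Tuple sharing the same list); it is not Pig's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in; this is NOT Pig's TupleFactory or DefaultTuple.
class NoCopyWrapSketch {
    static final class SimpleTuple {
        List<Object> fields;
        /** Wraps the given list without copying it. */
        SimpleTuple(List<Object> fields) { this.fields = fields; }
        void reference(SimpleTuple other) { fields = other.fields; }
        List<Object> getAll() { return fields; }
    }

    /** Plays the loader: every call builds a tuple around a brand-new list. */
    static SimpleTuple loaded(Object... values) {
        return new SimpleTuple(new ArrayList<>(Arrays.asList(values)));
    }

    /** Returns "what the plan holds" / "what the reused tuple holds" after record 2. */
    static String run() {
        SimpleTuple reused = new SimpleTuple(new ArrayList<>()); // reused by Hadoop
        reused.reference(loaded("first"));                       // record 1 arrives

        // The fix: attach a NEW wrapper around the same list, not the reused object.
        SimpleTuple attached = new SimpleTuple(reused.getAll());

        reused.reference(loaded("second"));                      // record 2 arrives

        // Only the reused wrapper was re-pointed; the attached one is unaffected.
        return attached.fields.get(0) + "/" + reused.fields.get(0);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "first/second"
    }
}
```

No field data is copied: the new wrapper costs one small object per record, and
it stays safe only because the loader never reuses a list across records.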
> Streaming is broken with the latest trunk
> -----------------------------------------
>
> Key: PIG-645
> URL: https://issues.apache.org/jira/browse/PIG-645
> Project: Pig
> Issue Type: Bug
> Affects Versions: types_branch
> Reporter: Olga Natkovich
> Assignee: Pradeep Kamath
> Fix For: types_branch
>
> Attachments: PIG-645.patch
>
>
> Several tests we run are failing now