Victsm commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r661791974
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
##########
@@ -222,7 +223,7 @@ public void sendMergedBlockMetaReq(
handler.addRpcRequest(requestId, callback);
RpcChannelListener listener = new RpcChannelListener(requestId, callback);
channel.writeAndFlush(
- new MergedBlockMetaRequest(requestId, appId, shuffleId, reduceId)).addListener(listener);
+ new MergedBlockMetaRequest(requestId, appId, shuffleId, shuffleSequenceId, reduceId)).addListener(listener);
Review comment:
Discussed this issue with @venkata91 again. One thing I think we didn't all
realize before is the implication of introducing this shuffle sequence ID.
Before this sequence ID was introduced, a block push request uniquely
identified the partition it needs to be merged into with <appID, shuffleID,
partitionID>. Similarly, the shuffle service uniquely identifies the location
of the latest merged shuffle file for a given partition using <appID,
shuffleID, partitionID>. So the information passed in the block push request
is sufficient for the shuffle service to identify the latest merged shuffle
file for that shuffle partition on its node.
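To make the pre-sequence-ID keying concrete, here is a minimal Java sketch of a shuffle-service-side index keyed only by <appID, shuffleID, partitionID>. PartitionKey and MergedPartitionIndex are hypothetical names used for illustration, not Spark classes, and a plain HashMap stands in for whatever structure the shuffle service actually uses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key: how a merged partition was identified before the
// shuffle sequence ID was introduced.
record PartitionKey(String appId, int shuffleId, int partitionId) {}

// Hypothetical stand-in for the shuffle service's in-memory index of the
// latest merged shuffle file per partition.
final class MergedPartitionIndex {
  private final Map<PartitionKey, String> latestMergedFile = new HashMap<>();

  // A later registration for the same key replaces the earlier path.
  void register(PartitionKey key, String mergedFilePath) {
    latestMergedFile.put(key, mergedFilePath);
  }

  // The three fields carried by the push request are enough for the lookup;
  // returns null when no merged file is known for the partition.
  String lookup(String appId, int shuffleId, int partitionId) {
    return latestMergedFile.get(new PartitionKey(appId, shuffleId, partitionId));
  }
}
```

Because the key has no notion of attempts, there is only ever one "latest" file per partition to find.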
The shuffle sequence ID changes this. A block push request (for an
indeterminate shuffle map stage) now uses <appID, shuffleID, shuffleSeqID,
partitionID> to uniquely identify the merged shuffle file it needs to be
merged into. However, the shuffleSeqID passed in the block push request might
not be the latest shuffleSeqID that the shuffle service tracks for the given
shuffle partition. The seqID in the block push request could be smaller than
the latest seqID tracked by the ESS (if the task is a dangling task from a
failed previous attempt) or larger (if the task is from a newer attempt of the
indeterminate shuffle map stage). While the latest shuffleSeqID tracked by the
ESS is still present in memory, this is not much of a problem. However, once
this in-memory metadata is cleaned up after merge finalization, we would run
into issues.
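The comparison described above can be sketched as follows, assuming a hypothetical per-shuffle tracker (SeqIdTracker, ShuffleKey, and PushDisposition are illustrative names, not Spark APIs). Once the entry is dropped at merge finalization, the tracker can no longer tell whether a push is stale or newer, which is the gap being discussed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key: one shuffle of one application (no partition, no seqID).
record ShuffleKey(String appId, int shuffleId) {}

// How a push request's seqID relates to the latest seqID the ESS tracks.
enum PushDisposition { STALE, CURRENT, NEWER, UNKNOWN }

// Hypothetical in-memory tracker of the latest shuffleSeqID per shuffle.
final class SeqIdTracker {
  private final Map<ShuffleKey, Integer> latestSeqId = new HashMap<>();

  // Remember the highest seqID seen for this shuffle.
  void observe(ShuffleKey key, int seqId) {
    latestSeqId.merge(key, seqId, Integer::max);
  }

  // Mirrors the cleanup of in-memory metadata at merge finalization.
  void onMergeFinalized(ShuffleKey key) {
    latestSeqId.remove(key);
  }

  PushDisposition classify(ShuffleKey key, int requestSeqId) {
    Integer latest = latestSeqId.get(key);
    if (latest == null) {
      // Metadata is gone: stale and newer pushes are indistinguishable.
      return PushDisposition.UNKNOWN;
    }
    if (requestSeqId < latest) {
      return PushDisposition.STALE; // e.g. a dangling task from an older attempt
    }
    return requestSeqId == latest ? PushDisposition.CURRENT : PushDisposition.NEWER;
  }
}
```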
Consider two scenarios: a dangling task from a failed 1st stage attempt is
still pushing blocks after the 2nd stage attempt has been merge finalized; or
the 1st stage attempt has been merge finalized, but the downstream reduce stage
hits shuffle fetch failures, the stage is retried, and tasks from the 2nd stage
attempt push blocks. In these cases, the shuffleSeqID passed in the block push
request would be smaller or larger, respectively, than the shuffleSeqID of the
latest merged shuffle file maintained on the ESS node. Since the metadata
recording the latest shuffleSeqID for a previously merge-finalized shuffle has
already been cleaned up during merge finalization, we have the following
options:
1) We maintain a separate data structure on each ESS that tracks the latest
shuffleSeqID encountered so far for a given shuffle ID, and this information
is not deleted during merge finalization. The concern with this approach is
that we would need to access this separate data structure both when handling
block pushes and during merge finalization, which could lead to the same
concurrency issue we saw earlier with handling multiple app attempts.
2) We rely on the merged shuffle files themselves to tell us the latest
shuffleSeqID the ESS has encountered so far for a given shuffle ID. This would
require searching the local files for a given appID, shuffleID, and
partitionID across various shuffle sequence IDs. However, there does not seem
to be a clear upper bound for when to stop this search. If the block push
request has a sequence ID of 2, it is possible that the ESS holds no merged
shuffle files for sequence IDs 1, 2, or 3 but does hold merged shuffle files
for sequence ID 4.
3) We do not track the latest sequence ID encountered by a given ESS.
Instead, we keep the merged shuffle files for every sequence ID encountered
for a shuffle ID on the ESS. This would then require the fetch protocol to
specify the sequence ID as well, as proposed in this PR.
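A minimal sketch of option 1, assuming a hypothetical registry (LatestSeqIdRegistry and AppShuffle are illustrative names, not Spark classes) that lives outside the per-partition merge state and is cleared only when the application is removed. Accepting a push only when its seqID is at least the latest one seen is also an assumption made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical key: one shuffle of one application.
record AppShuffle(String appId, int shuffleId) {}

// Hypothetical registry for option 1: kept separately from per-partition merge
// state and NOT cleared at merge finalization, only when the app is removed.
final class LatestSeqIdRegistry {
  private final ConcurrentHashMap<AppShuffle, Integer> latest = new ConcurrentHashMap<>();

  // Push path: atomically records max(previous, seqId) and accepts the push
  // only if its seqID is at least the latest one seen so far.
  boolean admitPush(AppShuffle key, int seqId) {
    return latest.merge(key, seqId, Integer::max) == seqId;
  }

  // Finalization path: per-partition state may be dropped elsewhere, but the
  // finalized seqID is still recorded here.
  void onMergeFinalized(AppShuffle key, int finalizedSeqId) {
    latest.merge(key, finalizedSeqId, Integer::max);
  }

  // Application cleanup is the only point where entries are removed.
  void onAppRemoved(String appId) {
    latest.keySet().removeIf(k -> k.appId().equals(appId));
  }
}
```

ConcurrentHashMap.merge makes each per-key update atomic, but it does not keep this registry consistent with the separate per-partition state that push handling and finalization also touch; coordinating the two is the concurrency concern raised in option 1.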
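Option 2 can be sketched as a bounded probe over sequence IDs. The IntPredicate stands in for a hypothetical on-disk existence check (for example, a File.exists call against some per-seqID file name), MergedFileProbe is an illustrative name, and starting the probe at the request's seqID is an assumption. The maxProbe window is the arbitrary bound that option 2 lacks: any merged file beyond it is silently missed.

```java
import java.util.OptionalInt;
import java.util.function.IntPredicate;

// Hypothetical sketch of option 2: infer the latest shuffleSeqID on this node
// by probing for merged files, since no in-memory record survives finalization.
final class MergedFileProbe {
  private MergedFileProbe() {}

  // Probes seqIDs fromSeqId .. fromSeqId + maxProbe - 1 and returns the highest
  // one whose merged file exists. Files beyond the window are never seen.
  static OptionalInt latestSeqIdOnDisk(IntPredicate mergedFileExists, int fromSeqId, int maxProbe) {
    OptionalInt latest = OptionalInt.empty();
    for (int seq = fromSeqId; seq < fromSeqId + maxProbe; seq++) {
      if (mergedFileExists.test(seq)) {
        latest = OptionalInt.of(seq);
      }
    }
    return latest;
  }
}
```

With merged files only for sequence ID 4 and a push carrying sequence ID 2, latestSeqIdOnDisk(seq -> seq == 4, 2, 2) finds nothing, while a window of 3 finds 4; no fixed window is safe in general.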
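Under option 3, the sequence ID simply becomes part of the key on both the push and fetch paths. The sketch below assumes hypothetical MergedPartitionId, MergedMetaFetch, and AllAttemptsMergedStore types; MergedMetaFetch mirrors the argument order of the new MergedBlockMetaRequest constructor call in the diff, but it is not Spark's class.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical key for option 3: the shuffle sequence ID is part of the identity.
record MergedPartitionId(String appId, int shuffleId, int shuffleSequenceId, int reduceId) {}

// Hypothetical fetch request; field order mirrors the new MergedBlockMetaRequest
// call in the diff, but this is not Spark's class.
record MergedMetaFetch(long requestId, String appId, int shuffleId,
                       int shuffleSequenceId, int reduceId) {
  MergedPartitionId target() {
    return new MergedPartitionId(appId, shuffleId, shuffleSequenceId, reduceId);
  }
}

// Keeps merged files for every seqID; nothing needs to know which one is "latest".
final class AllAttemptsMergedStore {
  private final Map<MergedPartitionId, String> mergedFiles = new ConcurrentHashMap<>();

  void onMerged(MergedPartitionId id, String mergedFilePath) {
    mergedFiles.put(id, mergedFilePath);
  }

  // The reducer names the exact attempt it wants via shuffleSequenceId.
  Optional<String> resolve(MergedMetaFetch request) {
    return Optional.ofNullable(mergedFiles.get(request.target()));
  }

  // Cleanup happens per application rather than at merge finalization.
  void onAppRemoved(String appId) {
    mergedFiles.keySet().removeIf(id -> id.appId().equals(appId));
  }
}
```

The trade-off in this sketch is that merged files for older attempts stay on disk until the application is cleaned up, in exchange for not needing any "latest seqID" bookkeeping on the ESS.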
@otterc @mridulm @Ngone51 thoughts?
--
This is an automated message from the Apache Git Service.