Victsm commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r661791974



##########
File path: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
##########
@@ -222,7 +223,7 @@ public void sendMergedBlockMetaReq(
     handler.addRpcRequest(requestId, callback);
     RpcChannelListener listener = new RpcChannelListener(requestId, callback);
     channel.writeAndFlush(
-      new MergedBlockMetaRequest(requestId, appId, shuffleId, reduceId)).addListener(listener);
+      new MergedBlockMetaRequest(requestId, appId, shuffleId, shuffleSequenceId, reduceId)).addListener(listener);

Review comment:
       Discussed this issue with @venkata91 again. One thing I think we didn't 
all realize before is the implication of introducing this shuffle sequence ID. 
Before the sequence ID was introduced, a block push request uniquely identified 
the partition it needed to be merged into with <appID, shuffleID, partitionID>. 
Similarly, the shuffle service uniquely identified the location of the latest 
merged shuffle file for a given partition using <appID, shuffleID, 
partitionID>. So the information passed in the block push request was 
sufficient for the shuffle service to identify the latest merged shuffle file 
for that shuffle partition on its node.
   
   With the shuffle sequence ID, that changes. A block push request (for an 
indeterminate shuffle map stage) uses <appID, shuffleID, shuffleSeqID, 
partitionID> to uniquely identify the merged shuffle file that it needs to be 
merged into. However, the shuffleSeqID passed in the block push 
request might not be the latest shuffleSeqID tracked by the shuffle service for 
the latest merged shuffle file for the given shuffle partition. The seqID in 
the block push request could be smaller than the latest seqID tracked by ESS 
(if the task is a dangling task from a failed previous attempt) or larger (if 
the task is from a newer attempt of the indeterminate shuffle map stage). When 
the latest shuffleSeqID tracked by ESS is still present in memory, there isn't 
much of a problem. However, when this in-memory metadata is cleaned after merge 
finalization, we would encounter issues.
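   
   To make the failure mode concrete, here is a minimal sketch of how an ESS 
could classify an incoming push while the latest seqID is still in memory, and 
what it loses once that metadata is dropped at merge finalization. All names 
here (`SeqIdTracker`, `PushKind`, the string key format) are hypothetical and 
only for illustration; they are not taken from the Spark code base.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: tracks the latest shuffleSeqID per (appId, shuffleId).
final class SeqIdTracker {
  enum PushKind { STALE, CURRENT, NEWER, UNKNOWN }

  // Assumed key format "<appId>_<shuffleId>"; the entry is removed on finalization.
  private final Map<String, Integer> latestSeqId = new ConcurrentHashMap<>();

  private static String key(String appId, int shuffleId) {
    return appId + "_" + shuffleId;
  }

  // Remember the largest seqID seen so far for this shuffle.
  void record(String appId, int shuffleId, int seqId) {
    latestSeqId.merge(key(appId, shuffleId), seqId, Integer::max);
  }

  // Compare the seqID in a push request against the tracked one.
  PushKind classify(String appId, int shuffleId, int pushSeqId) {
    Integer latest = latestSeqId.get(key(appId, shuffleId));
    if (latest == null) {
      return PushKind.UNKNOWN;   // metadata already cleaned: cannot tell stale from newer
    }
    if (pushSeqId < latest) {
      return PushKind.STALE;     // dangling task from an earlier attempt
    }
    if (pushSeqId > latest) {
      return PushKind.NEWER;     // task from a newer stage attempt
    }
    return PushKind.CURRENT;
  }

  // Models the in-memory cleanup that happens at merge finalization.
  void finalizeMerge(String appId, int shuffleId) {
    latestSeqId.remove(key(appId, shuffleId));
  }
}
```

   After `finalizeMerge`, every push for that shuffle comes back as `UNKNOWN`, 
which is the situation the rest of this comment is about.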
   
   Consider two scenarios: (a) a dangling task from a failed 1st stage attempt 
is still pushing blocks after the 2nd stage attempt gets merge finalized, or 
(b) the 1st stage attempt gets merge finalized, but the downstream reduce stage 
experiences shuffle fetch failures, the stage is retried, and tasks from the 
2nd stage attempt push blocks. In (a) the shuffleSeqID passed in the block push 
request is smaller than the shuffleSeqID corresponding to the latest merged 
shuffle file maintained on the ESS node; in (b) it is larger. Since the 
metadata about the latest shuffleSeqID for a previously merge-finalized shuffle 
has already been cleaned up by merge finalization, we have the following 
options:
   1) We maintain a separate data structure to keep track of the latest 
shuffleSeqID encountered so far for a given shuffle ID on each ESS, and this 
information is not deleted during merge finalization. The concern with this 
approach is that we would need to access this separate data structure while 
handling both block pushes and merge finalization, which could lead to the 
same concurrency issue we saw earlier with handling multiple app attempts.
   2) We rely on the merged shuffle files to tell us the latest shuffleSeqID 
that the ESS has encountered so far for a given shuffle ID. This would require 
searching the local files for a given appID, shuffleID, and partitionID across 
various shuffle sequence IDs. However, there does not seem to be a clear 
upper bound for when to stop this search. If the block push request has a 
sequence ID of 2, it is possible that the ESS does not hold merged shuffle 
files for sequence IDs 1, 2, or 3 but does hold merged shuffle files for 
sequence ID 4.
   3) We do not care what's the latest sequence ID encountered by a given ESS. 
Instead we keep all shuffle files for all sequence IDs encountered for a 
shuffle ID on the ESS. This would then require the fetch protocol to specify 
sequence ID as well, as proposed in this PR.
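   
   As a rough illustration of option 3, the sketch below keeps one entry per 
<appID, shuffleID, shuffleSeqID, partitionID> and never needs to know which 
sequence ID is the latest, at the cost of requiring the fetch side to supply 
the sequence ID. The class name `MergedFileIndex`, its methods, and the file 
naming scheme are all assumptions made for this example, not existing Spark 
APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of option 3: keep merged files for every sequence ID.
final class MergedFileIndex {
  // Key: [appId, shuffleId, shuffleSeqId, reduceId]; value: an assumed file name.
  private final Map<List<Object>, String> files = new ConcurrentHashMap<>();

  private static List<Object> key(
      String appId, int shuffleId, int shuffleSeqId, int reduceId) {
    return Arrays.<Object>asList(appId, shuffleId, shuffleSeqId, reduceId);
  }

  // Called when a block push creates (or reuses) the merged file for this tuple.
  String register(String appId, int shuffleId, int shuffleSeqId, int reduceId) {
    // The naming scheme below is illustrative only.
    String name = "merged_" + appId + "_" + shuffleId + "_" + shuffleSeqId
        + "_" + reduceId + ".data";
    String existing = files.putIfAbsent(
        key(appId, shuffleId, shuffleSeqId, reduceId), name);
    return existing != null ? existing : name;
  }

  // A fetch must carry the sequence ID; no "latest" lookup is attempted.
  Optional<String> lookup(
      String appId, int shuffleId, int shuffleSeqId, int reduceId) {
    return Optional.ofNullable(
        files.get(key(appId, shuffleId, shuffleSeqId, reduceId)));
  }
}
```

   With this shape, a push carrying sequence ID 1 and a push carrying sequence 
ID 4 simply land in different entries, and a fetch for sequence ID 2 finds 
nothing rather than silently reading another attempt's data.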
   
   @otterc @mridulm @Ngone51 thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


