venkata91 commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r661972672



##########
File path: 
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
##########
@@ -222,7 +223,7 @@ public void sendMergedBlockMetaReq(
     handler.addRpcRequest(requestId, callback);
     RpcChannelListener listener = new RpcChannelListener(requestId, callback);
     channel.writeAndFlush(
-      new MergedBlockMetaRequest(requestId, appId, shuffleId, 
reduceId)).addListener(listener);
+      new MergedBlockMetaRequest(requestId, appId, shuffleId, 
shuffleSequenceId, reduceId)).addListener(listener);

Review comment:
       After having another discussion with @Victsm, now it is clear on how we 
want to proceed. Following are the changes that needs to be added:
   1. Pass `shuffleSequenceId` as part of push, fetch and finalize protocol - 
seems like we are all in agreement with this change now.
   2. Keep track of `shuffleId -> shuffleSequenceId (latest attempt)` mapping 
as part of the existing Map we have by adding an additional level for 
`shuffleSequenceId` -  `Map<ShuffleId, Map<ShuffleSequenceId, Map<PartitionId, 
AppShufflePartitionInfo>>>` With this throughout the lifecycle of the 
application, we will keep track of the latest `shuffleId -> shuffleSequenceId` 
mapping, this way we can reject the push/fetch requests made for a older 
shuffleSequenceId for a shuffleId. Also as part of the finalization request for 
an `<appid, shuffleId, shuffleSequenceId>, we can simply close all the files 
tracked for the `shuffleId, shuffleSequenceId` and remove this metadata but 
still keep the `shuffleSequenceId` key around. This is required for rejecting 
invalid push/fetch requests.
   3. Whenever a push request is received for a `<shuffleId, 
shuffleSequenceId>` which is greater than the current one tracked as part of 
the metadata, close/clean up all the files for the older attempt in an async 
manner. This way the stale attempts shuffle data can be gradually cleaned.
   
   Given that we are all now convinced on the protocol changes required, I'm 
planning to separate out the protocol changes and the implementation detail 
into separate PR so that we can get the protocol changes PR merged before 3.2 
branch cut. Thoughts @mridulm @Ngone51 @Victsm @otterc ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to