venkata91 commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r661972672
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
##########
@@ -222,7 +223,7 @@ public void sendMergedBlockMetaReq(
handler.addRpcRequest(requestId, callback);
RpcChannelListener listener = new RpcChannelListener(requestId, callback);
channel.writeAndFlush(
- new MergedBlockMetaRequest(requestId, appId, shuffleId,
reduceId)).addListener(listener);
+ new MergedBlockMetaRequest(requestId, appId, shuffleId,
shuffleSequenceId, reduceId)).addListener(listener);
Review comment:
After another discussion with @Victsm, it is now clear how we want to
proceed. The following changes need to be added:
1. Pass `shuffleSequenceId` as part of the push, fetch and finalize
protocols. It seems we are all in agreement on this change now.
2. Keep track of the `shuffleId -> shuffleSequenceId (latest attempt)`
mapping as part of the existing Map by adding an additional level for
`shuffleSequenceId`: `Map<ShuffleId, Map<ShuffleSequenceId, Map<PartitionId,
AppShufflePartitionInfo>>>`. With this, we keep track of the latest
`shuffleId -> shuffleSequenceId` mapping throughout the lifecycle of the
application, so we can reject push/fetch requests made for an older
`shuffleSequenceId` of a `shuffleId`. Also, as part of the finalization
request for an `<appId, shuffleId, shuffleSequenceId>`, we can simply close all
the files tracked for that `<shuffleId, shuffleSequenceId>` and remove this
metadata, but still keep the `shuffleSequenceId` key around. The key is
required for rejecting invalid push/fetch requests.
3. Whenever a push request is received for a `<shuffleId,
shuffleSequenceId>` whose `shuffleSequenceId` is greater than the current one
tracked in the metadata, close and clean up all the files for the older attempt
asynchronously. This way the shuffle data of stale attempts is cleaned up
gradually.
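To make points 2 and 3 concrete, here is a rough Java sketch of the
bookkeeping described above. It is not the code in this PR: the class
`ShuffleSequenceTracker`, its method names, the `PartitionInfo` stand-in for
`AppShufflePartitionInfo`, the `FINALIZED` marker, and the injected `Executor`
are all hypothetical. It also assumes that a push arriving after finalization
of the same attempt is rejected, which the list above does not spell out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executor;

/** Hypothetical sketch; none of these names come from the Spark codebase. */
class ShuffleSequenceTracker {
  /** Stand-in for AppShufflePartitionInfo; only records that its files were closed. */
  static final class PartitionInfo {
    volatile boolean closed;
    void closeFiles() { closed = true; }
  }

  // Marker value stored under a shuffleSequenceId once it is finalized, so the
  // key stays around for validation after the partition metadata is dropped.
  private static final Map<Integer, PartitionInfo> FINALIZED = Collections.emptyMap();

  // shuffleId -> shuffleSequenceId -> partitionId -> partition info.
  // The largest key of the inner TreeMap is the latest attempt.
  private final Map<Integer, TreeMap<Integer, Map<Integer, PartitionInfo>>> shuffles =
      new HashMap<>();
  private final Executor cleanupExecutor;

  ShuffleSequenceTracker(Executor cleanupExecutor) {
    this.cleanupExecutor = cleanupExecutor;
  }

  /** Returns the partition info to append to, or null if the push is rejected. */
  synchronized PartitionInfo onPush(int shuffleId, int shuffleSequenceId, int partitionId) {
    TreeMap<Integer, Map<Integer, PartitionInfo>> attempts =
        shuffles.computeIfAbsent(shuffleId, k -> new TreeMap<>());
    if (!attempts.isEmpty()) {
      int latest = attempts.lastKey();
      if (shuffleSequenceId < latest) {
        return null; // push for an older attempt: reject
      }
      if (shuffleSequenceId > latest) {
        // Newer attempt: drop the old metadata now, close its files asynchronously.
        List<PartitionInfo> stale = new ArrayList<>();
        attempts.values().forEach(p -> stale.addAll(p.values()));
        attempts.clear();
        cleanupExecutor.execute(() -> stale.forEach(PartitionInfo::closeFiles));
      }
    }
    Map<Integer, PartitionInfo> partitions =
        attempts.computeIfAbsent(shuffleSequenceId, k -> new HashMap<>());
    if (partitions == FINALIZED) {
      return null; // assumption: pushes after finalization are rejected
    }
    return partitions.computeIfAbsent(partitionId, k -> new PartitionInfo());
  }

  /** True only if shuffleSequenceId is the latest one tracked for shuffleId. */
  synchronized boolean isLatest(int shuffleId, int shuffleSequenceId) {
    TreeMap<Integer, Map<Integer, PartitionInfo>> attempts = shuffles.get(shuffleId);
    return attempts != null && !attempts.isEmpty()
        && attempts.lastKey() == shuffleSequenceId;
  }

  /** Closes the files of the latest attempt and keeps only its key. */
  synchronized boolean finalizeMerge(int shuffleId, int shuffleSequenceId) {
    if (!isLatest(shuffleId, shuffleSequenceId)) {
      return false; // finalize for a stale or unknown attempt: reject
    }
    Map<Integer, PartitionInfo> partitions =
        shuffles.get(shuffleId).put(shuffleSequenceId, FINALIZED);
    partitions.values().forEach(PartitionInfo::closeFiles);
    return true;
  }
}
```

A fetch path could call `isLatest` before serving merged block metadata.
Passing `Runnable::run` as the executor makes the cleanup synchronous, which
is convenient for tests; a real server would presumably hand it a background
thread pool.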
Given that we all now agree on the required protocol changes, I'm planning
to split the protocol changes and the implementation details into separate
PRs so that we can get the protocol changes PR merged before the 3.2 branch
cut. Thoughts, @mridulm @Ngone51 @Victsm @otterc?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]