otterc commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r656760036



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -498,7 +501,7 @@ public boolean hasNext() {
     @Override
     public ManagedBuffer next() {
       ManagedBuffer block = Preconditions.checkNotNull(mergeManager.getMergedBlockData(
-        appId, shuffleId, reduceIds[reduceIdx], chunkIds[reduceIdx][chunkIdx]));
+        appId, shuffleId, shuffleSequenceId, reduceIds[reduceIdx], chunkIds[reduceIdx][chunkIdx]));

Review comment:
       Same here

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -476,12 +477,14 @@ public ManagedBuffer next() {
 
     private final String appId;
     private final int shuffleId;
+    private final int shuffleSequenceId;
     private final int[] reduceIds;
     private final int[][] chunkIds;
 
     ShuffleChunkManagedBufferIterator(FetchShuffleBlockChunks msg) {
       appId = msg.appId;
       shuffleId = msg.shuffleId;
+      shuffleSequenceId = msg.shuffleSequenceId;

Review comment:
       Same here.

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/MergedBlockMetaRequest.java
##########
@@ -32,13 +32,20 @@
   public final long requestId;
   public final String appId;
   public final int shuffleId;
+  public final int shuffleSequenceId;
   public final int reduceId;
 
-  public MergedBlockMetaRequest(long requestId, String appId, int shuffleId, int reduceId) {
+  public MergedBlockMetaRequest(
+      long requestId,
+      String appId,
+      int shuffleId,
+      int shuffleSequenceId,

Review comment:
       Same here. Why do we need to modify the fetch side requests?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -156,26 +157,31 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
   @VisibleForTesting
   AppShufflePartitionInfo newAppShufflePartitionInfo(
       AppShuffleId appShuffleId,
+      int shuffleSequenceId,
       int reduceId,
       File dataFile,
       File indexFile,
       File metaFile) throws IOException {
-    return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile,
+    return new AppShufflePartitionInfo(appShuffleId, shuffleSequenceId, reduceId, dataFile,
       new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile));
   }
 
   @Override
-  public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) {
+  public MergedBlockMeta getMergedBlockMeta(
+      String appId,
+      int shuffleId,
+      int shuffleSequenceId,
+      int reduceId) {
     AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId);
-    File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
+    File indexFile = getMergedShuffleIndexFile(appShuffleId, shuffleSequenceId, reduceId);

Review comment:
       It seems you are changing the fetch-side protocol so that
`shuffleSequenceId` is available here to determine which files to use. I don't
think we should change the fetch-side protocol just for this reason.
   Instead, there should be some logic in `RemoteBlockPushResolver` that
tracks the latest shuffle sequence ID for each shuffle.
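
To illustrate the kind of bookkeeping meant here, a rough sketch follows. It is not taken from this PR or from Spark: the class `LatestShuffleSequenceTracker`, its nested `Key`, and both method names are hypothetical, and it assumes that sequence IDs only ever increase for a given `(appId, shuffleId)` pair and that `-1` is never a valid ID.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical helper (not part of Spark): remembers the highest
 * shuffleSequenceId seen on the push side for each (appId, shuffleId),
 * so that fetch-side requests do not need to carry it.
 */
final class LatestShuffleSequenceTracker {

  /** Hypothetical map key; stands in for something like AppShuffleId. */
  static final class Key {
    final String appId;
    final int shuffleId;

    Key(String appId, int shuffleId) {
      this.appId = appId;
      this.shuffleId = shuffleId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof Key)) return false;
      Key other = (Key) o;
      return shuffleId == other.shuffleId && appId.equals(other.appId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, shuffleId);
    }
  }

  private final ConcurrentMap<Key, Integer> latest = new ConcurrentHashMap<>();

  /**
   * Push side: record the sequence ID carried by an incoming push.
   * Returns false when the push belongs to an older (stale) sequence.
   */
  boolean recordPush(String appId, int shuffleId, int shuffleSequenceId) {
    // merge() atomically keeps the larger of the stored and the new value.
    int current = latest.merge(new Key(appId, shuffleId), shuffleSequenceId, Integer::max);
    return current == shuffleSequenceId;
  }

  /**
   * Fetch side: look up the latest known sequence ID, or -1 if no push
   * has been recorded for this shuffle (assumed sentinel).
   */
  int latestSequenceId(String appId, int shuffleId) {
    return latest.getOrDefault(new Key(appId, shuffleId), -1);
  }
}
```

With something along these lines, `getMergedBlockMeta(appId, shuffleId, reduceId)` could keep its current signature and resolve the sequence ID internally before locating the index file.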






