otterc commented on a change in pull request #30433:
URL: https://github.com/apache/spark/pull/30433#discussion_r534352437



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -877,7 +886,7 @@ void closeAllFiles() {
       }

Review comment:
       Yes, but I think truncating the data file twice while finalizing is 
unnecessary. Also, we can't simply ignore the exceptions thrown while truncating 
all three of these files during close. I think the truncation of these files 
should be separated from `closeAllFiles`.
   This is the change I have made:
   ```
     public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
       ...
         while (partitionsIter.hasNext()) {
           AppShufflePartitionInfo partition = partitionsIter.next();
           synchronized (partition) {
             try {
               partition.finalizePartition();
               bitmaps[idx] = partition.mapTracker;
               reduceIds[idx] = partition.reduceId;
               sizes[idx++] = partition.getPosition();
             } catch (IOException ioe) {
               logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId,
                 msg.shuffleId, partition.reduceId, ioe);
             } finally {
               partition.closeAllFiles();
               // The partition should be removed after the files are written so that any new
               // stream for the same reduce partition will see that the data file exists.
               partitionsIter.remove();
             }
           }
         }
         mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps, reduceIds, sizes);
       }
       partitions.remove(appShuffleId);
       logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
       return mergeStatuses;
     }

     private void finalizePartition() throws IOException {
       if (position != lastChunkOffset) {
         try {
           updateChunkInfo(position, lastMergedMapIndex);
         } catch (IOException ioe) {
           if (seekFailed) {
             // If the seek has failed, then fail the finalization of this partition by
             // propagating the exception.
             throw ioe;
           }
           // If the seek hasn't failed, then ignore this exception.
         }
       }
       // Get rid of any partial block data at the end of the file. This could either be due
       // to a failure or a request still being processed when the shuffle merge gets
       // finalized.
       dataChannel.truncate(lastChunkOffset);
       metaFile.setLength(metaFile.getFilePointer());
       indexFile.setLength(indexFile.getFilePointer());
     }
   ```
   Any exception while truncating the data/meta/index files during finalize will now 
propagate to the caller, and that partition will not be considered merged.
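   As a companion to the above, here is a minimal sketch of what a `closeAllFiles` with 
the truncation removed could look like. It assumes `dataChannel` is a `FileChannel` and 
that `metaFile`/`indexFile` expose `close()`; the null checks and log messages are 
illustrative only, not the actual implementation:
   ```
     void closeAllFiles() {
       // Truncation now happens in finalizePartition(), so this method only releases the
       // file handles. Close failures are logged and do not change whether the partition
       // is reported as merged.
       try {
         if (dataChannel != null && dataChannel.isOpen()) {
           dataChannel.close();
         }
       } catch (IOException ioe) {
         logger.warn("Error closing data channel for reduce partition {}", reduceId, ioe);
       }
       try {
         if (metaFile != null) {
           metaFile.close();
         }
       } catch (IOException ioe) {
         logger.warn("Error closing meta file for reduce partition {}", reduceId, ioe);
       }
       try {
         if (indexFile != null) {
           indexFile.close();
         }
       } catch (IOException ioe) {
         logger.warn("Error closing index file for reduce partition {}", reduceId, ioe);
       }
     }
   ```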



