mridulm commented on a change in pull request #30433:
URL: https://github.com/apache/spark/pull/30433#discussion_r535492414



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -825,45 +881,79 @@ void resetChunkTracker() {
      * @param mapIndex the map index to be added to chunk tracker.
      */
     void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
-      long idxStartPos = -1;
       try {
-        // update the chunk tracker to meta file before index file
-        writeChunkTracker(mapIndex);
-        idxStartPos = indexFile.getFilePointer();
         logger.trace("{} shuffleId {} reduceId {} updated index current {} 
updated {}",
-          appShuffleId.appId, appShuffleId.shuffleId, reduceId, 
this.lastChunkOffset,
-          chunkOffset);
-        indexFile.writeLong(chunkOffset);
-      } catch (IOException ioe) {
-        if (idxStartPos != -1) {
-          // reset the position to avoid corrupting index files during 
exception.
-          logger.warn("{} shuffleId {} reduceId {} reset index to position {}",
-            appShuffleId.appId, appShuffleId.shuffleId, reduceId, idxStartPos);
-          indexFile.seek(idxStartPos);
+          appShuffleId.appId, appShuffleId.shuffleId, reduceId, 
this.lastChunkOffset, chunkOffset);
+        if (indexMetaUpdateFailed) {
+          indexFile.getChannel().position(indexFile.getPos());
         }
+        indexFile.getDos().writeLong(chunkOffset);
+        // Chunk bitmap should be written to the meta file after the index 
file because if there are
+        // any exceptions during writing the offset to the index file, meta 
file should not be
+        // updated. If the update to the index file is successful but the 
update to meta file isn't
+        // then the index file position is reset in the catch clause.
+        writeChunkTracker(mapIndex);
+        indexFile.updatePos(8);
+        this.lastChunkOffset = chunkOffset;
+        indexMetaUpdateFailed = false;
+      } catch (IOException ioe) {
+        logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", 
appShuffleId.appId,
+          appShuffleId.shuffleId, reduceId);
+        indexMetaUpdateFailed = true;
+        // Any exception here is propagated to the caller and the caller can 
decide whether to
+        // abort or not.
         throw ioe;
       }
-      this.lastChunkOffset = chunkOffset;
     }
 
     private void writeChunkTracker(int mapIndex) throws IOException {
       if (mapIndex == -1) {
         return;
       }
       chunkTracker.add(mapIndex);
-      long metaStartPos = metaFile.getFilePointer();
       try {
         logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to 
meta file",
           appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
-        chunkTracker.serialize(metaFile);
+        if (indexMetaUpdateFailed) {
+          metaFile.getChannel().position(metaFile.getPos());
+        }
+        int chunkTrackerSize = chunkTracker.serializedSizeInBytes();
+        chunkTracker.serialize(metaFile.getDos());
+        metaFile.updatePos(chunkTrackerSize);
       } catch (IOException ioe) {
-        logger.warn("{} shuffleId {} reduceId {} mapIndex {} reset position of 
meta file to {}",
-          appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex, 
metaStartPos);
-        metaFile.seek(metaStartPos);
+        logger.warn("{} shuffleId {} reduceId {} mapIndex {} update to meta 
file failed",

Review comment:
       We are logging this failure multiple times; remove the log statement here 
and rely on the caller to log it instead.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -825,45 +881,79 @@ void resetChunkTracker() {
      * @param mapIndex the map index to be added to chunk tracker.
      */
     void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
-      long idxStartPos = -1;
       try {
-        // update the chunk tracker to meta file before index file
-        writeChunkTracker(mapIndex);
-        idxStartPos = indexFile.getFilePointer();
         logger.trace("{} shuffleId {} reduceId {} updated index current {} 
updated {}",
-          appShuffleId.appId, appShuffleId.shuffleId, reduceId, 
this.lastChunkOffset,
-          chunkOffset);
-        indexFile.writeLong(chunkOffset);
-      } catch (IOException ioe) {
-        if (idxStartPos != -1) {
-          // reset the position to avoid corrupting index files during 
exception.
-          logger.warn("{} shuffleId {} reduceId {} reset index to position {}",
-            appShuffleId.appId, appShuffleId.shuffleId, reduceId, idxStartPos);
-          indexFile.seek(idxStartPos);
+          appShuffleId.appId, appShuffleId.shuffleId, reduceId, 
this.lastChunkOffset, chunkOffset);
+        if (indexMetaUpdateFailed) {
+          indexFile.getChannel().position(indexFile.getPos());
         }
+        indexFile.getDos().writeLong(chunkOffset);
+        // Chunk bitmap should be written to the meta file after the index 
file because if there are
+        // any exceptions during writing the offset to the index file, meta 
file should not be
+        // updated. If the update to the index file is successful but the 
update to meta file isn't
+        // then the index file position is reset in the catch clause.
+        writeChunkTracker(mapIndex);
+        indexFile.updatePos(8);
+        this.lastChunkOffset = chunkOffset;
+        indexMetaUpdateFailed = false;
+      } catch (IOException ioe) {
+        logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", 
appShuffleId.appId,
+          appShuffleId.shuffleId, reduceId);
+        indexMetaUpdateFailed = true;
+        // Any exception here is propagated to the caller and the caller can 
decide whether to
+        // abort or not.
         throw ioe;
       }
-      this.lastChunkOffset = chunkOffset;
     }
 
     private void writeChunkTracker(int mapIndex) throws IOException {
       if (mapIndex == -1) {
         return;
       }
       chunkTracker.add(mapIndex);
-      long metaStartPos = metaFile.getFilePointer();
       try {
         logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to 
meta file",
           appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
-        chunkTracker.serialize(metaFile);
+        if (indexMetaUpdateFailed) {
+          metaFile.getChannel().position(metaFile.getPos());
+        }
+        int chunkTrackerSize = chunkTracker.serializedSizeInBytes();
+        chunkTracker.serialize(metaFile.getDos());
+        metaFile.updatePos(chunkTrackerSize);

Review comment:
       Use `fileChannel.position()` after the write to determine how many bytes 
were written, and derive `chunkTrackerSize` from that instead of calling 
`serializedSizeInBytes`.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -825,45 +881,79 @@ void resetChunkTracker() {
      * @param mapIndex the map index to be added to chunk tracker.
      */
     void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
-      long idxStartPos = -1;
       try {
-        // update the chunk tracker to meta file before index file
-        writeChunkTracker(mapIndex);
-        idxStartPos = indexFile.getFilePointer();
         logger.trace("{} shuffleId {} reduceId {} updated index current {} 
updated {}",
-          appShuffleId.appId, appShuffleId.shuffleId, reduceId, 
this.lastChunkOffset,
-          chunkOffset);
-        indexFile.writeLong(chunkOffset);
-      } catch (IOException ioe) {
-        if (idxStartPos != -1) {
-          // reset the position to avoid corrupting index files during 
exception.
-          logger.warn("{} shuffleId {} reduceId {} reset index to position {}",
-            appShuffleId.appId, appShuffleId.shuffleId, reduceId, idxStartPos);
-          indexFile.seek(idxStartPos);
+          appShuffleId.appId, appShuffleId.shuffleId, reduceId, 
this.lastChunkOffset, chunkOffset);
+        if (indexMetaUpdateFailed) {
+          indexFile.getChannel().position(indexFile.getPos());
         }
+        indexFile.getDos().writeLong(chunkOffset);
+        // Chunk bitmap should be written to the meta file after the index 
file because if there are
+        // any exceptions during writing the offset to the index file, meta 
file should not be
+        // updated. If the update to the index file is successful but the 
update to meta file isn't
+        // then the index file position is reset in the catch clause.
+        writeChunkTracker(mapIndex);
+        indexFile.updatePos(8);
+        this.lastChunkOffset = chunkOffset;
+        indexMetaUpdateFailed = false;
+      } catch (IOException ioe) {
+        logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", 
appShuffleId.appId,
+          appShuffleId.shuffleId, reduceId);
+        indexMetaUpdateFailed = true;
+        // Any exception here is propagated to the caller and the caller can 
decide whether to
+        // abort or not.
         throw ioe;
       }
-      this.lastChunkOffset = chunkOffset;
     }
 
     private void writeChunkTracker(int mapIndex) throws IOException {
       if (mapIndex == -1) {
         return;
       }
       chunkTracker.add(mapIndex);
-      long metaStartPos = metaFile.getFilePointer();
       try {
         logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to 
meta file",
           appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
-        chunkTracker.serialize(metaFile);
+        if (indexMetaUpdateFailed) {
+          metaFile.getChannel().position(metaFile.getPos());
+        }
+        int chunkTrackerSize = chunkTracker.serializedSizeInBytes();
+        chunkTracker.serialize(metaFile.getDos());
+        metaFile.updatePos(chunkTrackerSize);
       } catch (IOException ioe) {
-        logger.warn("{} shuffleId {} reduceId {} mapIndex {} reset position of 
meta file to {}",
-          appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex, 
metaStartPos);
-        metaFile.seek(metaStartPos);
+        logger.warn("{} shuffleId {} reduceId {} mapIndex {} update to meta 
file failed",
+          appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
+        // Any exceptions while writing to meta file is propagated to 
updateChunkInfo so that
+        // the index file position can be reset as well.
         throw ioe;
       }
     }
 
+    private void incrementIOExceptions() {
+      numIOExceptions++;
+    }
+
+    private boolean shouldAbort(int ioExceptionsThresholdDuringMerge) {
+      return numIOExceptions > ioExceptionsThresholdDuringMerge;
+    }
+
+    private void finalizePartition() throws IOException {
+      if (dataFilePos != lastChunkOffset) {
+        try {
+          updateChunkInfo(dataFilePos, lastMergedMapIndex);
+        } catch (IOException ioe) {
+          // Any exceptions here while updating the meta files can be ignored. 
If the files
+          // aren't successfully updated they will be truncated.
+        }
+      }
+      // Get rid of any partial block data at the end of the file. This could 
either
+      // be due to failure, or a request still being processed when the shuffle
+      // merge gets finalized, or any exceptions while updating index/meta 
files.
+      dataChannel.truncate(lastChunkOffset);
+      indexFile.getChannel().truncate(indexFile.getPos());
+      metaFile.getChannel().truncate(metaFile.getPos());

Review comment:
       Do we want to ignore exceptions here while truncating (for each call)? 
This is a best-effort cleanup with no impact on functionality if there is unread 
suffix data, right?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -369,27 +388,19 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
       mergeStatuses =
         new MergeStatuses(msg.shuffleId, new RoaringBitmap[0], new int[0], new 
long[0]);
     } else {
-      Collection<AppShufflePartitionInfo> partitionsToFinalize = 
shufflePartitions.values();
-      int totalPartitions = partitionsToFinalize.size();
-      RoaringBitmap[] bitmaps = new RoaringBitmap[totalPartitions];
-      int[] reduceIds = new int[totalPartitions];
-      long[] sizes = new long[totalPartitions];
-      Iterator<AppShufflePartitionInfo> partitionsIter = 
partitionsToFinalize.iterator();
-      int idx = 0;
+      List<RoaringBitmap> bitmaps = new LinkedList<>();
+      List<Integer> reduceIds = new LinkedList<>();
+      List<Long> sizes = new LinkedList<>();

Review comment:
       nit: Use `ArrayList` instead of `LinkedList` (and size it to 
`shufflePartitions.size()`).
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to