mridulm commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r680078206



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -407,27 +408,65 @@ protected Ratio getRatio() {
       return rddAndSplitIds;
     }
 
+    /**
+     * @param blockIds Regular shuffle blockIds starts with SHUFFLE_BLOCK_ID 
to be parsed
+     * @param shuffleId shuffle blocks shuffleId
+     * @return mapId and reduceIds of the shuffle blocks in the same order as 
that of the blockIds
+     *
+     * Regular shuffle blocks format should be blockIdParts[1] = shuffleId,
+     * blockIdParts[2] = mapId, blockIdParts[3] = reduceId
+     */
     private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
-      // For regular shuffle blocks, primaryId is mapId and secondaryIds are 
reduceIds.
-      // For shuffle chunks, primaryIds is reduceId and secondaryIds are 
chunkIds.
-      final int[] primaryIdAndSecondaryIds = new int[2 * blockIds.length];
+      final int[] mapIdAndReduceIds = new int[2 * blockIds.length];
       for (int i = 0; i < blockIds.length; i++) {
         String[] blockIdParts = blockIds[i].split("_");
-        if (blockIdParts.length != 4
-          || (!requestForMergedBlockChunks && 
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID))
-          || (requestForMergedBlockChunks && 
!blockIdParts[0].equals(SHUFFLE_CHUNK_ID))) {
+        if (blockIdParts.length != 4 || 
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID)) {
           throw new IllegalArgumentException("Unexpected shuffle block id 
format: " + blockIds[i]);
         }
         if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
           throw new IllegalArgumentException("Expected shuffleId=" + shuffleId 
+
             ", got:" + blockIds[i]);
         }
-        // For regular blocks, blockIdParts[2] is mapId. For chunks, it is 
reduceId.
-        primaryIdAndSecondaryIds[2 * i] = Integer.parseInt(blockIdParts[2]);
-        // For regular blocks, blockIdParts[3] is reduceId. For chunks, it is 
chunkId.
-        primaryIdAndSecondaryIds[2 * i + 1] = 
Integer.parseInt(blockIdParts[3]);
+        // mapId
+        mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
+        // reduceId
+        mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
       }
-      return primaryIdAndSecondaryIds;
+      return mapIdAndReduceIds;
+    }
+
+    /**
+     * @param blockIds Shuffle merged chunks starts with SHUFFLE_CHUNK_ID to 
be parsed
+     * @param shuffleId shuffle blocks shuffleId
+     * @param shuffleMergeId shuffleMergeId is used to uniquely identify 
merging process
+     *                       of shuffle by an indeterminate stage attempt.
+     * @return reduceId and chunkIds of the shuffle chunks in the same order 
as that of the
+     *         blockIds
+     *
+     * Shuffle merged chunks format should be blockIdParts[1] = shuffleId,
+     * blockIdParts[2] = shuffleMergeId, blockIdParts[3] = reduceId, 
blockIdParts[4] = chunkId

Review comment:
       Small change to @Ngone51's comment: `shuffleChunk_$shuffleId_$shuffleMergeId_$reduceId_$chunkId` might be better?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to