mridulm commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r680078206
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -407,27 +408,65 @@ protected Ratio getRatio() {
return rddAndSplitIds;
}
+ /**
+ * @param blockIds Regular shuffle blockIds starts with SHUFFLE_BLOCK_ID
to be parsed
+ * @param shuffleId shuffle blocks shuffleId
+ * @return mapId and reduceIds of the shuffle blocks in the same order as
that of the blockIds
+ *
+ * Regular shuffle blocks format should be blockIdParts[1] = shuffleId,
+ * blockIdParts[2] = mapId, blockIdParts[3] = reduceId
+ */
private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
- // For regular shuffle blocks, primaryId is mapId and secondaryIds are
reduceIds.
- // For shuffle chunks, primaryIds is reduceId and secondaryIds are
chunkIds.
- final int[] primaryIdAndSecondaryIds = new int[2 * blockIds.length];
+ final int[] mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
- if (blockIdParts.length != 4
- || (!requestForMergedBlockChunks &&
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID))
- || (requestForMergedBlockChunks &&
!blockIdParts[0].equals(SHUFFLE_CHUNK_ID))) {
+ if (blockIdParts.length != 4 ||
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID)) {
throw new IllegalArgumentException("Unexpected shuffle block id
format: " + blockIds[i]);
}
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId
+
", got:" + blockIds[i]);
}
- // For regular blocks, blockIdParts[2] is mapId. For chunks, it is
reduceId.
- primaryIdAndSecondaryIds[2 * i] = Integer.parseInt(blockIdParts[2]);
- // For regular blocks, blockIdParts[3] is reduceId. For chunks, it is
chunkId.
- primaryIdAndSecondaryIds[2 * i + 1] =
Integer.parseInt(blockIdParts[3]);
+ // mapId
+ mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
+ // reduceId
+ mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
}
- return primaryIdAndSecondaryIds;
+ return mapIdAndReduceIds;
+ }
+
+ /**
+ * @param blockIds Shuffle merged chunks starts with SHUFFLE_CHUNK_ID to
be parsed
+ * @param shuffleId shuffle blocks shuffleId
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify
merging process
+ * of shuffle by an indeterminate stage attempt.
+ * @return reduceId and chunkIds of the shuffle chunks in the same order
as that of the
+ * blockIds
+ *
+ * Shuffle merged chunks format should be blockIdParts[1] = shuffleId,
+ * blockIdParts[2] = shuffleMergeId, blockIdParts[3] = reduceId,
blockIdParts[4] = chunkId
Review comment:
Small change to @Ngone51's comment:
might `shuffleChunk_$shuffleId_$shuffleMergeId_$reduceId_$chunkId` be better? It documents the expected block id format directly as a template.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]