seayoun commented on a change in pull request #31643:
URL: https://github.com/apache/spark/pull/31643#discussion_r585400491



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##########
@@ -138,9 +138,24 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
     }
     long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
     int[][] reduceIdArr = new int[mapIds.length][];
+    int blockIdIndex = 0;
     for (int i = 0; i < mapIds.length; i++) {
       reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
+      // The `blockIds`'s order must be same with the read order specified in FetchShuffleBlocks
+      // because the shuffle data's return order should match the `blockIds`'s order to ensure
+      // blockId and data match.
+      if (!batchFetchEnabled) {
+        for (int j = 0; j < reduceIdArr[i].length; j++) {
+          this.blockIds[blockIdIndex++] = "shuffle_" + shuffleId + "_" + mapIds[i] + "_"
+                  + reduceIdArr[i][j];

Review comment:
       Do you mean changing `mapIdToReduceIds` from `HashMap<Long, 
ArrayList<Integer>>` to `HashMap<Long, Pair<Integer, String>>`, so that each 
pair carries the extra blockId alongside the reduceId?
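
       For context, here is a minimal sketch of the ordering concern that the 
code comment in this hunk describes. It is an illustration under assumptions, 
not the PR's code: the class name `BlockOrderSketch` and the method `regroup` 
are hypothetical, only the non-batch `shuffle_<shuffleId>_<mapId>_<reduceId>` 
form is handled, and a `LinkedHashMap` is used so the sketch is deterministic 
(the diff itself uses a `HashMap`).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not part of OneForOneBlockFetcher.
class BlockOrderSketch {
  /**
   * Groups block IDs by mapId and returns them in the grouped order,
   * i.e. the order in which a request built from the grouping would read them.
   * Assumes every ID belongs to the same shuffle.
   */
  static String[] regroup(String[] blockIds) {
    Map<Long, List<Integer>> mapIdToReduceIds = new LinkedHashMap<>();
    String shuffleId = null;
    for (String blockId : blockIds) {
      String[] parts = blockId.split("_");
      if (parts.length != 4 || !parts[0].equals("shuffle")) {
        throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
      }
      shuffleId = parts[1];
      mapIdToReduceIds
          .computeIfAbsent(Long.parseLong(parts[2]), k -> new ArrayList<>())
          .add(Integer.parseInt(parts[3]));
    }
    // Rebuild the IDs by walking the grouping, so position i in the result
    // corresponds to the i-th block the grouped request asks for.
    String[] reordered = new String[blockIds.length];
    int blockIdIndex = 0;
    for (Map.Entry<Long, List<Integer>> entry : mapIdToReduceIds.entrySet()) {
      for (int reduceId : entry.getValue()) {
        reordered[blockIdIndex++] =
            "shuffle_" + shuffleId + "_" + entry.getKey() + "_" + reduceId;
      }
    }
    return reordered;
  }
}
```

       With an interleaved input such as `shuffle_0_1_0`, `shuffle_0_2_0`, 
`shuffle_0_1_1`, the grouped order places both blocks of mapId 1 before the 
block of mapId 2, which is why the caller's original array cannot be reused 
as-is to match returned data to block IDs.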

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##########
@@ -106,41 +107,53 @@ private boolean isShuffleBlocks(String[] blockIds) {
   }
 
   /**
-   * Analyze the pass in blockIds and create FetchShuffleBlocks message.
-   * The blockIds has been sorted by mapId and reduceId. It's produced in
-   * org.apache.spark.MapOutputTracker.convertMapStatuses.
+   * Create FetchShuffleBlocks message and rebuild internal blockIds by
+   * analyzing the pass in blockIds.
    */
-  private FetchShuffleBlocks createFetchShuffleBlocksMsg(
+  private FetchShuffleBlocks createFetchShuffleBlocksMsgAndBuildBlockIds(
       String appId, String execId, String[] blockIds) {
     String[] firstBlock = splitBlockId(blockIds[0]);
     int shuffleId = Integer.parseInt(firstBlock[1]);
     boolean batchFetchEnabled = firstBlock.length == 5;
 
-    HashMap<Long, ArrayList<Integer>> mapIdToReduceIds = new HashMap<>();
+    HashMap<Long, BlocksInfo> mapIdToBlocksInfo = new HashMap<>();

Review comment:
       Seems to be better.
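
       A rough sketch of what a per-mapId holder could look like, to make the 
comparison concrete. Only the name `BlocksInfo` and the map name 
`mapIdToBlocksInfo` come from the diff above; the fields, the wrapper class 
`BlocksInfoSketch`, and the methods `group` and `flatten` are assumptions 
made for illustration, and a `LinkedHashMap` replaces the diff's `HashMap` to 
keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; field and method names are assumptions.
class BlocksInfoSketch {
  /** Keeps the reduce IDs and the original block ID strings of one mapId together. */
  static final class BlocksInfo {
    final List<Integer> reduceIds = new ArrayList<>();
    final List<String> blockIds = new ArrayList<>();
  }

  /** Groups non-batch block IDs ("shuffle_<shuffleId>_<mapId>_<reduceId>") by mapId. */
  static Map<Long, BlocksInfo> group(String[] blockIds) {
    Map<Long, BlocksInfo> mapIdToBlocksInfo = new LinkedHashMap<>();
    for (String blockId : blockIds) {
      String[] parts = blockId.split("_");
      if (parts.length != 4 || !parts[0].equals("shuffle")) {
        throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
      }
      BlocksInfo info = mapIdToBlocksInfo.computeIfAbsent(
          Long.parseLong(parts[2]), k -> new BlocksInfo());
      info.reduceIds.add(Integer.parseInt(parts[3]));
      info.blockIds.add(blockId); // keep the original string, no re-concatenation
    }
    return mapIdToBlocksInfo;
  }

  /** Returns the block IDs in the same order the grouping is iterated. */
  static String[] flatten(Map<Long, BlocksInfo> mapIdToBlocksInfo) {
    List<String> ordered = new ArrayList<>();
    for (BlocksInfo info : mapIdToBlocksInfo.values()) {
      ordered.addAll(info.blockIds);
    }
    return ordered.toArray(new String[0]);
  }
}
```

       Carrying the original strings in the holder avoids rebuilding each ID 
by string concatenation, at the cost of one extra small class.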

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##########
@@ -138,9 +138,24 @@ private FetchShuffleBlocks createFetchShuffleBlocksMsg(
     }
     long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
     int[][] reduceIdArr = new int[mapIds.length][];
+    int blockIdIndex = 0;
     for (int i = 0; i < mapIds.length; i++) {
       reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));
+      // The `blockIds`'s order must be same with the read order specified in FetchShuffleBlocks
+      // because the shuffle data's return order should match the `blockIds`'s order to ensure
+      // blockId and data match.
+      if (!batchFetchEnabled) {
+        for (int j = 0; j < reduceIdArr[i].length; j++) {
+          this.blockIds[blockIdIndex++] = "shuffle_" + shuffleId + "_" + mapIds[i] + "_"
+                  + reduceIdArr[i][j];

Review comment:
       I understand your design, and it's a smart one, but I think it may be a 
little complex to read; a single class may be easier to follow.
   What's your opinion?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
##########
@@ -81,7 +81,6 @@ public OneForOneBlockFetcher(
       TransportConf transportConf,
       DownloadFileManager downloadFileManager) {
     this.client = client;
-    this.blockIds = blockIds;

Review comment:
       The blockIds will be used later in the constructor to create the 
OpenBlocks or FetchShuffleBlocks message.



