otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r638114426



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -116,29 +115,35 @@ public ShuffleIndexInformation load(File file) throws 
IOException {
    * application, retrieves the associated metadata. If not present and the 
corresponding merged
    * shuffle does not exist, initializes the metadata.
    */
-  private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+  private AppAttemptShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
+      AppAttemptShuffleId appAttemptShuffleId,
       int reduceId) {
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
-    if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
+    if (!appsPathsInfo.containsKey(appAttemptShuffleId.appId)

Review comment:
       We are checking `appsPathsInfo` for the attempt, so this check can be 
pulled out of here and performed before calling this method.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -761,28 +818,31 @@ public boolean equals(Object o) {
       if (o == null || getClass() != o.getClass()) {
         return false;
       }
-      AppShuffleId that = (AppShuffleId) o;
-      return shuffleId == that.shuffleId && Objects.equal(appId, that.appId);
+      AppAttemptShuffleId that = (AppAttemptShuffleId) o;
+      return Objects.equal(appId, that.appId)
+        && attemptId == that.attemptId
+        && shuffleId == that.shuffleId;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hashCode(appId, shuffleId);
+      return Objects.hashCode(appId, attemptId, shuffleId);
     }
 
     @Override
     public String toString() {
-      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-        .append("appId", appId)
-        .append("shuffleId", shuffleId)
+      return Objects.toStringHelper(this)
+        .add("appId", appId)
+        .add("attemptId", attemptId)
+        .add("shuffleId", shuffleId)
         .toString();
     }
   }
 
   /** Metadata tracked for an actively merged shuffle partition */
-  public static class AppShufflePartitionInfo {
+  public static class AppAttemptShufflePartitionInfo {

Review comment:
       We only create a partition info for the latest attempt. I don't really 
think renaming this class is necessary. 
   The Javadoc also says this. Here `attempt` has no significance, since it is 
just a part of the renamed `AppAttemptShuffleId`.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,32 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, 
reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + 
"_" + reduceId
 }
 
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) 
extends BlockId {
+  override def name: String = "shuffleMerged_" + appId + "_" + shuffleId + "_" 
+ reduceId + ".data"
+}
+
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+    appId: String,
+    shuffleId: Int,
+    reduceId: Int) extends BlockId {
+  override def name: String =
+    "shuffleMerged_" + appId + "_" + shuffleId + "_" + reduceId + ".index"
+}
+
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedMetaBlockId(
+    appId: String,
+    shuffleId: Int,
+    reduceId: Int) extends BlockId {
+  override def name: String =
+    "shuffleMerged_" + appId + "_" + shuffleId + "_" + reduceId + ".meta"

Review comment:
       So the file names that the client expects now start with 
`shuffleMerged`. Has this been changed on the server side, which writes these 
files? IIRC it was writing to files starting with `mergedShuffle`.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -415,24 +429,65 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), 
Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, 
msg.appId);
+    partitions.remove(appAttemptShuffleId);
+    logger.info("Finalized shuffle {} from Application {}_{}.", msg.shuffleId, 
msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) 
{
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} 
local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, 
Arrays.toString(executorInfo.localDirs),
+          executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = 
shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta = mapper.readValue(mergeDirInfo, 
MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == -1) {
+          // When attemptId is 0, there is no attemptId stored in the 
ExecutorShuffleInfo.

Review comment:
       Are there UTs added for these cases?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
##########
@@ -59,7 +59,7 @@ default boolean shouldLogError(Throwable t) {
      * will not retry pushing the block nor log the exception on the client 
side.
      */
     public static final String TOO_LATE_MESSAGE_SUFFIX =
-      "received after merged shuffle is finalized";
+      "received after merged shuffle is finalized or newer attempt has 
started";

Review comment:
       Why don't we make "newer attempt has started" a separate message?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -92,7 +91,7 @@
   public RemoteBlockPushResolver(TransportConf conf) {
     this.conf = conf;
     this.partitions = Maps.newConcurrentMap();
-    this.appsPathInfo = Maps.newConcurrentMap();
+    this.appsPathsInfo = Maps.newConcurrentMap();

Review comment:
       Nit: rename to appAttemptPathsInfo

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -293,9 +307,9 @@ void deleteExecutorDirs(Path[] dirs) {
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     // Retrieve merged shuffle file metadata
-    AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
-    AppShufflePartitionInfo partitionInfoBeforeCheck =
-      getOrCreateAppShufflePartitionInfo(appShuffleId, msg.reduceId);
+    AppAttemptShuffleId appAttemptShuffleId = new 
AppAttemptShuffleId(msg.appId, msg.attemptId, msg.shuffleId);
+    AppAttemptShufflePartitionInfo partitionInfoBeforeCheck =

Review comment:
       We can just check, before creating the partitionInfo, whether the 
attemptId is the latest or not. If not, then throw the relevant exception here. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to