otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r640855529



##########
File path: 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -824,17 +919,17 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws
   private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
-      AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId,
-        File dataFile, File indexFile, File metaFile) throws IOException {
+      AppShufflePartitionInfo newAppShufflePartitionInfo(AppAttemptShuffleId appAttemptShuffleId, int reduceId,
+                                                         File dataFile, File indexFile, File metaFile) throws IOException {

Review comment:
      Nit: indentation
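
      For example, a smaller continuation indent (a sketch; the exact style is whatever the project's Java conventions prescribe):

```java
AppShufflePartitionInfo newAppShufflePartitionInfo(
    AppAttemptShuffleId appAttemptShuffleId,
    int reduceId,
    File dataFile,
    File indexFile,
    File metaFile) throws IOException {
```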

##########
File path: 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -61,6 +61,17 @@
 
   private static final Logger log = LoggerFactory.getLogger(RemoteBlockPushResolverSuite.class);
   private final String TEST_APP = "testApp";
+  private final String MERGE_DIRECTORY = "merge_directory";
+  private final int NON_ATTEMPTID = -1;

Review comment:
      Nit: NO_ATTEMPT_ID

##########
File path: 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -149,10 +160,10 @@ public void testFinalizeWithMultipleReducePartitions() throws IOException {
   @Test
   public void testDeferredBufsAreWrittenDuringOnData() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, -1, 0, 0, 0, 0));

Review comment:
      Nit: Use NO_ATTEMPT_ID here and in the other places
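
      For instance (a sketch, assuming the constant suggested above is introduced):

```java
// Hypothetical constant replacing the raw -1 literal in these calls.
private final int NO_ATTEMPT_ID = -1;

StreamCallbackWithID stream1 =
  pushResolver.receiveBlockDataAsStream(
    new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
```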

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -117,64 +123,85 @@ public ShuffleIndexInformation load(File file) throws IOException {
    * shuffle does not exist, initializes the metadata.
    */
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+      AppAttemptShuffleId appAttemptShuffleId,
+      AppAttemptPathsInfo appAttemptPathsInfo,
       int reduceId) {
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
-    if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
+    File dataFile = getMergedShuffleDataFile(
+      appAttemptPathsInfo, appAttemptShuffleId.appId, appAttemptShuffleId.shuffleId, reduceId);
+    if (!partitions.containsKey(appAttemptShuffleId) && dataFile.exists()) {
       // If this partition is already finalized then the partitions map will not contain
       // the appShuffleId but the data file would exist. In that case the block is considered late.
       return null;
     }
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.computeIfAbsent(appShuffleId, id -> Maps.newConcurrentMap());
-    return shufflePartitions.computeIfAbsent(reduceId, key -> {
-      // It only gets here when the key is not present in the map. This could either
-      // be the first time the merge manager receives a pushed block for a given application
-      // shuffle partition, or after the merged shuffle file is finalized. We handle these
-      // two cases accordingly by checking if the file already exists.
-      File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
-      File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
-      try {
-        if (dataFile.exists()) {
-          return null;
-        } else {
-          return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile);
-        }
-      } catch (IOException e) {
-        logger.error(
-          "Cannot create merged shuffle partition with data file {}, index file {}, and "
-            + "meta file {}", dataFile.getAbsolutePath(),
-            indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
-        throw new RuntimeException(
-          String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s "
-          + "reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, reduceId), e);
+    // While an application removal is being processed, which cleans up the shuffle
+    // partition info for that application, this method can still be called to create
+    // new partitions for incoming pushed blocks. To avoid a potential memory leak,
+    // before creating the empty hashmap that stores the shuffle partition info in the
+    // partitions hashmap, we need to make sure the entry for this application still
+    // exists in the appAttemptsPathInfo hashmap. Otherwise, the pushed blocks should
+    // be ignored.

Review comment:
      I don't see how this completely avoids the issue. There can be a context switch after line 150, and another thread can call `applicationRemoved`. I don't think we should try to solve this issue in this PR. It is a problem with the existing code, so why not create another jira for it? The goal of this jira is to add support in DiskBlockManager to serve local merged data, and we keep deviating from it.
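
      To make the window concrete, a minimal sketch (simplified, hypothetical types; not the PR's actual code) of why the containsKey guard cannot be atomic with the later insert:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class RaceSketch {
  private final ConcurrentMap<String, Object> appAttemptsPathInfo = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, Object> partitions = new ConcurrentHashMap<>();

  Object getOrCreatePartition(String appId) {
    if (!appAttemptsPathInfo.containsKey(appId)) {
      return null; // app already removed; ignore the pushed block
    }
    // A context switch here can let another thread run applicationRemoved(appId),
    // which removes the path info and cleans up the partitions map...
    return partitions.computeIfAbsent(appId, id -> new Object());
    // ...so this computeIfAbsent can re-insert an entry that no cleanup will
    // ever visit again, which is exactly the leak the guard tries to prevent.
  }

  void applicationRemoved(String appId) {
    appAttemptsPathInfo.remove(appId);
    partitions.remove(appId);
  }
}
```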

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -290,16 +343,63 @@ void deleteExecutorDirs(Path[] dirs) {
     }
   }
 
+  /**
+   * Create StreamCallback for invalid push blocks with the specific error message.
+   * If the error message is null, this StreamCallback won't throw an exception in the client.
+   */
+  private StreamCallbackWithID createCallbackForInvalidPushBlocks(
+      String streamId,
+      String errorMessage) {
+    return new StreamCallbackWithID() {
+      @Override
+      public String getID() {
+        return streamId;
+      }
+
+      @Override
+      public void onData(String streamId, ByteBuffer buf) {
+        // Ignore the requests. It reaches here either when a request is received after the
+        // shuffle file is finalized or when a request is for a duplicate block.
+      }
+
+      @Override
+      public void onComplete(String streamId) {
+        if (errorMessage != null) {
+          // Throw an exception here so the block data is drained from the channel and the
+          // server responds RpcFailure to the client.
+          throw new RuntimeException(String.format("Block %s %s", streamId, errorMessage));
+        }
+        // For a duplicate block that is received before the shuffle merge finalizes, the
+        // server should respond success to the client.
+      }
+
+      @Override
+      public void onFailure(String streamId, Throwable cause) {
+      }
+    };
+  }
+
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     // Retrieve merged shuffle file metadata
-    AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
+    AppAttemptPathsInfo appAttemptPathsInfo = getAppAttemptPathsInfo(msg.appId);
+    final String streamId = String.format("%s_%d_%d_%d",
+      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, msg.mapIndex, msg.reduceId);
+    AppAttemptShuffleId appAttemptShuffleId =
+      new AppAttemptShuffleId(msg.appId, msg.attemptId, msg.shuffleId);
+    if (appAttemptPathsInfo.attemptId != appAttemptShuffleId.attemptId) {
+      // If this block belongs to a former application attempt, it is considered late,
+      // as only the blocks from the current application attempt will be merged.
+      return createCallbackForInvalidPushBlocks(streamId,
+        ErrorHandler.BlockPushErrorHandler.NEWER_ATTEMPT_HAS_STARTED_MESSAGE_SUFFIX);

Review comment:
      Please add a TODO as well as a jira ticket for this: the client needs to handle this failure.
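
      For example (a sketch; the jira id is a placeholder until the ticket is filed):

```java
// TODO(SPARK-XXXXX): the client does not handle this failure yet. It should
// recognize NEWER_ATTEMPT_HAS_STARTED_MESSAGE_SUFFIX in the failure message
// and stop pushing blocks for the stale attempt instead of retrying.
return createCallbackForInvalidPushBlocks(streamId,
  ErrorHandler.BlockPushErrorHandler.NEWER_ATTEMPT_HAS_STARTED_MESSAGE_SUFFIX);
```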

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -52,6 +53,14 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
+  /**
+   * Create merge directories
+   */
+  createLocalDirsForMergedShuffleBlocks(conf)
+
+  private[spark] lazy val activeMergedShuffleDirs: Option[Array[File]] =
+    findActiveMergedShuffleDirs(conf)
+

Review comment:
       If this executor is not creating the merged directory, then currently it 
seems that activeMergedShuffleDirs will be empty. Check line 216. 

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
##########
@@ -35,7 +35,11 @@
   public final String[] localDirs;
   /** Number of subdirectories created within each localDir. */
   public final int subDirsPerLocalDir;
-  /** Shuffle manager (SortShuffleManager) that the executor is using. */
+  /** Shuffle manager (SortShuffleManager) that the executor is using.

Review comment:
      Nit: Since it is no longer one line, start it from the next line. Also fix line 42.
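
      i.e. something like (the rest of the description is elided here):

```java
/**
 * Shuffle manager (SortShuffleManager) that the executor is using.
 * (remaining multi-line description)
 */
public final String shuffleManager;
```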

##########
File path: 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -61,6 +61,17 @@
 
   private static final Logger log = LoggerFactory.getLogger(RemoteBlockPushResolverSuite.class);
   private final String TEST_APP = "testApp";
+  private final String MERGE_DIRECTORY = "merge_directory";
+  private final int NON_ATTEMPTID = -1;
+  private final int ATTEMPTID_1 = 1;

Review comment:
       Nit: ATTEMPT_ID_1
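
      Together with the rename above, something like:

```java
private final int NO_ATTEMPT_ID = -1;
private final int ATTEMPT_ID_1 = 1;
```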

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -336,51 +436,26 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null
       && partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null
         : partitionInfoBeforeCheck;
-    final String streamId = String.format("%s_%d_%d_%d",
-      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, msg.mapIndex,
-      msg.reduceId);
     if (partitionInfo != null) {
       return new PushBlockStreamCallback(this, streamId, partitionInfo, msg.mapIndex);
     } else {
       // For a duplicate block or a block which is late, respond back with a callback that handles
       // them differently.
-      return new StreamCallbackWithID() {
-        @Override
-        public String getID() {
-          return streamId;
-        }
-
-        @Override
-        public void onData(String streamId, ByteBuffer buf) {
-          // Ignore the requests. It reaches here either when a request is received after the
-          // shuffle file is finalized or when a request is for a duplicate block.
-        }
-
-        @Override
-        public void onComplete(String streamId) {
-          if (isTooLate) {
-            // Throw an exception here so the block data is drained from channel and server
-            // responds RpcFailure to the client.
-            throw new RuntimeException(String.format("Block %s %s", streamId,
-              ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
-          }
-          // For duplicate block that is received before the shuffle merge finalizes, the
-          // server should respond success to the client.
-        }
-
-        @Override
-        public void onFailure(String streamId, Throwable cause) {
-        }
-      };
+      if (isTooLate) {

Review comment:
      Can you please explain why you have broken this down into if/else here? What will the client do when it receives a `null` as a failure message?

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
##########
@@ -40,5 +41,15 @@ trait ShuffleBlockResolver {
    */
   def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer
 
+  /**
+   * Retrieve the data for the specified merged shuffle block as multiple chunks.
+   */
+  def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer]

Review comment:
       All of these are missing the `dirs` parameter
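
      Presumably mirroring `getBlockData` above, e.g. (a sketch):

```scala
/**
 * Retrieve the data for the specified merged shuffle block as multiple chunks.
 */
def getMergedBlockData(
    blockId: ShuffleBlockId,
    dirs: Option[Array[String]]): Seq[ManagedBuffer]
```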

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -52,6 +53,14 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
+  /**
+   * Create merge directories
+   */
+  createLocalDirsForMergedShuffleBlocks(conf)
+
+  private[spark] lazy val activeMergedShuffleDirs: Option[Array[File]] =
+    findActiveMergedShuffleDirs(conf)
+

Review comment:
       BTW this is no longer needed. The dirs are passed to the getMerged... 
APIs.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -728,6 +736,23 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Get the local merged shuffle block data for the given block ID as multiple chunks.
+   * A merged shuffle file is divided into multiple chunks according to the index file.
+   * Instead of reading the entire file as a single block, we split it into smaller chunks
+   * which will be memory efficient when performing certain operations.
+   */
+  override def getLocalMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] = {

Review comment:
      This is also the older API. @zhouyejoe We need to add the `dirs` parameter. Please look at these APIs in https://github.com/apache/spark/pull/32140/

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +101,34 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * @see [[org.apache.spark.network.shuffle.RemoteBlockPushResolver#getFile(java.lang.String,
+   *   java.lang.String)]]
+   */
+  def getMergedShuffleFile(blockId: BlockId): File = {

Review comment:
      @zhouyejoe This is outdated code. We need to pass `dirs: Option[Array[String]]` to this method to support discovering the location of merged block directories when dynamic resource allocation is enabled.
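
      A rough sketch of the shape this could take (hypothetical; the actual resolution logic must stay in sync with `RemoteBlockPushResolver#getFile`):

```scala
def getMergedShuffleFile(blockId: BlockId, dirs: Option[Array[String]]): File = {
  dirs match {
    case Some(mergeDirs) =>
      // Resolve against the supplied merged-block directories, which is what
      // allows lookups after the owning executor has been removed.
      ExecutorDiskUtils.getFile(mergeDirs, subDirsPerLocalDir, blockId.name)
    case None =>
      throw new IllegalArgumentException(
        s"Cannot read merged shuffle file ${blockId.name} without merge dirs")
  }
}
```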

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -415,24 +490,65 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
+    partitions.remove(appAttemptShuffleId);
+    logger.info("Finalized shuffle {} from Application {}_{}.", msg.shuffleId, msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs),
+          executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta = mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) {
+          // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge dirs.
+          appAttemptsPathsInfo.computeIfAbsent(appId, id ->
+            new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId, executorInfo.localDirs,
+              mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir));
+        } else {
+          // If attemptId is not -1, there is an attemptId stored in the ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application attempt will register
+          // the merge dirs in the External Shuffle Service. Any later ExecutorRegister message
+          // from the same application attempt will not override the merge dirs. But it can
+          // be overridden by an ExecutorRegister message from a newer application attempt,
+          // and the former attempts' shuffle partition information will also be cleaned up.
+          AtomicBoolean newAttemptRegistered = new AtomicBoolean(false);
+          appAttemptsPathsInfo.compute(appId, (id, appAttemptPathsInfo) -> {
+            if (appAttemptPathsInfo == null || (appAttemptPathsInfo != null
+                && mergeDirectoryMeta.attemptId > appAttemptPathsInfo.attemptId)) {
+              appAttemptPathsInfo =
+                new AppAttemptPathsInfo(
+                  appId, mergeDirectoryMeta.attemptId, executorInfo.localDirs,
+                  mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir);
+              newAttemptRegistered.set(true);
+            }
+            return appAttemptPathsInfo;
+          });
+          if (newAttemptRegistered.get()) {
+            logger.warn("Remove shuffle partition info for {}_{} generated for former attempts",
+              appId, mergeDirectoryMeta.attemptId);
+            cleanupShufflePartitionInfo(appId, mergeDirectoryMeta.attemptId);

Review comment:
       `cleanupShufflePartitionInfo` iterates over the whole `partitions` map 
so we have increased the latency of registration. This cleanup can happen 
asynchronously. Please add a TODO and create a jira for it as well.
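
      For reference, one possible shape (a hypothetical sketch, not a concrete proposal for this PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A single-threaded executor keeps cleanups ordered while letting
// registerExecutor return without scanning the whole partitions map inline.
private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();

// ...then, inside registerExecutor, after a newer attempt wins:
if (newAttemptRegistered.get()) {
  int newAttemptId = mergeDirectoryMeta.attemptId; // effectively final for the lambda
  cleanupExecutor.execute(() -> cleanupShufflePartitionInfo(appId, newAttemptId));
}
```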



