This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b6b11b514c4 [SPARK-40168][CORE] Handle `SparkException` during shuffle block migration
b6b11b514c4 is described below

commit b6b11b514c46189594a89b2b5607a5016c84d97f
Author: Warren Zhu <warren.zh...@gmail.com>
AuthorDate: Fri Sep 9 14:22:14 2022 -0700

    [SPARK-40168][CORE] Handle `SparkException` during shuffle block migration
    
    ### What changes were proposed in this pull request?
    Explicitly handle a FileNotFoundException wrapped inside a SparkException by marking the block as deleted, which avoids both an unnecessary retry of the block and the termination of the current migration thread.
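    
    A minimal sketch of the resulting behavior, runnable with spark-core on the classpath (the `CatchDemo` and `migrate` names are hypothetical, introduced only to illustrate; the actual change is the one-line broadened `catch` in `BlockManagerDecommissioner`, shown in the diff below):
    
    ```scala
    import java.io.{FileNotFoundException, IOException}
    
    import org.apache.spark.SparkException
    
    object CatchDemo {
      // Hypothetical stand-in for a single migration attempt: returns false
      // when the block should be treated as deleted rather than retried.
      def migrate(upload: () => Unit): Boolean =
        try {
          upload()
          true
        } catch {
          // Previously only IOException was matched here, so a SparkException
          // wrapping a FileNotFoundException escaped and stopped the thread.
          case _: IOException | _: SparkException => false
        }
    
      def main(args: Array[String]): Unit = {
        // The case that used to escape the IOException-only catch:
        assert(!migrate(() => throw new SparkException(
          "upload failed", new FileNotFoundException("shuffle file not found"))))
      }
    }
    ```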
    
    ### Why are the changes needed?
    A FileNotFoundException wrapped inside a SparkException is not handled correctly, causing an unnecessary retry of the block and stopping the current migration thread.
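    
    For context, a sketch of how the wrapping happens, assuming the stock `ThreadUtils.awaitResult` behavior that `uploadBlockSync` relies on (`ThreadUtils` is `private[spark]`, so this sketch only compiles inside an `org.apache.spark` package; `WrappingDemo` is hypothetical):
    
    ```scala
    package org.apache.spark
    
    import java.io.{FileNotFoundException, IOException}
    
    import scala.concurrent.Future
    import scala.concurrent.duration._
    
    import org.apache.spark.util.ThreadUtils
    
    object WrappingDemo {
      def main(args: Array[String]): Unit = {
        // The transfer layer fails with an IOException caused by a missing file...
        val failed = Future.failed(
          new IOException("boop", new FileNotFoundException("file not found")))
        try ThreadUtils.awaitResult(failed, 1.second)
        catch {
          // ...but awaitResult rethrows it wrapped in a SparkException, which
          // is why an IOException-only catch in the decommissioner missed it.
          case e: SparkException => assert(e.getCause.isInstanceOf[IOException])
        }
      }
    }
    ```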
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a test in `BlockManagerDecommissionUnitSuite`.
    
    Closes #37603 from warrenzhu25/deco-error.
    
    Authored-by: Warren Zhu <warren.zh...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/storage/BlockManagerDecommissioner.scala |  2 +-
 .../BlockManagerDecommissionUnitSuite.scala        | 45 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 82450dd2651..6e3cf9c9b41 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -127,7 +127,7 @@ private[storage] class BlockManagerDecommissioner(
               }
               logInfo(s"Migrated $shuffleBlockInfo to $peer")
             } catch {
-              case e: IOException =>
+              case e @ ( _ : IOException | _ : SparkException) =>
                 // If a block got deleted before netty opened the file handle, then trying to
                 // load the blocks now will fail. This is most likely to occur if we start
                 // migrating blocks and then the shuffle TTL cleaner kicks in. However this
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index b7ac378b4c6..df4f256afb6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.storage
 
+import java.io.FileNotFoundException
+
+import scala.concurrent.Future
 import scala.concurrent.duration._
 
 import org.mockito.{ArgumentMatchers => mc}
@@ -186,6 +189,48 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     validateDecommissionTimestampsOnManager(bmDecomManager, fail = false, assertDone = false)
   }
 
+  test("SPARK-40168: block decom manager handles shuffle file not found") {
+    // Set up the mocks so we return one shuffle block
+    val bm = mock(classOf[BlockManager])
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    // First call returns one block, then an empty list simulating a delete.
+    when(migratableShuffleBlockResolver.getStoredShuffles())
+      .thenReturn(Seq(ShuffleBlockInfo(1, 1)))
+      .thenReturn(Seq())
+    when(migratableShuffleBlockResolver.getMigrationBlocks(mc.any()))
+      .thenReturn(
+        List(
+          (ShuffleIndexBlockId(1, 1, 1), mock(classOf[ManagedBuffer])),
+          (ShuffleDataBlockId(1, 1, 1), mock(classOf[ManagedBuffer]))))
+      .thenReturn(List())
+
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq())
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+    val blockTransferService = mock(classOf[BlockTransferService])
+    // Simulate a FileNotFoundException wrapped inside a SparkException
+    when(
+      blockTransferService
+        .uploadBlock(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+      .thenReturn(Future.failed(
+        new java.io.IOException("boop", new FileNotFoundException("file not found"))))
+    when(
+      blockTransferService
+        .uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+      .thenCallRealMethod()
+
+    when(bm.blockTransferService).thenReturn(blockTransferService)
+
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(sparkConf, bm)
+    validateDecommissionTimestampsOnManager(
+      bmDecomManager,
+      numShuffles = Option(1))
+  }
+
   test("block decom manager handles IO failures") {
     // Set up the mocks so we return one shuffle block
     val bm = mock(classOf[BlockManager])

