vanzin commented on a change in pull request #24554: [SPARK-27622][Core] Avoiding the network when block manager fetches disk persisted RDD blocks from the same host
URL: https://github.com/apache/spark/pull/24554#discussion_r291390072
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
 ##########
 @@ -567,6 +568,113 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getRemoteBytes("list1").isEmpty)
   }
 
+  Seq(
+    StorageLevel(useDisk = true, useMemory = false, deserialized = false),
+    StorageLevel(useDisk = true, useMemory = false, deserialized = true),
+    StorageLevel(useDisk = true, useMemory = false, deserialized = true, replication = 2)
+  ).foreach { storageLevel =>
+    test(s"SPARK-27622: avoid the network when block requested from same host, $storageLevel") {
+      conf.set("spark.shuffle.io.maxRetries", "0")
+      val sameHostBm = makeBlockManager(8000, "sameHost", master)
+
+      val otherHostTransferSrv = spy(sameHostBm.blockTransferService)
+      doAnswer { _ =>
+         "otherHost"
+      }.when(otherHostTransferSrv).hostName
+      val otherHostBm = makeBlockManager(8000, "otherHost", master, Some(otherHostTransferSrv))
+
+      // This test always uses the cleanBm to get the block. In case of replication
+      // the block can be added via the otherHostBm, as the direct disk read will use
+      // the local disk of sameHostBm, where the block is replicated to.
+      // When there is no replication the block must be added via sameHostBm directly.
+      val bmToPutBlock = if (storageLevel.replication > 1) otherHostBm else sameHostBm
+      val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten
+      val blockId = "list"
+      bmToPutBlock.putIterator(blockId, List(array).iterator, storageLevel, tellMaster = true)
+
+      val sameHostTransferSrv = spy(sameHostBm.blockTransferService)
+      doAnswer { _ =>
+         fail("Fetching over network is not expected when the block is requested from same host")
+      }.when(sameHostTransferSrv).fetchBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
+      val cleanBm = makeBlockManager(8000, "clean", master, Some(sameHostTransferSrv))
+
+      // check getRemoteBytes
+      val bytesViaStore1 = cleanBm.getRemoteBytes(blockId)
+      assert(bytesViaStore1.isDefined)
+      val expectedContent = sameHostBm.getBlockData(blockId).nioByteBuffer().array()
+      assert(bytesViaStore1.get.toArray === expectedContent)
+
+      // check getRemoteValues
+      val valueViaStore1 = cleanBm.getRemoteValues[List.type](blockId)
+      assert(valueViaStore1.isDefined)
+      assert(valueViaStore1.get.data.toList.head === array)
+    }
+  }
+
+  private def testWithFileDelAfterLocalDiskRead(level: StorageLevel, getValueOrBytes: Boolean) = {
+    val testedFunc = if (getValueOrBytes) "getRemoteValue()" else "getRemoteBytes()"
+    val testNameSuffix = s"$level, $testedFunc"
+    test(s"SPARK-27622: as file is removed fall back to network fetch, $testNameSuffix") {
+      conf.set("spark.shuffle.io.maxRetries", "0")
+      // variable to check the usage of the local disk of the remote executor on the same host
+      var sameHostExecutorTried: Boolean = false
+      val store2 = makeBlockManager(8000, "executor2", this.master,
+        Some(new MockBlockTransferService(0)))
+      val blockId = "list"
+      val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten
+      store2.putIterator(blockId, List(array).iterator, level, true)
+      val expectedBlockData = store2.getLocalBytes(blockId)
+      assert(expectedBlockData.isDefined)
+      val expectedByteBuffer = expectedBlockData.get.toByteBuffer()
+      val mockTransferService = new MockBlockTransferService(0) {
+        override def fetchBlockSync(
+            host: String,
+            port: Int,
+            execId: String,
+            blockId: String,
+            tempFileManager: DownloadFileManager): ManagedBuffer = {
+          assert(sameHostExecutorTried, "before using the network local disk of the remote " +
+            "executor (running on the same host) is expected to be tried")
+          new NioManagedBuffer(expectedByteBuffer)
+        }
+      }
+      val store1 = makeBlockManager(8000, "executor1", this.master, Some(mockTransferService))
+      val spiedStore1 = spy(store1)
+      doAnswer { inv =>
+        val blockId = inv.getArguments()(0).asInstanceOf[BlockId]
+        val localDirs = inv.getArguments()(1).asInstanceOf[Array[String]]
+        val blockSize = inv.getArguments()(2).asInstanceOf[Long]
+        val res = store1.readDiskBlockFromSameHostExecutor(blockId, localDirs, blockSize)
+        assert(res.isDefined)
+        // delete the file behind the blockId
+        ExecutorDiskUtils.getFile(localDirs, store1.subDirsPerLocalDir, blockId.name).delete()
 
 Review comment:
   Should assert that the file is deleted.
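   For example, a minimal sketch of what that could look like (reusing the names from the diff above; not the PR author's code): keep a handle to the file and assert on the result of delete() instead of discarding it.

       // capture the file backing the blockId so its removal can be verified
       val file = ExecutorDiskUtils.getFile(localDirs, store1.subDirsPerLocalDir, blockId.name)
       // java.io.File.delete() returns false when nothing was removed, so assert on it
       assert(file.delete(), s"expected to delete the file backing $blockId")
       assert(!file.exists(), "the block file should be gone after the delete")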
