vanzin commented on a change in pull request #24554: [SPARK-27622][Core]
Avoiding the network when block manager fetches disk persisted RDD blocks from
the same host
URL: https://github.com/apache/spark/pull/24554#discussion_r288347132
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -567,6 +608,137 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getRemoteBytes("list1").isEmpty)
}
+  Seq(
+    StorageLevel(useDisk = true, useMemory = false, deserialized = false),
+    StorageLevel(useDisk = true, useMemory = false, deserialized = true)
+  ).foreach { storageLevel =>
+    test(s"SPARK-27622: avoid the network when block requested from same host, $storageLevel") {
+      conf.set("spark.shuffle.io.maxRetries", "0")
+      val noFetcher = new MockBlockTransferService(0) {
+        override def fetchBlockSync(
+            host: String,
+            port: Int,
+            execId: String,
+            blockId: String,
+            tempFileManager: DownloadFileManager): ManagedBuffer = {
+          fail("Fetching over network is not expected when the block is requested from same host")
+        }
+      }
+      val store1 = makeBlockManager(8000, "executor1", this.master, Some(noFetcher))
+      val store2 = makeBlockManager(8000, "executor2", this.master, Some(noFetcher))
+      val blockId = "list"
+      val array = new Array[Byte](4000)
+      store2.putIterator(blockId, List(array).iterator, storageLevel, true)
+
+      // check getRemoteBytes
+      val bytesViaStore1 = store1.getRemoteBytes(blockId)
+      assert(bytesViaStore1.isDefined, "list expected to be accessed")
+      val expectedContent = store2.getBlockData(blockId).nioByteBuffer().array()
+      assert(bytesViaStore1.get.toArray === expectedContent)
+
+      // check getRemoteValues
+      val valueViaStore1 = store1.getRemoteValues[List.type](blockId)
+      assert(valueViaStore1.isDefined, "list expected to be accessed")
+      assert(valueViaStore1.get.data.toList.head === array)
+    }
+  }
+
+  private def testWithFileDelAfterLocalDiskRead(level: StorageLevel, getValueOrBytes: Boolean) = {
+    val testedFunc = if (getValueOrBytes) "getRemoteValue()" else "getRemoteBytes()"
+    val testNameSuffix = s"$level, $testedFunc"
+    test(s"SPARK-27622: as file is removed fall back to network fetch, $testNameSuffix") {
+      conf.set("spark.shuffle.io.maxRetries", "0")
+      // variable to check the usage of the local disk of the remote executor on the same host
+      var sameHostExecutorTried: Boolean = false
+      val store2 = makeBlockManager(8000, "executor2", this.master,
+        Some(new MockBlockTransferService(0)))
+      val blockId = "list"
+      val array = new Array[Byte](4000)
+      store2.putIterator(blockId, List(array).iterator, level, true)
+      val expectedBlockData = store2.getLocalBytes(blockId)
+      assert(expectedBlockData.isDefined)
+      val expectedByteBuffer = expectedBlockData.get.toByteBuffer()
+
+      val transferServiceAfterLocalAccess = new MockBlockTransferService(0) {
+        override def fetchBlockSync(
+            host: String,
+            port: Int,
+            execId: String,
+            blockId: String,
+            tempFileManager: DownloadFileManager): ManagedBuffer = {
+          assert(sameHostExecutorTried, "before using the network local disk of the remote " +
+            "executor (running on the same host) is expected to be tried")
+          new NioManagedBuffer(expectedByteBuffer)
+        }
+      }
+
+      val blockManagerWithDeleteFactory: BlockManagerFactory = (
Review comment:
Is this the only special implementation of `BlockManagerFactory`? If so, it might be easier and cleaner to use a Mockito `spy`.
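
For reference, a minimal sketch of the `spy` idiom being suggested. The `DiskReader` class below is invented purely for illustration (it is not part of Spark or this PR); the point is only the `spy` + `doAnswer(...).when(...)` combination, which could wrap whatever object the custom `BlockManagerFactory` currently swaps in, so the file deletion happens inside the stubbed call rather than in a dedicated factory implementation:

```scala
import java.nio.file.{Files, Paths}

import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mockito.{doAnswer, spy}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

// Hypothetical stand-in for the component the BlockManagerFactory variant replaces;
// defined here only so the snippet is self-contained.
class DiskReader {
  def read(path: String): Array[Byte] = Files.readAllBytes(Paths.get(path))
}

val reader = spy(new DiskReader)

// Stub a single method on the real object: delete the file first, then delegate to the
// real read, so the caller sees the same failure as a genuinely removed block file and
// falls back to the network fetch path. (In the actual test the stub could also set
// sameHostExecutorTried = true here.)
doAnswer(new Answer[Array[Byte]] {
  override def answer(invocation: InvocationOnMock): Array[Byte] = {
    Files.deleteIfExists(Paths.get(invocation.getArgument[String](0)))
    invocation.callRealMethod().asInstanceOf[Array[Byte]]
  }
}).when(reader).read(anyString())
```

Whether that ends up simpler than the factory depends on how `makeBlockManager` wires the object in, so treat this only as a pointer to the pattern, not a drop-in replacement.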