vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r349369435
##########
File path: core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
##########
@@ -96,6 +99,52 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
     e.getMessage should include ("Fetch failure will not retry stage due to testing config")
   }
+  test("SPARK-27651: read host local shuffle blocks from disk and avoid network remote fetches") {
+    val confWithHostLocalRead =
+      conf.clone.set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+    confWithHostLocalRead.set(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE, 5)
+    sc = new SparkContext("local-cluster[2,1,1024]", "test", confWithHostLocalRead)
+    sc.getConf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should equal(true)
+    sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+    sc.env.blockManager.hostLocalDirManager.isDefined should equal(true)
+    sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
+
+    // On a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
+    // If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
+    // all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
+    // local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
+    // In this case, we won't receive FetchFailed, which will make this test fail.
+    // Therefore, we should wait until all slaves are up.
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+
+    val rdd = sc.parallelize(0 until 1000, 10)
+      .map { i => (i, 1) }
+      .reduceByKey(_ + _)
+
+    rdd.count()
+    rdd.count()
+
+    val cachedExecutors = rdd.mapPartitions { _ =>
+      SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager =>
+        localDirManager.getCachedHostLocalDirs().keySet.iterator
+      }.getOrElse(Iterator.empty)
+    }.collect().toSet
+
+    // Both executors are caching the dirs of the other one.
+    cachedExecutors should equal(sc.getExecutorIds().toSet)
+
+    // Invalidate the registered executors, disallowing access to their shuffle blocks (without
+    // deleting the actual shuffle files, so we can still access them without the shuffle service).
+    // As directories are already cached, there is no request to the external shuffle service.
+    rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */)
+
+    // Now Spark will not receive FetchFailed, as host local blocks are read from the cached local
+    // disk directly.
+    rdd.count()
Review comment:
Can you use just the job below (or just this one)? Or are both needed for some reason?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.