attilapiros commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r349513148
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
 ##########
 @@ -96,6 +99,52 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
     e.getMessage should include ("Fetch failure will not retry stage due to testing config")
   }
 
+  test("SPARK-27651: read host local shuffle blocks from disk and avoid 
network remote fetches") {
+    val confWithHostLocalRead =
+      conf.clone.set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+    
confWithHostLocalRead.set(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE, 5)
+    sc = new SparkContext("local-cluster[2,1,1024]", "test", 
confWithHostLocalRead)
+    sc.getConf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should 
equal(true)
+    sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+    sc.env.blockManager.hostLocalDirManager.isDefined should equal(true)
+    sc.env.blockManager.blockStoreClient.getClass should 
equal(classOf[ExternalBlockStoreClient])
+
+    // In a slow machine, one slave may register hundreds of milliseconds 
ahead of the other one.
+    // If we don't wait for all slaves, it's possible that only one executor 
runs all jobs. Then
+    // all shuffle blocks will be in this executor, 
ShuffleBlockFetcherIterator will directly fetch
+    // local blocks from the local BlockManager and won't send requests to 
ExternalShuffleService.
+    // In this case, we won't receive FetchFailed. And it will make this test 
fail.
+    // Therefore, we should wait until all slaves are up
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+
+    val rdd = sc.parallelize(0 until 1000, 10)
+      .map { i => (i, 1) }
+      .reduceByKey(_ + _)
+
+    rdd.count()
+    rdd.count()
+
+    val cachedExecutors = rdd.mapPartitions { _ =>
+      SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager =>
+        localDirManager.getCachedHostLocalDirs().keySet.iterator
+      }.getOrElse(Iterator.empty)
+    }.collect().toSet
+
+    // both executors are caching the dirs of the other one
+    cachedExecutors should equal(sc.getExecutorIds().toSet)
+
+    // Invalidate the registered executors, disallowing access to their 
shuffle blocks (without
+    // deleting the actual shuffle files, so we could access them without the 
shuffle service).
+    // As directories are already cached there is no request to external 
shuffle service.
+    rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs 
*/)
+
+    // Now Spark will not receive FetchFailed as host local blocks are read 
from the cached local
+    // disk directly
+    rdd.count()
 
 Review comment:
   There is no reason to keep both (first I used `count()` to validate the feature, then I added the second one to validate the values as well and forgot to remove the first one).
   So I will keep only the second one.
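
   For illustration, a minimal sketch of how the tail of the test could look after dropping the redundant action; the `collect`-based value check here is a hypothetical stand-in for the second validation mentioned above, not the exact code from the PR:

       // Now Spark will not receive FetchFailed, as host-local blocks are read directly from
       // the cached local disk.
       // Each key in 0 until 1000 occurs exactly once, so every reduced value must be 1;
       // collect() both triggers the shuffle read and lets us validate the values.
       rdd.collect().foreach { case (_, value) => value should equal(1) }

   A single action is enough here: it exercises the host-local read path and validates the reduced values at the same time.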
