squito commented on a change in pull request #24499: [SPARK-25888][Core] Serve 
local disk persisted blocks by the external service after releasing executor by 
dynamic allocation
URL: https://github.com/apache/spark/pull/24499#discussion_r282992855
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
 ##########
 @@ -92,4 +95,40 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with 
BeforeAndAfterAll {
     }
     e.getMessage should include ("Fetch failure will not retry stage due to 
testing config")
   }
+
+  test("SPARK-25888: using external shuffle service fetching disk persisted 
blocks") {
+    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+    sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+    sc.env.blockManager.shuffleClient.getClass should 
equal(classOf[ExternalShuffleClient])
+
+    val rdd = sc.parallelize(0 until 100, 2)
+      .map { i => (i, 1) }
+      .persist(StorageLevel.DISK_ONLY)
+
+    rdd.count()
+
+    val blockId = RDDBlockId(rdd.id, 0)
+    eventually(timeout(2.seconds), interval(100.milliseconds)) {
+      val locations = sc.env.blockManager.master.getLocations(blockId)
+      assert(locations.size === 2)
+      assert(locations.map(_.port).contains(server.getPort),
+        "external shuffle service port should be contained")
+    }
+
+    sc.killExecutors(sc.getExecutorIds())
+
+    eventually(timeout(2.seconds), interval(100.milliseconds)) {
+      val locations = sc.env.blockManager.master.getLocations(blockId)
+      assert(locations.size === 1)
+      assert(locations.map(_.port).contains(server.getPort),
+        "external shuffle service port should be contained")
+    }
+
+    val rddSplit0Block = sc.env.blockManager.getRemoteValues(blockId)
+    assert(rddSplit0Block.isDefined)
+
+    // Invalidate the registered executors, disallowing access to their 
shuffle blocks (without
+    // deleting the actual shuffle files, so we could access them without the 
shuffle service).
+    rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs 
*/)
 
 Review comment:
   I don't think I understand the point of this last part -- it doesn't look 
like you're accessing those files anymore after this?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to