attilapiros commented on a change in pull request #28331:
URL: https://github.com/apache/spark/pull/28331#discussion_r432829473



##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
##########
@@ -22,85 +22,202 @@ import java.util.concurrent.Semaphore
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite, Success}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
 import org.apache.spark.internal.config
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, 
SparkListenerTaskStart}
+import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
 
 class BlockManagerDecommissionSuite extends SparkFunSuite with 
LocalSparkContext
-    with ResetSystemProperties {
+    with ResetSystemProperties with Eventually {
+
+  val numExecs = 3
+  val numParts = 3
+
+  test(s"verify that an already running task which is going to cache data 
succeeds " +
+    s"on a decommissioned executor") {
+    runDecomTest(true, false, true)
+  }
+
+  test(s"verify that shuffle blocks are migrated.") {
+    runDecomTest(false, true, false)
+  }
 
-  override def beforeEach(): Unit = {
-    val conf = new SparkConf().setAppName("test")
+  test(s"verify that both migrations can work at the same time.") {
+    runDecomTest(true, true, false)
+  }
+
+  private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: 
Boolean) = {
+    val master = s"local-cluster[${numExecs}, 1, 1024]"
+    val conf = new SparkConf().setAppName("test").setMaster(master)
       .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist)
+      .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle)
+    // Just replicate blocks as fast as we can during testing, there isn't 
another
+    // workload we need to worry about.
+      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
+
+    // Force fetching to local disk
+    if (shuffle) {
+      conf.set("spark.network.maxRemoteBlockSizeFetchToMem", "1")

Review comment:
       I am sorry here might be some misunderstanding we should test shuffle 
migration with both streamed and non-streamed upload.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to