holdenk commented on a change in pull request #29226:
URL: https://github.com/apache/spark/pull/29226#discussion_r461133372



##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
      This goes against the purpose of this test: making sure that an executor with a running task that receives a decom still gets its block migrated. If we wait for a task to finish first, we are no longer migrating during decommissioning in the same way.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
      So I think another way we could make sure the test covers what we want is to run a job repeatedly until all 3 executors come up.
   
   The block manager (in decom state) does indeed refuse puts, but RDD computation on the executor goes through `getOrElseUpdate`, which immediately calls `doPutIterator` when there is no cache hit, *before* the iterator starts being computed. Since the check for whether the block manager is decommissioning happens before the computation starts, not at the end, we want to make sure the block can still be put (and then later migrated).
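   
   To make that ordering concrete, here is a toy, self-contained sketch (invented names, not the actual BlockManager code) of what I mean by the decommissioning check happening before the block's computation:

```scala
import scala.collection.mutable

// Toy sketch only: the check runs before the iterator is computed, so a put
// that has already passed the check completes and the stored block can later
// be migrated; only *new* puts are refused once decommissioning starts.
object DecomPutOrderingSketch {
  private val cache = mutable.Map.empty[String, List[Int]]
  @volatile private var decommissioning = false

  // Mirrors the shape of getOrElseUpdate -> doPutIterator: check first, compute second.
  def getOrElseUpdate(blockId: String)(makeIterator: () => Iterator[Int]): Either[String, List[Int]] =
    cache.get(blockId) match {
      case Some(values) => Right(values)                         // cache hit: nothing to put
      case None if decommissioning =>
        Left(s"refusing new put for $blockId")                   // rejected *before* any computation
      case None =>
        val values = makeIterator().toList                       // computation runs after the check...
        cache(blockId) = values                                  // ...so the block is stored and migratable
        Right(values)
    }

  def main(args: Array[String]): Unit = {
    println(getOrElseUpdate("rdd_0_0")(() => Iterator(1, 2, 3))) // Right(List(1, 2, 3))
    decommissioning = true
    println(getOrElseUpdate("rdd_0_1")(() => Iterator(4, 5, 6))) // Left(refusing new put for rdd_0_1)
    println(getOrElseUpdate("rdd_0_0")(() => Iterator(7, 8, 9))) // Right(List(1, 2, 3)): existing block still served
  }
}
```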
   
   

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
       Maybe we should add a comment where we do the `isDecommissioning` check 
to explain that it is intentionally done there so that we don't reject blocks 
which have already started computation. Do you think that would help?
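   
   Roughly this, placed right next to the `isDecommissioning` check (I'm paraphrasing the placement from memory, the exact wording is just a suggestion):

```scala
// Intentionally checked *before* the block's iterator is computed: any put
// whose computation has already started has already passed this check, so we
// don't reject (and throw away) in-progress data; the resulting block is
// migrated later during decommissioning instead.
```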

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
      I don't believe we need another block manager PR to realize it; I think this is just a flaky test because we took out the original sleep and tried to use TestUtils, which doesn't do a good enough job of waiting for the executors to fully come up.
   
   Since doPut is called *before* the task starts its computation, we don't throw away any of the in-progress data.
   
   I'll make an alternate PR to this one to illustrate my understanding, and hopefully we can iron it out and make the code path clear for everyone :)
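   
   For reference, the kind of guard I have in mind is a helper along these lines inside the suite, polling `SparkContext.getExecutorMemoryStatus` instead of sleeping (the name, timeout, and polling interval are just placeholders):

```scala
import java.util.concurrent.TimeoutException
import org.apache.spark.SparkContext

// Rough sketch of a "wait for all executors" guard for the suite; numExecs
// would be the suite's constant and sc the test's SparkContext.
def waitForExecutors(sc: SparkContext, numExecs: Int, timeoutMs: Long = 60000L): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  // getExecutorMemoryStatus includes the driver, hence the + 1.
  while (sc.getExecutorMemoryStatus.size < numExecs + 1) {
    if (System.currentTimeMillis() > deadline) {
      throw new TimeoutException(
        s"Only ${sc.getExecutorMemoryStatus.size - 1} of $numExecs executors registered in time")
    }
    Thread.sleep(100)
  }
}
```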

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -87,36 +113,30 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     }
 
     // Listen for the job & block updates
-    val taskStartSem = new Semaphore(0)
-    val broadcastSem = new Semaphore(0)
     val executorRemovedSem = new Semaphore(0)
-    val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+    val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
-    sc.addSparkListener(new SparkListener {
 
+    def getCandidateExecutorToDecom: Option[String] = if (whenToDecom == TaskStarted) {
+      accum.value.asScala.headOption

Review comment:
       I don't think this is going to work as intended, accumulators send 
updates back at the end of the task, unless something has changed.
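   
   A quick local-mode illustration of what I mean (assumes spark-core on the classpath; the names are mine, not the suite's):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.spark.{SparkConf, SparkContext}

// Values added to a collection accumulator on the executor side generally only
// show up in the driver-side .value once the task has completed, so reading it
// when a task has merely *started* is racy.
object AccumulatorTimingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accum-timing"))
    val accum = sc.collectionAccumulator[String]("task-markers")
    val asyncCount = sc.parallelize(1 to 4, 2).map { i =>
      accum.add(s"saw-$i")                      // recorded inside the running task...
      Thread.sleep(500)
      i
    }.countAsync()
    println(s"mid-run view:   ${accum.value}")  // ...usually still empty at this point
    Await.result(asyncCount, 30.seconds)
    println(s"after-run view: ${accum.value}")  // populated once the tasks have finished
    sc.stop()
  }
}
```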

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -87,36 +113,30 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     }
 
     // Listen for the job & block updates
-    val taskStartSem = new Semaphore(0)
-    val broadcastSem = new Semaphore(0)
     val executorRemovedSem = new Semaphore(0)
-    val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+    val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
-    sc.addSparkListener(new SparkListener {
 
+    def getCandidateExecutorToDecom: Option[String] = if (whenToDecom == TaskStarted) {
+      accum.value.asScala.headOption

Review comment:
       If you don't believe that is the case, can we add an assertion in here 
that none of the tasks have finished yet?
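   
   Something along these lines, using the `taskEndEvents` queue from the hunk above (exact wording is just a suggestion), in the `TaskStarted` branch of `getCandidateExecutorToDecom`:

```scala
// If we picked a candidate because a task merely *started*, no task should
// have ended yet.
assert(taskEndEvents.isEmpty,
  s"expected no completed tasks before decommissioning, but saw: $taskEndEvents")
```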




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
