agrawaldevesh commented on a change in pull request #29226:
URL: https://github.com/apache/spark/pull/29226#discussion_r461139578



##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
       In which case, I don't follow how this is supposed to work in the first 
place. Consider the root cause: You decommission an executor while a (result) 
task is running. The block manager associated with the executor enters the 
decom state. The task tries to write the RDD persisted block. The block manager 
fails that because it is decom'd, thereby failing the task. The task reruns 
somewhere else and the job succeeds. Now you go to check if that executor has 
migrated the block: That check fails because the block was never written in the 
first place.
   
   So I am relaxing the notion of "migrate during" to mean "migrate while the job is running". The question is which executor you want to decommission and force a migration off of. Picking the wrong executor, as above, can mean that no migrations actually happen, because no real blocks were written (or they were written and then discarded).
   
   If that's the intent of the test then I think we need to change the block 
manager decommissioning (production) code to realize the intent. 
   
   Without this PR, this test is fundamentally racy and thus provides low signal. The race is between the task's end and the block manager's decommission: if the task ends successfully before the decom, the test passes; otherwise it fails.
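   A standalone sketch of the race just described (all names here are illustrative toys, not Spark's actual code): if decommissioning lands before the task's put, the block is never written on that executor, so the subsequent "did the block migrate?" check finds nothing to migrate.

```scala
// Toy model of the race: one executor-local store with a decommission flag.
class ToyExecutorStore {
  @volatile var decommissioned = false
  val blocks = scala.collection.mutable.Set.empty[String]

  // The put succeeds only if the executor is not yet decommissioned.
  def put(blockId: String): Boolean =
    if (decommissioned) false
    else { blocks += blockId; true }

  // Only blocks present at decommission time can be migrated off.
  def blocksEligibleForMigration: Set[String] = blocks.toSet
}

// The two orderings of the race between "task writes its block" and
// "executor is decommissioned".
def raceOutcome(taskEndsFirst: Boolean): Set[String] = {
  val store = new ToyExecutorStore
  if (taskEndsFirst) {
    store.put("rdd_0_0")          // task wrote its block, then decom arrives
    store.decommissioned = true
  } else {
    store.decommissioned = true   // decom arrives first; the put is refused
    store.put("rdd_0_0")          // task fails here and reruns elsewhere
  }
  store.blocksEligibleForMigration
}
```

   In the second ordering the migration check has nothing to find, which is exactly why the test fails when the "wrong" executor is picked.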
   
   

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
   > The block manager (in decom state) does indeed refuse puts, but RDD computation on the executor goes through `getOrElseUpdate`, which immediately calls `doPutIterator` if there is not a cache hit, _before_ the iterator starts being computed.
   
   I don't see this. This is what I see (BM stands for BlockManager):
   - RDD.getOrCompute -> BM.getOrElseUpdate -> BM.doPutIterator -> BM.doPut
   
   BM.doPut throws BlockSavedOnDecommissionedBlockManagerException if `isDecommissioning`, and this fails the job. Off the top of my head I am not sure what it should do instead, since I am new to this codepath, but it is certainly not continuing with the computation (as far as I can tell).
   
   If the intent of the test is indeed to test decommissioning "before the task has ended", then I think we need another BlockManager PR to actually realize it :-)
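   To make the refusal concrete, here is a minimal standalone model of this codepath. `ToyBlockManager` is a hypothetical stand-in that mirrors only the guard described above, not Spark's real BlockManager: the cache-miss path runs `doPut`, which throws when decommissioning, so the computation's result is never stored.

```scala
// Toy exception mirroring the one named above; the message is illustrative.
class BlockSavedOnDecommissionedBlockManagerException(blockId: String)
  extends Exception(s"Block $blockId cannot be saved on decommissioned block manager")

class ToyBlockManager {
  @volatile var isDecommissioning = false
  private val blocks = scala.collection.mutable.Map.empty[String, Seq[Int]]

  // Mirrors the guard in BM.doPut: refuse the write outright when
  // decommissioning, failing the caller instead of storing the block.
  def doPut(blockId: String, data: => Seq[Int]): Unit = {
    if (isDecommissioning) {
      throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
    }
    blocks(blockId) = data
  }

  // Mirrors BM.getOrElseUpdate: a cache hit short-circuits; otherwise doPut
  // runs, and its exception propagates as a task failure.
  def getOrElseUpdate(blockId: String, compute: => Seq[Int]): Seq[Int] =
    blocks.getOrElse(blockId, { doPut(blockId, compute); blocks(blockId) })
}
```

   With this model, flipping `isDecommissioning` before the first write makes `getOrElseUpdate` throw rather than compute, which is the failure mode described above.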

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
       Cool! Looking forward to your alternate PR to fix this.
   
   > I don't believe we need another block manager PR to realize it, I think 
this is just a flaky test because we took out the original sleep and tried to 
use TestUtils which doesn't do a good enough job of waiting for the executor to 
fully come up.
   
   I don't think the issue is that the executor does not come up; it does come up. The issue, I think, is that it tries to run the task and fails because it is decommissioned.
   
   > Since doPut is called _before_ the task starts computation, we don't throw 
away any of the in-progress data.
   
   I agree that doPut is called _before_ the task starts computation, but I didn't follow what you mean by "in-progress data". Per my understanding, the task is simply failing before the iterator is even created.
   
   Please check this for yourself by triggering a decommission in the `onTaskStart` listener callback and giving the task a long enough sleep that it is still running when the decommissioning happens.
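   A deterministic, standalone sketch of that experiment (a semaphore handshake stands in for a timed sleep; none of these names are Spark APIs):

```scala
import java.util.concurrent.Semaphore

object DecomAtTaskStart {
  @volatile private var decommissioned = false
  private val taskLaunched = new Semaphore(0)
  private val decomDone = new Semaphore(0)

  // Stand-in for the onTaskStart listener callback: decommission the executor
  // as soon as the driver reports the task launched.
  private def listener(): Unit = {
    taskLaunched.acquire()
    decommissioned = true
    decomDone.release()
  }

  // Stand-in for the task: announce launch, then wait long enough that the
  // decommissioning above is guaranteed to land before the put runs.
  private def task(): Boolean = {
    taskLaunched.release()
    decomDone.acquire()   // deterministic stand-in for "a long enough sleep"
    !decommissioned       // the put succeeds only if not yet decommissioned
  }

  // Returns whether the task's put succeeded.
  def run(): Boolean = {
    val t = new Thread(() => listener())
    t.start()
    val putSucceeded = task()
    t.join()
    putSucceeded
  }
}
```

   With the handshake, the decommission always wins the race, so the put always fails, which is the behavior the negative test below is meant to demonstrate.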
   
   > 
   > I'll make an alternate PR to this one to illustrate my understanding and 
hopefully we can iron it out and make the code path clear for everyone :)
   
   Hurray :-) 
   

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -36,21 +37,34 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
   val numExecs = 3
   val numParts = 3
 
+  test(s"verify that an already running task which is going to cache data succeeds " +
+    s"on a decommissioned executor after task start") {
+    runDecomTest(true, false, true, 1)
+  }
+
+  test(s"verify that an already running task which is going to cache data succeeds " +
+    s"on a decommissioned executor after iterator start") {
+    runDecomTest(true, false, true, 2)
+  }
+

Review comment:
       @holdenk, this is what I meant: I have added two tests for decommissioning after "task start".
   
   The first test is a negative test. It defines task start as the moment the driver reports the task as "launched"; as we discussed, it is expected to fail, because decommissioning will be triggered before `BlockManager.doPut` is called and will thus fail the task without starting the iterator.
   
   The second test captures what you described as the intention: we want decommissioning after "iterator start" (i.e. after user code has started) to trigger a migration: the block manager will write the block anyway, and it will be migrated. Do you think this second test indeed captures the intent of the test?
   
   The first, negative test just demonstrates a case we don't handle yet. Since we agree that it is not worth fixing for production, I will eventually delete it.
   
   So now we cover the full gamut of interesting decommission points in a task: when it has just launched, when user code is active, and after it has ended.
   

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.

Review comment:
       @holdenk, please take a look at the PR once again. I have added another test to specifically capture the intent of decommissioning after the task has started but before it has ended.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
