agrawaldevesh commented on a change in pull request #29226:
URL: https://github.com/apache/spark/pull/29226#discussion_r461139578
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// Start the computation of RDD - this step will also cache the RDD
val asyncCount = testRdd.countAsync()
- // Wait for the job to have started.
- taskStartSem.acquire(1)
- // Wait for each executor + driver to have it's broadcast info delivered.
- broadcastSem.acquire((numExecs + 1))
-
// Make sure the job is either mid run or otherwise has data to migrate.
if (migrateDuring) {
- // Give Spark a tiny bit to start executing after the broadcast blocks land.
- // For me this works at 100, set to 300 for system variance.
- Thread.sleep(300)
+ // Wait for one of the tasks to succeed and finish writing its blocks.
+ // This way we know that this executor had real data to migrate when it is subsequently
+ // decommissioned below.
Review comment:
In which case, I don't follow how this is supposed to work in the first
place. Consider the root cause: You decommission an executor while a (result)
task is running. The block manager associated with the executor enters the
decom state. The task tries to write the RDD persisted block. The block manager
fails that because it is decom'd, thereby failing the task. The task reruns
somewhere else and the job succeeds. Now you go to check if that executor has
migrated the block: That check fails because the block was never written in the
first place.
So I am relaxing the notion of "migrate during" to mean "migrate while the job is running". The question is which executor you want to decommission and force a migration off of. Picking the wrong executor, as above, can mean that no migrations happen at all because no real blocks were written (or they were written and then discarded).
If that's the intent of the test then I think we need to change the block
manager decommissioning (production) code to realize the intent.
Without this PR, this test is fundamentally racy and thus provides low signal. The race is between the task's completion and the block manager's decommission: if the task ends successfully before the decommission, the test passes; otherwise it fails.
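To make the race concrete, here is a minimal toy sketch (plain Scala, not Spark's actual `BlockManager` classes; all names are illustrative) of the scenario described above: once the manager enters the decom state it refuses puts, so a task decommissioned mid-run never writes its block, and a later migration check finds nothing to migrate.

```scala
import scala.collection.mutable

// Simulated stand-in for the real decommission-rejection exception.
class DecommissionedPutException
  extends RuntimeException("block manager is decommissioning")

class ToyBlockManager {
  @volatile var decommissioning = false
  private val blocks = mutable.Map.empty[String, Seq[Int]]

  // Mirrors the described behavior of BlockManager.doPut: refuse the
  // put outright once the manager has entered the decom state.
  def doPut(blockId: String, data: Seq[Int]): Unit = {
    if (decommissioning) throw new DecommissionedPutException
    blocks(blockId) = data
  }

  def hasBlock(blockId: String): Boolean = blocks.contains(blockId)
}

object RaceDemo {
  // Returns (didThePutFail, isTheBlockPresent).
  def run(): (Boolean, Boolean) = {
    val bm = new ToyBlockManager
    // The decommission lands while the task is "running", before its put.
    bm.decommissioning = true
    val putFailed =
      try { bm.doPut("rdd_0_0", Seq(1, 2, 3)); false }
      catch { case _: DecommissionedPutException => true }
    // The migration check now finds no block to migrate.
    (putFailed, bm.hasBlock("rdd_0_0"))
  }
}
```

The point of the sketch is only that the block is absent for a reason upstream of migration: the put itself was rejected.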
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// Start the computation of RDD - this step will also cache the RDD
val asyncCount = testRdd.countAsync()
- // Wait for the job to have started.
- taskStartSem.acquire(1)
- // Wait for each executor + driver to have it's broadcast info delivered.
- broadcastSem.acquire((numExecs + 1))
-
// Make sure the job is either mid run or otherwise has data to migrate.
if (migrateDuring) {
- // Give Spark a tiny bit to start executing after the broadcast blocks land.
- // For me this works at 100, set to 300 for system variance.
- Thread.sleep(300)
+ // Wait for one of the tasks to succeed and finish writing its blocks.
+ // This way we know that this executor had real data to migrate when it is subsequently
+ // decommissioned below.
Review comment:
> The block manager (in decom state) does indeed refuses puts, but RDD
computation on the executor goes through `getOrElseUpdate` which immediately
calls `doPutIterator` if there is not a cache hit _before_ the iterator starts
being computed.
I don't see this. This is what I see (BM stands for BlockManager):
- RDD.getOrCompute -> BM.getOrElseUpdate -> BM.doPutIterator -> BM.doPut
BM.doPut throws BlockSavedOnDecommissionedBlockManagerException if `isDecommissioning`, and that fails the job. I am not sure what it should do instead, off the top of my head, since I am new to this codepath. But it is certainly not continuing on with the computation (as far as I can tell).
If the intent of the test is indeed to test decommissioning "before the task has ended", then I think we need another BlockManager PR to actually realize it :-)
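A toy sketch of the call chain described above (the names mirror Spark's, but this is not Spark code): because the decom check in `doPut` fires before the block's iterator is ever built, the task's computation never starts.

```scala
// Illustrative stand-ins for RDD.getOrCompute -> BM.getOrElseUpdate ->
// BM.doPutIterator -> BM.doPut, with the decommission guard in doPut.
class ToyBM(isDecommissioning: Boolean) {

  private def doPut(): Unit =
    if (isDecommissioning) {
      // Stand-in for BlockSavedOnDecommissionedBlockManagerException.
      throw new IllegalStateException("block manager is decommissioning")
    }

  private def doPutIterator(makeIterator: () => Iterator[Int]): Iterator[Int] = {
    doPut() // throws while decommissioning, BEFORE makeIterator is invoked
    makeIterator()
  }

  // Mirrors the cache-miss path of getOrElseUpdate: either the iterator
  // comes back, or the failure that would fail the task.
  def getOrElseUpdate(
      makeIterator: () => Iterator[Int]): Either[Throwable, Iterator[Int]] =
    try Right(doPutIterator(makeIterator))
    catch { case t: Throwable => Left(t) }
}
```

Under this model, a decommissioned manager returns the failure and the user's iterator closure is never even invoked, which matches the "task fails before the iterator is created" reading.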
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// Start the computation of RDD - this step will also cache the RDD
val asyncCount = testRdd.countAsync()
- // Wait for the job to have started.
- taskStartSem.acquire(1)
- // Wait for each executor + driver to have it's broadcast info delivered.
- broadcastSem.acquire((numExecs + 1))
-
// Make sure the job is either mid run or otherwise has data to migrate.
if (migrateDuring) {
- // Give Spark a tiny bit to start executing after the broadcast blocks land.
- // For me this works at 100, set to 300 for system variance.
- Thread.sleep(300)
+ // Wait for one of the tasks to succeed and finish writing its blocks.
+ // This way we know that this executor had real data to migrate when it is subsequently
+ // decommissioned below.
Review comment:
Cool ! Looking forward to your alternate PR to fix this.
> I don't believe we need another block manager PR to realize it, I think
this is just a flaky test because we took out the original sleep and tried to
use TestUtils which doesn't do a good enough job of waiting for the executor to
fully come up.
I don't think the issue is that the executor does not come up. It does come up. The issue, I think, is that it tries to run the task and fails because it is decommissioned.
> Since doPut is called _before_ the task starts computation, we don't throw
away any of the in-progress data.
I agree that doPut is called _before_ the task starts computation. I didn't follow what you mean by "in-progress data"? Per my understanding, the task is simply failing before the iterator is even created.
Please check this for yourself by doing a decommission in the `onTaskStart`
listener callback and ensuring that the task has a long enough sleep time to
ensure that it waits for this decommissioning to happen.
>
> I'll make an alternate PR to this one to illustrate my understanding and
hopefully we can iron it out and make the code path clear for everyone :)
Hurray :-)
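The suggested experiment above can be sketched with plain threads and a semaphore (this is not Spark's listener API; the callback, sleep duration, and flag names are all illustrative): trigger the "decommission" from an `onTaskStart`-style callback, and make the task sleep long enough that the decommission definitely lands before its put would happen.

```scala
import java.util.concurrent.Semaphore

object ListenerDemo {
  // Returns true if the (simulated) put was rejected because the
  // decommission landed while the task was still running.
  def run(): Boolean = {
    val taskStarted = new Semaphore(0)
    @volatile var decommissioned = false
    @volatile var putRejected = false

    val task = new Thread(() => {
      taskStarted.release()  // analogous to the onTaskStart event firing
      Thread.sleep(200)      // task is "slow", so the decom has time to land
      // The real BlockManager would refuse the put here; we just record it.
      if (decommissioned) putRejected = true
    })
    task.start()

    taskStarted.acquire()    // listener callback: the task has started
    decommissioned = true    // decommission the executor now
    task.join()
    putRejected
  }
}
```

This is exactly the shape of the check proposed above: with the decommission deterministically ordered after task start and before the put, the put-side failure is reproduced every time rather than depending on a lucky sleep.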
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -36,21 +37,34 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
val numExecs = 3
val numParts = 3
+ test(s"verify that an already running task which is going to cache data succeeds " +
+ s"on a decommissioned executor after task start") {
+ runDecomTest(true, false, true, 1)
+ }
+
+ test(s"verify that an already running task which is going to cache data succeeds " +
+ s"on a decommissioned executor after iterator start") {
+ runDecomTest(true, false, true, 2)
+ }
+
Review comment:
@holdenk .. this is what I meant: I have added two tests to test the
decom after "Task Start".
The first test is a negative test. It defines task start as the moment the driver says the task is "launched" and, as we discussed, it is expected to fail: decommissioning will be triggered before `BlockManager.doPut` is called, failing the task without ever starting the iterator.
The second test captures what you mentioned as your "intention": we want decommissioning after the "iterator start" (i.e. after user code has started) to trigger a migration. The block manager will write the block anyway, and it will then be migrated. Do you think this second test indeed captures the intention of the test?
The first negative test is just to demonstrate that it is a case we don't
handle yet. Since we agree that it is not a case worth fixing for production, I
will eventually delete it.
So now we cover the full gamut of interesting decom points in a task: when it is just launched, when user code is activated, and after it has ended.
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// Start the computation of RDD - this step will also cache the RDD
val asyncCount = testRdd.countAsync()
- // Wait for the job to have started.
- taskStartSem.acquire(1)
- // Wait for each executor + driver to have it's broadcast info delivered.
- broadcastSem.acquire((numExecs + 1))
-
// Make sure the job is either mid run or otherwise has data to migrate.
if (migrateDuring) {
- // Give Spark a tiny bit to start executing after the broadcast blocks land.
- // For me this works at 100, set to 300 for system variance.
- Thread.sleep(300)
+ // Wait for one of the tasks to succeed and finish writing its blocks.
+ // This way we know that this executor had real data to migrate when it is subsequently
+ // decommissioned below.
Review comment:
@holdenk, Please take a look at the PR once again. I have added another
test to specifically capture the intent of decommissioning after the task has
started but before the task has ended.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]