holdenk commented on a change in pull request #29226:
URL: https://github.com/apache/spark/pull/29226#discussion_r461133372
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// Start the computation of RDD - this step will also cache the RDD
val asyncCount = testRdd.countAsync()
- // Wait for the job to have started.
- taskStartSem.acquire(1)
- // Wait for each executor + driver to have it's broadcast info delivered.
- broadcastSem.acquire((numExecs + 1))
-
// Make sure the job is either mid run or otherwise has data to migrate.
if (migrateDuring) {
- // Give Spark a tiny bit to start executing after the broadcast blocks land.
- // For me this works at 100, set to 300 for system variance.
- Thread.sleep(300)
+ // Wait for one of the tasks to succeed and finish writing its blocks.
+ // This way we know that this executor had real data to migrate when it is subsequently
+ // decommissioned below.
Review comment:
This goes against the purpose of this test: making sure that an executor
with a running task that receives a decom has the block migrated. Waiting for
a task to finish first is not migrating during decommissioning in the same way.
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// Start the computation of RDD - this step will also cache the RDD
val asyncCount = testRdd.countAsync()
- // Wait for the job to have started.
- taskStartSem.acquire(1)
- // Wait for each executor + driver to have it's broadcast info delivered.
- broadcastSem.acquire((numExecs + 1))
-
// Make sure the job is either mid run or otherwise has data to migrate.
if (migrateDuring) {
- // Give Spark a tiny bit to start executing after the broadcast blocks land.
- // For me this works at 100, set to 300 for system variance.
- Thread.sleep(300)
+ // Wait for one of the tasks to succeed and finish writing its blocks.
+ // This way we know that this executor had real data to migrate when it is subsequently
+ // decommissioned below.
Review comment:
So I think another way we could make sure the test covers what we want
is to run a job repeatedly until all 3 executors come up.
The block manager (in decom state) does indeed refuse puts, but RDD
computation on the executor goes through `getOrElseUpdate`, which immediately
calls `doPutIterator` on a cache miss *before* the iterator starts being
computed. Since the check to see if the block manager is decommissioning
occurs before the start of the computation, not at the end, we want to ensure
that the block can be put (and then later migrated).
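To make the ordering concrete, here is a minimal sketch of the flow I'm describing
(the `ToyBlockManager` below is purely illustrative, not the real `BlockManager` code):

```scala
import scala.collection.mutable

// Illustrative stand-in for the real block manager, showing only the ordering:
// on a cache miss the "can we put?" decision is made before the block's values
// are computed, so a decommission that arrives mid-computation does not cause
// the already-started block to be rejected.
class ToyBlockManager {
  @volatile private var decommissioning = false
  private val store = mutable.Map.empty[String, Seq[Int]]

  def startDecommission(): Unit = { decommissioning = true }

  // Mirrors the shape of getOrElseUpdate -> doPutIterator described above.
  def getOrElseUpdate(blockId: String, compute: () => Iterator[Int]): Seq[Int] = {
    store.getOrElse(blockId, {
      // The decommission check happens here, before compute() is ever consumed.
      if (decommissioning) {
        throw new IllegalStateException(s"Refusing put of $blockId: decommissioning")
      }
      val values = compute().toSeq // computation happens after the check,
      store(blockId) = values      // so the block is still stored and can later be migrated
      values
    })
  }
}
```

In that shape, `decommissioning` flipping to true while `compute()` runs does not reject
the put; the block is stored and becomes a migration candidate, which is the path the
test should exercise.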
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// Start the computation of RDD - this step will also cache the RDD
val asyncCount = testRdd.countAsync()
- // Wait for the job to have started.
- taskStartSem.acquire(1)
- // Wait for each executor + driver to have it's broadcast info delivered.
- broadcastSem.acquire((numExecs + 1))
-
// Make sure the job is either mid run or otherwise has data to migrate.
if (migrateDuring) {
- // Give Spark a tiny bit to start executing after the broadcast blocks land.
- // For me this works at 100, set to 300 for system variance.
- Thread.sleep(300)
+ // Wait for one of the tasks to succeed and finish writing its blocks.
+ // This way we know that this executor had real data to migrate when it is subsequently
+ // decommissioned below.
Review comment:
Maybe we should add a comment where we do the `isDecommissioning` check
to explain that it is intentionally done there so that we don't reject blocks
which have already started computation. Do you think that would help?
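Concretely, something like this is the kind of comment I mean (the helper below is a
stand-in for where the real `isDecommissioning` check lives, not the actual code):

```scala
// Illustrative stand-in for the put path that performs the decommission check.
def rejectPutIfDecommissioning(blockId: String, isDecommissioning: () => Boolean): Unit = {
  if (isDecommissioning()) {
    // Intentionally checked when the put is initiated, not after the block's values
    // have been computed: a task that has already started computing its block should
    // still be able to store it (and have it migrated later) rather than having that
    // work thrown away.
    throw new IllegalStateException(s"Refusing put of $blockId: block manager is decommissioning")
  }
}
```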
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -125,27 +118,26 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// Start the computation of RDD - this step will also cache the RDD
val asyncCount = testRdd.countAsync()
- // Wait for the job to have started.
- taskStartSem.acquire(1)
- // Wait for each executor + driver to have it's broadcast info delivered.
- broadcastSem.acquire((numExecs + 1))
-
// Make sure the job is either mid run or otherwise has data to migrate.
if (migrateDuring) {
- // Give Spark a tiny bit to start executing after the broadcast blocks land.
- // For me this works at 100, set to 300 for system variance.
- Thread.sleep(300)
+ // Wait for one of the tasks to succeed and finish writing its blocks.
+ // This way we know that this executor had real data to migrate when it is subsequently
+ // decommissioned below.
Review comment:
I don't believe we need another block manager PR to realize it; I think
this is just a flaky test because we took out the original sleep and tried to
use TestUtils, which doesn't do a good enough job of waiting for the executors
to fully come up.
Since doPut is called *before* the task starts computation, we don't throw
away any of the in-progress data.
I'll make an alternate PR to this one to illustrate my understanding, and
hopefully we can iron it out and make the code path clear for everyone :)
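For the "fully come up" part, something along these lines is what I have in mind
(a sketch only; the helper name and timings are mine, not necessarily what the
alternate PR will do):

```scala
import org.apache.spark.{SparkContext, SparkEnv}

// Keep running tiny jobs until every expected executor has actually executed a task,
// which is a stronger guarantee than just waiting for the executors to register.
def waitForExecutorsToRunTasks(sc: SparkContext, numExecs: Int, timeoutMs: Long): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  def executorsSeen(): Set[String] =
    sc.parallelize(1 to numExecs * 10, numExecs * 10)
      .map(_ => SparkEnv.get.executorId)
      .collect()
      .toSet
  var seen = executorsSeen()
  while (seen.size < numExecs) {
    if (System.currentTimeMillis() > deadline) {
      throw new java.util.concurrent.TimeoutException(
        s"Only ${seen.size} of $numExecs executors ran tasks within ${timeoutMs}ms")
    }
    Thread.sleep(100)
    seen = executorsSeen()
  }
}
```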
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -87,36 +113,30 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
}
// Listen for the job & block updates
- val taskStartSem = new Semaphore(0)
- val broadcastSem = new Semaphore(0)
val executorRemovedSem = new Semaphore(0)
- val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+ val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
- sc.addSparkListener(new SparkListener {
+ def getCandidateExecutorToDecom: Option[String] = if (whenToDecom == TaskStarted) {
+ accum.value.asScala.headOption
Review comment:
I don't think this is going to work as intended: accumulators send their
updates back at the end of the task, unless something has changed.
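To illustrate why (a rough sketch that assumes a `SparkContext` named `sc`; the
accumulator and the names here are mine, not the suite's):

```scala
import scala.concurrent.duration._
import org.apache.spark.SparkEnv
import org.apache.spark.util.ThreadUtils

val ranOnExecutors = sc.collectionAccumulator[String]("ranOnExecutors")
val job = sc.parallelize(1 to 4, 4).map { _ =>
  ranOnExecutors.add(SparkEnv.get.executorId)
  Thread.sleep(5000) // the task is still running long after the add() above
  1
}.countAsync()

// The tasks have called add(), but accumulator updates only reach the driver with the
// task result, so while the tasks are still running this is expected to be empty:
assert(ranOnExecutors.value.isEmpty)
ThreadUtils.awaitResult(job, 1.minute)
```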
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -87,36 +113,30 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
}
// Listen for the job & block updates
- val taskStartSem = new Semaphore(0)
- val broadcastSem = new Semaphore(0)
val executorRemovedSem = new Semaphore(0)
- val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+ val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
- sc.addSparkListener(new SparkListener {
+ def getCandidateExecutorToDecom: Option[String] = if (whenToDecom == TaskStarted) {
+ accum.value.asScala.headOption
Review comment:
If you don't believe that is the case, can we add an assertion in here
that none of the tasks have finished yet?
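For example (using the `taskEndEvents` queue from the diff above; the message
wording is just a suggestion):

```scala
// Guard against the race: if any task has already completed, the accumulator-based
// candidate selection is no longer exercising the "decommission while running" path.
assert(taskEndEvents.isEmpty,
  s"Expected to pick the decommission target before any task finished, " +
    s"but ${taskEndEvents.size} task(s) had already completed")
```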