holdenk commented on a change in pull request #28331:
URL: https://github.com/apache/spark/pull/28331#discussion_r432657731
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
##########
@@ -66,41 +90,96 @@ class BlockManagerDecommissionSuite extends SparkFunSuite
with LocalSparkContext
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEndEvents.append(taskEnd)
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated):
Unit = {
+ blocksUpdated.append(blockUpdated)
+ }
})
+
// Cache the RDD lazily
- sleepyRdd.persist()
+ if (persist) {
+ testRdd.persist()
+ }
+
+ // Wait for all of the executors to start
+ TestUtils.waitUntilExecutorsUp(sc = sc,
+ numExecutors = numExecs,
+ timeout = 10000) // 10s
// Start the computation of RDD - this step will also cache the RDD
- val asyncCount = sleepyRdd.countAsync()
+ val asyncCount = testRdd.countAsync()
// Wait for the job to have started
sem.acquire(1)
// Give Spark a tiny bit to start the tasks after the listener says hello
- Thread.sleep(100)
+ Thread.sleep(50)
+
// Decommission one of the executor
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
- assert(execs.size == 2, s"Expected 2 executors but found ${execs.size}")
+ assert(execs.size == numExecs, s"Expected ${numExecs} executors but found
${execs.size}")
+
val execToDecommission = execs.head
+ logDebug(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(execToDecommission)
// Wait for job to finish
- val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds)
+ val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
assert(asyncCountResult === 10)
// All 10 tasks finished, so accum should have been increased 10 times
assert(accum.value === 10)
// All tasks should be successful, nothing should have failed
sc.listenerBus.waitUntilEmpty()
- assert(taskEndEvents.size === 10) // 10 mappers
- assert(taskEndEvents.map(_.reason).toSet === Set(Success))
+ if (shuffle) {
+ // 10 mappers & 10 reducers which succeeded
+ assert(taskEndEvents.count(_.reason == Success) === 20,
+ s"Expected 20 tasks got ${taskEndEvents.size} (${taskEndEvents})")
+ } else {
+ // 10 mappers which executed successfully
+ assert(taskEndEvents.count(_.reason == Success) === 10,
+ s"Expected 10 tasks got ${taskEndEvents.size} (${taskEndEvents})")
+ }
- // Since the RDD is cached, so further usage of same RDD should use the
+ // Wait for our respective blocks to have migrated
+ eventually(timeout(15.seconds), interval(10.milliseconds)) {
+ if (persist) {
+ // One of our blocks should have moved.
+ val blockLocs = blocksUpdated.map{ update =>
Review comment:
Yeah I think there is some type issues with how isRDD is implemented
(namely it's checking the type and we've got type erasure going on). I'll fix
the isRDD impl and add that as a filter.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]