This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 721cba5 [SPARK-31791][CORE][TEST] Improve cache block migration test reliability 721cba5 is described below commit 721cba540292d8d76102b18922dabe2a7d918dc5 Author: Holden Karau <hka...@apple.com> AuthorDate: Fri May 22 18:19:41 2020 -0700 [SPARK-31791][CORE][TEST] Improve cache block migration test reliability ### What changes were proposed in this pull request? Increase the timeout and register the listener earlier to avoid any race condition of the job starting before the listener is registered. ### Why are the changes needed? The test is currently semi-flaky. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? I'm currently running the following bash script on my dev machine to verify the flakiness decreases. It has gotten to 356 iterations without any test failures so I believe issue is fixed. ``` set -ex ./build/sbt clean compile package ((failures=0)) for (( i=0;i<1000;++i )); do echo "Run $i" ((failed=0)) ./build/sbt "core/testOnly org.apache.spark.scheduler.WorkerDecommissionSuite" || ((failed=1)) echo "Resulted in $failed" ((failures=failures+failed)) echo "Current status is failures: $failures out of $i runs" done ``` Closes #28614 from holdenk/SPARK-31791-improve-cache-block-migration-test-reliability. Authored-by: Holden Karau <hka...@apple.com> Signed-off-by: Holden Karau <hka...@apple.com> --- .../spark/scheduler/WorkerDecommissionSuite.scala | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 8c6f86a..148d20e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -22,7 +22,8 @@ import java.util.concurrent.Semaphore import scala.concurrent.TimeoutException import scala.concurrent.duration._ -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, + TestUtils} import org.apache.spark.internal.config import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils} @@ -48,12 +49,6 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { test("verify a task with all workers decommissioned succeeds") { val input = sc.parallelize(1 to 10) - // Do a count to wait for the executors to be registered. - input.count() - val sleepyRdd = input.mapPartitions{ x => - Thread.sleep(50) - x - } // Listen for the job val sem = new Semaphore(0) sc.addSparkListener(new SparkListener { @@ -61,22 +56,31 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { sem.release() } }) + TestUtils.waitUntilExecutorsUp(sc = sc, + numExecutors = 2, + timeout = 10000) // 10s + val sleepyRdd = input.mapPartitions{ x => + Thread.sleep(5000) // 5s + x + } // Start the task. val asyncCount = sleepyRdd.countAsync() // Wait for the job to have started sem.acquire(1) + // Give it time to make it to the worker otherwise we'll block + Thread.sleep(2000) // 2s // Decommission all the executors, this should not halt the current task. // decom.sh message passing is tested manually. val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() execs.foreach(execId => sched.decommissionExecutor(execId)) - val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 10.seconds) + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) // Try and launch task after decommissioning, this should fail val postDecommissioned = input.map(x => x) val postDecomAsyncCount = postDecommissioned.countAsync() val thrown = intercept[java.util.concurrent.TimeoutException]{ - val result = ThreadUtils.awaitResult(postDecomAsyncCount, 10.seconds) + val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds) } assert(postDecomAsyncCount.isCompleted === false, "After exec decommission new task could not launch") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org