jiwen624 commented on code in PR #56095:
URL: https://github.com/apache/spark/pull/56095#discussion_r3301087842
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala:
##########
@@ -168,6 +178,55 @@ class SparkPlanSuite extends SharedSparkSession {
}
}
}
+
+ test("waitForSubqueries must not hold the plan's monitor while awaiting
subquery results") {
+ val enteredLatch = new CountDownLatch(1)
+ val releaseLatch = new CountDownLatch(1)
+
+ val subqueryExec = TestSubqueryExec(LocalTableScanExec(Nil, Nil, None))
+ val subqueryExpr = BlockingSubquery(subqueryExec, ExprId(0), enteredLatch,
releaseLatch)
+ val plan = TestPlanWithSubquery(subqueryExpr)
+
+ val executor =
ThreadUtils.newDaemonSingleThreadExecutor("test-wait-for-subqueries")
+ implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
+
+ plan.testPrepare()
+ val futureA = Future { plan.testWaitForSubqueries() }
+
+ assert(enteredLatch.await(10, TimeUnit.SECONDS),
+ "Thread A did not enter updateResult() within 10s")
+
+ val threadB = new Thread(() => plan.synchronized {})
+ threadB.setDaemon(true)
+ threadB.start()
+
+ val bean = ManagementFactory.getThreadMXBean
+ val deadline = System.currentTimeMillis() + 5000L
+ var threadBBlocked = false
+ var waiting = true
+ while (waiting) {
+ if (!threadB.isAlive || System.currentTimeMillis() > deadline) {
+ waiting = false
+ } else {
+ val state =
Option(bean.getThreadInfo(threadB.getId)).map(_.getThreadState).orNull
+ if (state == Thread.State.BLOCKED) {
+ threadBBlocked = true
+ waiting = false
+ } else if (state != null) {
+ Thread.sleep(1)
+ }
+ }
+ }
+
+ releaseLatch.countDown()
+ ThreadUtils.awaitResult(futureA, Duration(10, "seconds"))
+ threadB.join(5000L)
+ executor.shutdown()
Review Comment:
Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]