cloud-fan commented on code in PR #56095:
URL: https://github.com/apache/spark/pull/56095#discussion_r3300808105
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala:
##########
@@ -277,6 +277,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
@transient
private val runningSubqueries = new ArrayBuffer[ExecSubqueryExpression]
+ @transient private lazy val subqueryLock: AnyRef = new AnyRef
Review Comment:
Suggest matching the existing Spark idiom for ad-hoc lock objects —
`AdaptiveSparkPlanExec.lock`, `BlockManager.{asyncReregisterLock,
peerFetchLock}`, `QueryExecution.observedMetricsLock`, `Executor.lock`, etc.
all use `@transient private val xxxLock = new Object()`.
`AdaptiveSparkPlanExec` is the closest peer (also a `SparkPlan`, also
`Serializable`) and uses exactly that pattern.
```suggestion
@transient private val subqueryLock = new Object()
```
Your point that `lazy` would defend against a deserialized-then-prepared
plan is fair, but the line just above, `runningSubqueries`, makes the same
assumption non-defensively, so as long as that gap stands, this one matches it.
Separately, the name `subqueryLock` is a bit narrow: it now also guards the
`prepared` flag and the user-defined `doPrepare()` body. `prepareLock` would
describe the role better, but it's only worth changing if you're touching this
line anyway.
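For anyone weighing the `lazy` question, here is a minimal standalone sketch of the difference (the `EagerLockNode`, `LazyLockNode`, and `TransientLockDemo` names are hypothetical, not Spark code). A plain `@transient val` is assigned only by the constructor, which Java deserialization skips, so it comes back `null`; a `@transient lazy val` is computed on first access, so a deserialized instance gets a fresh lock.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical node using the non-lazy idiom suggested above.
class EagerLockNode extends Serializable {
  // Assigned by the constructor only; deserialization leaves it null.
  @transient private val lock = new Object()
  def lockIsNull: Boolean = lock == null
}

// Hypothetical node using the lazy variant from the PR.
class LazyLockNode extends Serializable {
  // Computed on first access, including on a deserialized copy.
  @transient private lazy val lock: AnyRef = new AnyRef
  def lockIsNull: Boolean = lock == null
}

object TransientLockDemo {
  // Round-trips an object through Java serialization.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    println(roundTrip(new EagerLockNode).lockIsNull) // true
    println(roundTrip(new LazyLockNode).lockIsNull)  // false
  }
}
```

One trade-off to keep in mind: in Scala 2.x, the first access to a `lazy val` runs its initializer inside `this.synchronized`, so the lazy variant briefly takes the very monitor this PR is moving `waitForSubqueries` away from. It is a one-time, short hold, but it is another argument for the plain `val` idiom.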
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala:
##########
@@ -329,7 +331,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
* @note `prepare` method has already walked down the tree, so the implementation doesn't have
* to call children's `prepare` methods.
*
- * This will only be called once, protected by `this`.
+ * This will only be called once, protected by [[subqueryLock]].
Review Comment:
The new wording references `subqueryLock`, which is `private` — subclasses
overriding `doPrepare()` can't see it, so the link points at a name future
readers won't be able to navigate to. The pre-PR `protected by 'this'` was
informative because `this` was in scope; pointing at a private name is a step
back.
```suggestion
* This will only be called once, protected by an internal lock.
```
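To make the "protected by an internal lock" wording concrete, here is a minimal sketch of a prepare-once hook guarded by a private lock. It is much simpler than `SparkPlan.prepare()`, and the `PreparableNode` and `CountingNode` classes and the `prepareLock` field are hypothetical. Subclasses override `doPrepare()` without ever naming the lock, which is why the scaladoc only needs to promise the at-most-once guarantee rather than point at a field they cannot see.

```scala
// Hypothetical, simplified stand-in for a plan node; not Spark's SparkPlan.
abstract class PreparableNode {
  // Private: subclasses never see or synchronize on this lock directly.
  private val prepareLock = new Object()
  // Guarded by prepareLock.
  private var prepared = false

  /** Runs doPrepare() at most once, even when called from several threads. */
  final def prepare(): Unit = prepareLock.synchronized {
    if (!prepared) {
      doPrepare()
      prepared = true
    }
  }

  /** This will only be called once, protected by an internal lock. */
  protected def doPrepare(): Unit = {}
}

// Hypothetical subclass that counts how often its hook actually runs.
class CountingNode extends PreparableNode {
  @volatile var prepareCalls = 0
  override protected def doPrepare(): Unit = prepareCalls += 1
}
```

Calling `prepare()` concurrently from several threads on one `CountingNode` should leave `prepareCalls` at 1.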
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala:
##########
@@ -168,6 +178,55 @@ class SparkPlanSuite extends SharedSparkSession {
}
}
}
+
+ test("waitForSubqueries must not hold the plan's monitor while awaiting subquery results") {
+ val enteredLatch = new CountDownLatch(1)
+ val releaseLatch = new CountDownLatch(1)
+
+ val subqueryExec = TestSubqueryExec(LocalTableScanExec(Nil, Nil, None))
+ val subqueryExpr = BlockingSubquery(subqueryExec, ExprId(0), enteredLatch, releaseLatch)
+ val plan = TestPlanWithSubquery(subqueryExpr)
+
+ val executor = ThreadUtils.newDaemonSingleThreadExecutor("test-wait-for-subqueries")
+ implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
+
+ plan.testPrepare()
+ val futureA = Future { plan.testWaitForSubqueries() }
+
+ assert(enteredLatch.await(10, TimeUnit.SECONDS),
+ "Thread A did not enter updateResult() within 10s")
+
+ val threadB = new Thread(() => plan.synchronized {})
+ threadB.setDaemon(true)
+ threadB.start()
+
+ val bean = ManagementFactory.getThreadMXBean
+ val deadline = System.currentTimeMillis() + 5000L
+ var threadBBlocked = false
+ var waiting = true
+ while (waiting) {
+ if (!threadB.isAlive || System.currentTimeMillis() > deadline) {
+ waiting = false
+ } else {
+ val state = Option(bean.getThreadInfo(threadB.getId)).map(_.getThreadState).orNull
+ if (state == Thread.State.BLOCKED) {
+ threadBBlocked = true
+ waiting = false
+ } else if (state != null) {
+ Thread.sleep(1)
+ }
+ }
+ }
+
+ releaseLatch.countDown()
+ ThreadUtils.awaitResult(futureA, Duration(10, "seconds"))
+ threadB.join(5000L)
+ executor.shutdown()
Review Comment:
Cleanup isn't in a `finally`. If `enteredLatch.await(10, SECONDS)` returns
false (or any assert/exception fires before line 221), `futureA`'s thread sits
on `releaseLatch.await(30s)` and the executor never shuts down. Suggest
wrapping the body from line 193 onward in:
```scala
try {
// existing body, ending with the !threadBBlocked assert
} finally {
releaseLatch.countDown()
executor.shutdown()
}
```
so a failed run doesn't leak threads for 30s.
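A self-contained sketch of the suggested shape, using plain `java.util.concurrent` executors in place of Spark's `ThreadUtils` (the `LatchCleanupDemo` object and its `runWithBlockedWorker` helper are hypothetical). The `finally` releases the parked worker and shuts the executor down whether the body passes or throws:

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LatchCleanupDemo {
  /**
   * Starts a worker that blocks on a release latch, runs `body`, and always
   * releases the worker and shuts the executor down, even if `body` throws.
   */
  def runWithBlockedWorker(executor: ExecutorService)(body: => Unit): Unit = {
    val entered = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    try {
      // Mirrors futureA: signal entry, then park until released (or 30s pass).
      val worker = Future { entered.countDown(); release.await(30, TimeUnit.SECONDS) }
      assert(entered.await(10, TimeUnit.SECONDS), "worker did not start within 10s")
      body // any assertion or exception here skips straight to finally
      release.countDown()
      Await.result(worker, 10.seconds)
    } finally {
      // Safe to repeat: countDown() on a latch already at zero is a no-op.
      release.countDown()
      executor.shutdown()
    }
  }
}
```

Because `CountDownLatch.countDown()` does nothing once the count reaches zero, releasing on both the happy path and in `finally` is harmless, and a failing body leaves the executor free to terminate right away instead of after the 30s timeout.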
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala:
##########
@@ -168,6 +178,55 @@ class SparkPlanSuite extends SharedSparkSession {
}
}
}
+
+ test("waitForSubqueries must not hold the plan's monitor while awaiting subquery results") {
Review Comment:
nit: every other test in this suite is prefixed with `SPARK-xxxxx:`. Suggest
matching the convention:
```suggestion
test("SPARK-57041: waitForSubqueries must not hold the plan's monitor " +
"while awaiting subquery results") {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]