juliuszsompolski commented on code in PR #41748:
URL: https://github.com/apache/spark/pull/41748#discussion_r1246407273
##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -653,12 +677,15 @@ class SparkSession private(
* map values: 1, "Steven", LocalDate.of(2023, 4, 2).
* Map value can be also a `Column` of literal expression, in
that case
* it is taken as is.
+ * @param tracker A tracker that can notify when query is ready for execution
*
* @since 3.4.0
Review Comment:
change to `@since 3.5.0`, since this is a new function.
##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -622,12 +622,15 @@ class SparkSession private(
* For example, 1, "Steven", LocalDate.of(2023, 4, 2).
* A value can be also a `Column` of literal expression, in that
case
* it is taken as is.
+ * @param tracker A tracker that can notify when query is ready for execution
Review Comment:
For API completeness, we also should add `def sql(sqlText: String, tracker:
QueryPlanningTracker)` and `def sql(sqlText: String, args:
java.util.Map[String, Any], tracker: QueryPlanningTracker)`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -71,9 +71,15 @@ class QueryExecution(
}
}
- lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS)
{
- // We can't clone `logical` here, which will reset the `_analyzed` flag.
- sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
+ lazy val analyzed: LogicalPlan = {
+ val plan = executePhase(QueryPlanningTracker.ANALYSIS) {
+ // We can't clone `logical` here, which will reset the `_analyzed` flag.
+ sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
+ }
+ if (isEager(plan)) {
+ tracker.setReadyForExecution(plan)
+ }
Review Comment:
Since this is about eagerly executed commands, I would place it in
`eagerlyExecuteCommands`, just before creating `val qe` there. That's where it
actually triggers the command execution, so you also won't need the `isEager`
helper.
Add comment there:
```
// Since Command execution will eagerly take place here, and in most cases
be the bulk of time and effort,
// with the rest of processing of the root plan being just outputting
command results,
// for eagerly executed commands we mark this place as beginning of
execution.
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala:
##########
@@ -263,6 +269,28 @@ class QueryExecutionSuite extends SharedSparkSession {
assert(projectQe.executedPlan.isInstanceOf[CommandResultExec])
val cmdResultExec = projectQe.executedPlan.asInstanceOf[CommandResultExec]
assert(cmdResultExec.commandPhysicalPlan.isInstanceOf[ShowTablesExec])
+ assert(mockCallback.tracker != null)
+ }
+
+ test("SPARK-44145: non eagerly executed command setReadyForExecution") {
+ val mockCallback = MockCallback()
+
+ val showTables = ShowTables(UnresolvedNamespace(Seq.empty[String]), None)
+ val showTablesQe = new QueryExecution(
+ spark,
+ showTables,
+ new QueryPlanningTracker(mockCallback.callback),
+ CommandExecutionMode.SKIP)
+ showTablesQe.assertExecutedPlanPrepared()
Review Comment:
add
```
showTablesQe.assertCommandExecuted
assert(mockCallback.tracker == null)
showTablesQe.assertSparkPlanPrepared
assert(mockCallback.tracker == null)
```
above
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala:
##########
@@ -244,14 +245,19 @@ class QueryExecutionSuite extends SharedSparkSession {
}
test("SPARK-35378: Eagerly execute non-root Command") {
- def qe(logicalPlan: LogicalPlan): QueryExecution = new
QueryExecution(spark, logicalPlan)
+ val mockCallback = MockCallbackEagerCommand()
+ def qe(logicalPlan: LogicalPlan): QueryExecution = new QueryExecution(
+ spark,
+ logicalPlan,
+ new QueryPlanningTracker(mockCallback.callback))
val showTables = ShowTables(UnresolvedNamespace(Seq.empty[String]), None)
val showTablesQe = qe(showTables)
assert(showTablesQe.commandExecuted.isInstanceOf[CommandResult])
Review Comment:
`assert(mockCallback.tracker == null)` before
`showTablesQe.commandExecuted`, and `assert(mockCallback.tracker != null)` after
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala:
##########
@@ -263,6 +269,28 @@ class QueryExecutionSuite extends SharedSparkSession {
assert(projectQe.executedPlan.isInstanceOf[CommandResultExec])
val cmdResultExec = projectQe.executedPlan.asInstanceOf[CommandResultExec]
assert(cmdResultExec.commandPhysicalPlan.isInstanceOf[ShowTablesExec])
+ assert(mockCallback.tracker != null)
+ }
+
+ test("SPARK-44145: non eagerly executed command setReadyForExecution") {
+ val mockCallback = MockCallback()
+
+ val showTables = ShowTables(UnresolvedNamespace(Seq.empty[String]), None)
+ val showTablesQe = new QueryExecution(
+ spark,
+ showTables,
+ new QueryPlanningTracker(mockCallback.callback),
+ CommandExecutionMode.SKIP)
+ showTablesQe.assertExecutedPlanPrepared()
+ assert(mockCallback.tracker != null)
+ }
+
+ test("SPARK-44145: Plan setReadyForExecution") {
+ val mockCallback = MockCallback()
+ val plan: LogicalPlan =
org.apache.spark.sql.catalyst.plans.logical.Range(0, 1, 1, 1)
+ val df = Dataset.ofRows(spark, plan, new
QueryPlanningTracker(mockCallback.callback))
+ df.queryExecution.assertExecutedPlanPrepared()
Review Comment:
let's assert for all stages in this test:
```
df.queryExecution.assertAnalyzed
assert(mockCallback.tracker == null)
df.queryExecution.assertOptimized
assert(mockCallback.tracker == null)
df.queryExecution.assertSparkPlanPrepared
assert(mockCallback.tracker == null)
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala:
##########
@@ -120,6 +127,20 @@ class QueryPlanningTracker {
ret
}
+ /**
+ * Set when the query has been analysed and is ready for execution.
+ * This is after analysis for eager commands and after planning
+ * for other queries.
+ * see @link org.apache.spark.sql.execution.CommandExecutionMode
+ */
+ def setReadyForExecution(analyzedPlan: LogicalPlan): Unit = {
+ if (readyForExecution) {
+ throw new IllegalStateException("Cannot setReadyForExecution more than
once")
+ }
Review Comment:
It passes tests, but I'm not 100% sure it would always hold. I think it's
better to just `return` here to make sure that the callback is only called
once. If you look at `QueryExecution.eagerlyExecutedCommands` it could have
multiple commands in the query tree, that you e.g. union the results together
or sth. Although I don't know of such use cases in practice, I think it's just
OK to return here and make it be called the first time you reach executions.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -169,13 +175,19 @@ class QueryExecution(
// We need to materialize the optimizedPlan here, before tracking the
planning phase, to ensure
// that the optimization time is not counted as part of the planning phase.
assertOptimized()
- executePhase(QueryPlanningTracker.PLANNING) {
+ val plan = executePhase(QueryPlanningTracker.PLANNING) {
// clone the plan to avoid sharing the plan instance between different
stages like analyzing,
// optimizing and planning.
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
}
+ if (!isEager(analyzed)) {
+ tracker.setReadyForExecution(analyzed)
+ }
Review Comment:
if you let `setReadyForExecution` be called multiple times, you can drop the
if here.
Add comment
```
// Note: For an eagerly executed command it might have already been called in
// `eagerlyExecuteCommands` and is a no-op here.
```
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -169,13 +175,19 @@ class QueryExecution(
// We need to materialize the optimizedPlan here, before tracking the
planning phase, to ensure
// that the optimization time is not counted as part of the planning phase.
assertOptimized()
- executePhase(QueryPlanningTracker.PLANNING) {
+ val plan = executePhase(QueryPlanningTracker.PLANNING) {
// clone the plan to avoid sharing the plan instance between different
stages like analyzing,
// optimizing and planning.
QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
}
+ if (!isEager(analyzed)) {
+ tracker.setReadyForExecution(analyzed)
+ }
+ plan
}
+ def assertExecutedPlanPrepared(): Unit = executedPlan
Review Comment:
add `assertSparkPlanPrepared()` as well
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]