juliuszsompolski commented on code in PR #41748:
URL: https://github.com/apache/spark/pull/41748#discussion_r1246407273


##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -653,12 +677,15 @@ class SparkSession private(
    *             map values: 1, "Steven", LocalDate.of(2023, 4, 2).
    *             Map value can be also a `Column` of literal expression, in 
that case
    *             it is taken as is.
+   * @param tracker A tracker that can notify when query is ready for execution
    *
    * @since 3.4.0

Review Comment:
   change to `@since 3.5.0`, since this is a new function.



##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -622,12 +622,15 @@ class SparkSession private(
    *             For example, 1, "Steven", LocalDate.of(2023, 4, 2).
    *             A value can be also a `Column` of literal expression, in that 
case
    *             it is taken as is.
+   * @param tracker A tracker that can notify when query is ready for execution

Review Comment:
   For API completeness, we also  should add `def sql(sqlText: String, tracker: 
QueryPlanningTracker)` and `def sql(sqlText: String, args: 
java.util.Map[String, Any], tracker: QueryPlanningTracker)`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -71,9 +71,15 @@ class QueryExecution(
     }
   }
 
-  lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) 
{
-    // We can't clone `logical` here, which will reset the `_analyzed` flag.
-    sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
+  lazy val analyzed: LogicalPlan = {
+    val plan = executePhase(QueryPlanningTracker.ANALYSIS) {
+      // We can't clone `logical` here, which will reset the `_analyzed` flag.
+      sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
+    }
+    if (isEager(plan)) {
+      tracker.setReadyForExecution(plan)
+    }

Review Comment:
   Since this is about eagerly executed commands, I would place it in 
`eagerlyExecuteCommands`, just before creating `val qe` there. That's where it 
actually triggers the command execution, so you also won't need the `isEager` 
helper.
   
   Add comment there:
   ```
   // Since Command execution will eagerly take place here, and in most cases 
be the bulk of time and effort,
   // with the rest of processing of the root plan being just outputting 
command results,
   // for eagerly executed commands we mark this place as beginning of 
execution.
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala:
##########
@@ -263,6 +269,28 @@ class QueryExecutionSuite extends SharedSparkSession {
     assert(projectQe.executedPlan.isInstanceOf[CommandResultExec])
     val cmdResultExec = projectQe.executedPlan.asInstanceOf[CommandResultExec]
     assert(cmdResultExec.commandPhysicalPlan.isInstanceOf[ShowTablesExec])
+    assert(mockCallback.tracker != null)
+  }
+
+  test("SPARK-44145: non eagerly executed command setReadyForExecution") {
+    val mockCallback = MockCallback()
+
+    val showTables = ShowTables(UnresolvedNamespace(Seq.empty[String]), None)
+    val showTablesQe = new QueryExecution(
+      spark,
+      showTables,
+      new QueryPlanningTracker(mockCallback.callback),
+      CommandExecutionMode.SKIP)
+    showTablesQe.assertExecutedPlanPrepared()

Review Comment:
   add
   ```
   showTablesQe.assertCommandExecuted
   assert(mockCallback.tracker == null)
   showTablesQe.assertSparkPlanPrepared
   assert(mockCallback.tracker == null)
   ```
   above



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala:
##########
@@ -244,14 +245,19 @@ class QueryExecutionSuite extends SharedSparkSession {
   }
 
   test("SPARK-35378: Eagerly execute non-root Command") {
-    def qe(logicalPlan: LogicalPlan): QueryExecution = new 
QueryExecution(spark, logicalPlan)
+    val mockCallback = MockCallbackEagerCommand()
+    def qe(logicalPlan: LogicalPlan): QueryExecution = new QueryExecution(
+      spark,
+      logicalPlan,
+      new QueryPlanningTracker(mockCallback.callback))
 
     val showTables = ShowTables(UnresolvedNamespace(Seq.empty[String]), None)
     val showTablesQe = qe(showTables)
     assert(showTablesQe.commandExecuted.isInstanceOf[CommandResult])

Review Comment:
   `assert(mockCallback.tracker == null)` before 
`showTablesQe.commandExecuted`, and `assert(mockCallback.tracker != null)` after



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala:
##########
@@ -263,6 +269,28 @@ class QueryExecutionSuite extends SharedSparkSession {
     assert(projectQe.executedPlan.isInstanceOf[CommandResultExec])
     val cmdResultExec = projectQe.executedPlan.asInstanceOf[CommandResultExec]
     assert(cmdResultExec.commandPhysicalPlan.isInstanceOf[ShowTablesExec])
+    assert(mockCallback.tracker != null)
+  }
+
+  test("SPARK-44145: non eagerly executed command setReadyForExecution") {
+    val mockCallback = MockCallback()
+
+    val showTables = ShowTables(UnresolvedNamespace(Seq.empty[String]), None)
+    val showTablesQe = new QueryExecution(
+      spark,
+      showTables,
+      new QueryPlanningTracker(mockCallback.callback),
+      CommandExecutionMode.SKIP)
+    showTablesQe.assertExecutedPlanPrepared()
+    assert(mockCallback.tracker != null)
+  }
+
+  test("SPARK-44145: Plan setReadyForExecution") {
+    val mockCallback = MockCallback()
+    val plan: LogicalPlan = 
org.apache.spark.sql.catalyst.plans.logical.Range(0, 1, 1, 1)
+    val df = Dataset.ofRows(spark, plan, new 
QueryPlanningTracker(mockCallback.callback))
+    df.queryExecution.assertExecutedPlanPrepared()

Review Comment:
   let's assert for all stages in this test:
   ```
   df.queryExecution.assertAnalyzed
   assert(mockCallback.tracker == null)
   df.queryExecution.assertOptimized
   assert(mockCallback.tracker == null)
   df.queryExecution.assertSparkPlanPrepared
   assert(mockCallback.tracker == null)
   ````



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala:
##########
@@ -120,6 +127,20 @@ class QueryPlanningTracker {
     ret
   }
 
+  /**
+   * Set when the query has been analysed and is ready for execution.
+   * This is after analysis for eager commands and after planning
+   * for other queries.
+   * see @link org.apache.spark.sql.execution.CommandExecutionMode
+   */
+  def setReadyForExecution(analyzedPlan: LogicalPlan): Unit = {
+    if (readyForExecution) {
+      throw new IllegalStateException("Cannot setReadyForExecution more than 
once")
+    }

Review Comment:
   It passes tests, but I'm not 100% sure it would always hold. I think it's 
better to just `return` here to make sure that the callback is only called 
once. If you look at `QueryExecution.eagerlyExecutedCommands` it could have 
multiple commands in the query tree, that you e.g. union the results together 
or sth. Although I don't know of such use cases in practice, I think it's just 
OK to return here and make it be called the first time you reach executions.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -169,13 +175,19 @@ class QueryExecution(
     // We need to materialize the optimizedPlan here, before tracking the 
planning phase, to ensure
     // that the optimization time is not counted as part of the planning phase.
     assertOptimized()
-    executePhase(QueryPlanningTracker.PLANNING) {
+    val plan = executePhase(QueryPlanningTracker.PLANNING) {
       // clone the plan to avoid sharing the plan instance between different 
stages like analyzing,
       // optimizing and planning.
       QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
     }
+    if (!isEager(analyzed)) {
+      tracker.setReadyForExecution(analyzed)
+    }

Review Comment:
   if you let `setReadyForExecution` be called multiple times, you can drop the 
if here.
   Add comment
   ```
   // Note: For eagerly executed command it might have already been called in
   // `eagerlyExecutedCommand` and is a noop here.
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -169,13 +175,19 @@ class QueryExecution(
     // We need to materialize the optimizedPlan here, before tracking the 
planning phase, to ensure
     // that the optimization time is not counted as part of the planning phase.
     assertOptimized()
-    executePhase(QueryPlanningTracker.PLANNING) {
+    val plan = executePhase(QueryPlanningTracker.PLANNING) {
       // clone the plan to avoid sharing the plan instance between different 
stages like analyzing,
       // optimizing and planning.
       QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
     }
+    if (!isEager(analyzed)) {
+      tracker.setReadyForExecution(analyzed)
+    }
+    plan
   }
 
+  def assertExecutedPlanPrepared(): Unit = executedPlan

Review Comment:
   add `assertSparkPlanPrepared()` as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to