hvanhovell commented on code in PR #41443:
URL: https://github.com/apache/spark/pull/41443#discussion_r1238788471


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2323,16 +2325,107 @@ class SparkConnectPlanner(val sessionHolder: 
SessionHolder) extends Logging {
     responseObserver.onNext(
       ExecutePlanResponse
         .newBuilder()
-        .setSessionId(sessionId)
+        .setSessionId(sessionHolder.sessionId)
         .setSqlCommandResult(result)
         .build())
 
     // Send Metrics
     
responseObserver.onNext(SparkConnectStreamHandler.createMetricsResponse(sessionId,
 df))
   }
 
+  /**
+   * Executes a SQL query substituting named parameters by the given 
arguments, returning the
+   * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not 
for SELECT queries.
+   *
+   * Copied from @org.apache.spark.sql.SparkSession.sql, but with additional 
callback triggered
+   * after an eager command has been analyzed, but before it has been executed.
+   *
+   * @param sqlText
+   *   A SQL statement with named parameters to execute.
+   * @param args
+   *   A map of parameter names to Java/Scala objects that can be converted to 
SQL literal
+   *   expressions. See <a 
href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html";>
+   *   Supported Data Types</a> for supported value types in Scala/Java. For 
example, map keys:
+   *   "rank", "name", "birthdate"; map values: 1, "Steven", 
LocalDate.of(2023, 4, 2). Map value
+   *   can be also a `Column` of literal expression, in that case it is taken 
as is.
+   * @param analyzedCallback
+   *   Callback triggered before an eager command has been analyzed, but 
before it has been
+   *   executed.
+   */
+  def datasetFromSql(
+      sqlText: String,
+      args: Map[String, Any],
+      analyzedCallback: QueryExecution => Unit = _ => ()): DataFrame = {
+    val tracker = new QueryPlanningTracker
+    val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
+      val parsedPlan = 
sessionHolder.session.sessionState.sqlParser.parsePlan(sqlText)
+      if (args.nonEmpty) {
+        ParameterizedQuery(parsedPlan, args.mapValues(lit(_).expr).toMap)
+      } else {
+        parsedPlan
+      }
+    }
+    datasetFromRows(plan, analyzedCallback, Some(tracker))
+  }
+
+  /**
+   * Copied from @link org.apache.spark.sql.Dataset.ofRows, but with 
additional callback triggered
+   * after an eager command has been analyzed, but before it has been executed.
+   *
+   * @param plan
+   *   Plan to execute
+   * @param tracker
+   *   A simple utility for tracking runtime and associated stats in query 
planning. Specified
+   *   when planning tasks precedes analysis (i.e. parsing)
+   * @param analyzedCallback
+   *   Callback triggered before an eager command has been analyzed, but 
before it has been
+   *   executed.
+   */
+  def datasetFromRows(
+      plan: LogicalPlan,
+      analyzedCallback: QueryExecution => Unit = _ => (),
+      tracker: Option[QueryPlanningTracker] = None): Dataset[Row] = {
+    val qe =
+      new QueryExecution(sessionHolder.session, plan, tracker.getOrElse(new 
QueryPlanningTracker))
+    qe.assertAnalyzed()
+    analyzedCallback(qe)

Review Comment:
   Yeah I think using the QueryPlanningTracker is better. That way we don't 
have to rely on the internal workings of QueryExecution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to