hvanhovell commented on code in PR #41443:
URL: https://github.com/apache/spark/pull/41443#discussion_r1238788471
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2323,16 +2325,107 @@ class SparkConnectPlanner(val sessionHolder:
SessionHolder) extends Logging {
responseObserver.onNext(
ExecutePlanResponse
.newBuilder()
- .setSessionId(sessionId)
+ .setSessionId(sessionHolder.sessionId)
.setSqlCommandResult(result)
.build())
// Send Metrics
responseObserver.onNext(SparkConnectStreamHandler.createMetricsResponse(sessionId,
df))
}
+ /**
+ * Executes a SQL query substituting named parameters by the given
arguments, returning the
+ * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not
for SELECT queries.
+ *
+ * Copied from [[org.apache.spark.sql.SparkSession.sql]], but with additional
callback triggered
+ * after an eager command has been analyzed, but before it has been executed.
+ *
+ * @param sqlText
+ * A SQL statement with named parameters to execute.
+ * @param args
+ * A map of parameter names to Java/Scala objects that can be converted to
SQL literal
+ * expressions. See <a
href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
+ * Supported Data Types</a> for supported value types in Scala/Java. For
example, map keys:
+ * "rank", "name", "birthdate"; map values: 1, "Steven",
LocalDate.of(2023, 4, 2). Map value
+ * can be also a `Column` of literal expression, in that case it is taken
as is.
+ * @param analyzedCallback
+ * Callback triggered after an eager command has been analyzed, but
before it has been
+ * executed.
+ */
+ def datasetFromSql(
+ sqlText: String,
+ args: Map[String, Any],
+ analyzedCallback: QueryExecution => Unit = _ => ()): DataFrame = {
+ val tracker = new QueryPlanningTracker
+ val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
+ val parsedPlan =
sessionHolder.session.sessionState.sqlParser.parsePlan(sqlText)
+ if (args.nonEmpty) {
+ ParameterizedQuery(parsedPlan, args.mapValues(lit(_).expr).toMap)
+ } else {
+ parsedPlan
+ }
+ }
+ datasetFromRows(plan, analyzedCallback, Some(tracker))
+ }
+
+ /**
+ * Copied from [[org.apache.spark.sql.Dataset.ofRows]], but with
additional callback triggered
+ * after an eager command has been analyzed, but before it has been executed.
+ *
+ * @param plan
+ * Plan to execute
+ * @param tracker
+ * A simple utility for tracking runtime and associated stats in query
planning. Specified
when planning tasks precede analysis (i.e. parsing)
+ * @param analyzedCallback
+ * Callback triggered after an eager command has been analyzed, but
before it has been
+ * executed.
+ */
+ def datasetFromRows(
+ plan: LogicalPlan,
+ analyzedCallback: QueryExecution => Unit = _ => (),
+ tracker: Option[QueryPlanningTracker] = None): Dataset[Row] = {
+ val qe =
+ new QueryExecution(sessionHolder.session, plan, tracker.getOrElse(new
QueryPlanningTracker))
+ qe.assertAnalyzed()
+ analyzedCallback(qe)
Review Comment:
Yeah I think using the QueryPlanningTracker is better. That way we don't
have to rely on the internal workings of QueryExecution.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]