rangadi commented on code in PR #41443:
URL: https://github.com/apache/spark/pull/41443#discussion_r1264121801
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2640,15 +2663,16 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
throw new UnsupportedOperationException(
s"WriteOperationV2:ModeValue not supported ${writeOperation.getModeValue}")
}
+ executeHolder.events.postFinished()
}
def handleWriteStreamOperationStart(
writeOp: WriteStreamOperationStart,
- userId: String,
- sessionId: String,
- responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+ responseObserver: StreamObserver[ExecutePlanResponse],
+ executeHolder: ExecuteHolder): Unit = {
val plan = transformRelation(writeOp.getInput)
- val dataset = Dataset.ofRows(session, logicalPlan = plan)
+ val tracker = executeHolder.events.createQueryPlanningTracker
+ val dataset = Dataset.ofRows(session, plan, tracker)
Review Comment:
Sure. This one excludes sink setup and the actual start of the streaming query
(e.g., checking the checkpoint location, if any).
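As a rough illustration of that boundary, here is a minimal Scala sketch. `PhaseTracker` and `StreamStartSketch` are hypothetical stand-ins, not Spark's tracker or planner classes. Only the work wrapped in `measurePhase` is recorded. The sink setup, checkpoint check, and query start that follow run outside it, which is the assumption behind saying the tracker excludes them.

```scala
import scala.collection.mutable

// Hypothetical phase tracker (not Spark's class): accumulates elapsed nanoseconds per phase name.
final class PhaseTracker {
  private val phases = mutable.LinkedHashMap.empty[String, Long]

  // Runs `body` and adds its elapsed time to `phase`, even if `body` throws.
  def measurePhase[T](phase: String)(body: => T): T = {
    val start = System.nanoTime()
    try body
    finally {
      phases(phase) = phases.getOrElse(phase, 0L) + (System.nanoTime() - start)
    }
  }

  // Names of the phases recorded so far, in first-seen order.
  def phaseNames: Seq[String] = phases.keys.toSeq
}

object StreamStartSketch {
  // Hypothetical flow: only the planning step is wrapped in the tracker.
  def planAndStart(tracker: PhaseTracker, steps: mutable.Buffer[String]): Unit = {
    tracker.measurePhase("analysis") {
      // Stands in for the planning done by Dataset.ofRows(session, plan, tracker).
      steps += "analyze input plan"
    }
    // Everything below runs outside the tracker, so none of it is recorded as a phase.
    steps += "set up sink"
    steps += "check checkpoint location"
    steps += "start streaming query"
  }
}
```

After `planAndStart`, the tracker reports only the `analysis` phase, even though four steps ran.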
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2726,6 +2750,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
SparkConnectService.streamingSessionManager.registerNewStreamingQuery(
sessionHolder = SessionHolder(userId = userId, sessionId = sessionId, session),
query = query)
+ executeHolder.events.postFinished()
Review Comment:
What is this? If it is just posting an event for the RPC, can we do this
higher in the call stack?
Also, it won't be posted if there is an error above this (e.g., the query
fails to start).
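One way to read this suggestion is a single dispatch point that posts the terminal event after the matching handler returns, and posts a failure event if the handler throws. The Scala sketch below assumes hypothetical types throughout. `Command`, `Events`, and `Planner` are simplifications, and the case classes only borrow the proto message names. None of them is the real `SparkConnectPlanner` or `ExecuteHolder` API.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical command types standing in for the proto commands handled by the planner.
sealed trait Command
final case class WriteOperation(mode: String) extends Command
final case class StreamingQueryCommand(queryId: String) extends Command

// Hypothetical event sink that records what was posted; the PR's events API may differ.
final class Events {
  val posted = mutable.ArrayBuffer.empty[String]
  def postFinished(): Unit = posted += "finished"
  def postFailed(e: Throwable): Unit = posted += s"failed: ${e.getMessage}"
}

final class Planner(events: Events) {
  // Handlers only do their work; none of them posts a terminal event itself.
  private def handleWrite(w: WriteOperation): Unit =
    if (w.mode != "append") {
      throw new UnsupportedOperationException(s"mode ${w.mode} not supported")
    }

  private def handleStreamingQuery(c: StreamingQueryCommand): Unit = ()

  // The single place that posts exactly one terminal event per command.
  def process(command: Command): Unit = {
    try {
      command match {
        case w: WriteOperation        => handleWrite(w)
        case c: StreamingQueryCommand => handleStreamingQuery(c)
      }
    } catch {
      case NonFatal(e) =>
        events.postFailed(e) // still posted when a handler fails, e.g. a query fails to start
        throw e              // rethrow so the caller's error handling is unchanged
    }
    events.postFinished() // reached only when the handler returned normally
  }
}
```

Posting after the `try` instead of in a `finally` keeps success and failure events distinct. Adding a new handler then needs no extra `postFinished()` call.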
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2995,6 +3020,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
throw new IllegalArgumentException("Missing command in StreamingQueryManagerCommand")
}
+ executeHolder.events.postFinished()
Review Comment:
Same.
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2852,6 +2877,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
throw new IllegalArgumentException("Missing command in StreamingQueryCommand")
}
+ executeHolder.events.postFinished()
Review Comment:
Again, it is not clear to me why we want to do this separately for each
handler. But if you think this is the right way to do it, go ahead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.