juliuszsompolski commented on code in PR #41443:
URL: https://github.com/apache/spark/pull/41443#discussion_r1264108802
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2726,6 +2750,7 @@ class SparkConnectPlanner(val sessionHolder:
SessionHolder) extends Logging {
SparkConnectService.streamingSessionManager.registerNewStreamingQuery(
sessionHolder = SessionHolder(userId = userId, sessionId = sessionId,
session),
query = query)
+ executeHolder.events.postFinished()
Review Comment:
@rangadi other streaming commands will just post a "started" and a
"finished" event, with no further detail inside them
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2852,6 +2877,7 @@ class SparkConnectPlanner(val sessionHolder:
SessionHolder) extends Logging {
throw new IllegalArgumentException("Missing command in
StreamingQueryCommand")
}
+ executeHolder.events.postFinished()
Review Comment:
@rangadi
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2640,15 +2663,16 @@ class SparkConnectPlanner(val sessionHolder:
SessionHolder) extends Logging {
throw new UnsupportedOperationException(
s"WriteOperationV2:ModeValue not supported
${writeOperation.getModeValue}")
}
+ executeHolder.events.postFinished()
}
def handleWriteStreamOperationStart(
writeOp: WriteStreamOperationStart,
- userId: String,
- sessionId: String,
- responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+ responseObserver: StreamObserver[ExecutePlanResponse],
+ executeHolder: ExecuteHolder): Unit = {
val plan = transformRelation(writeOp.getInput)
- val dataset = Dataset.ofRows(session, logicalPlan = plan)
+ val tracker = executeHolder.events.createQueryPlanningTracker
+ val dataset = Dataset.ofRows(session, plan, tracker)
Review Comment:
@rangadi the stream operation start will measure and emit the phases of
analyzing the query plan
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2995,6 +3020,7 @@ class SparkConnectPlanner(val sessionHolder:
SessionHolder) extends Logging {
throw new IllegalArgumentException("Missing command in
StreamingQueryManagerCommand")
}
+ executeHolder.events.postFinished()
Review Comment:
@rangadi
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]