rangadi commented on code in PR #41443:
URL: https://github.com/apache/spark/pull/41443#discussion_r1264121801


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2640,15 +2663,16 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
         throw new UnsupportedOperationException(
           s"WriteOperationV2:ModeValue not supported ${writeOperation.getModeValue}")
     }
+    executeHolder.events.postFinished()
   }
 
   def handleWriteStreamOperationStart(
       writeOp: WriteStreamOperationStart,
-      userId: String,
-      sessionId: String,
-      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+      responseObserver: StreamObserver[ExecutePlanResponse],
+      executeHolder: ExecuteHolder): Unit = {
     val plan = transformRelation(writeOp.getInput)
-    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+    val tracker = executeHolder.events.createQueryPlanningTracker
+    val dataset = Dataset.ofRows(session, plan, tracker)

Review Comment:
   Sure. This one excludes sink setup and the actual start of the streaming
   query (e.g. checking the checkpoint location, if any).



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2726,6 +2750,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
     SparkConnectService.streamingSessionManager.registerNewStreamingQuery(
       sessionHolder = SessionHolder(userId = userId, sessionId = sessionId, session),
       query = query)
+    executeHolder.events.postFinished()

Review Comment:
   What is this? If it just marks the end of the RPC, can we post it higher in
   the call stack instead?
   Also, it won't be posted if an error occurs above this line (e.g. the query
   fails to start).
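
   A minimal sketch of the alternative raised above, not the PR's actual code: post the event once, in a wrapper one level above the individual handlers, so the failure path is handled explicitly as well. `Events`, `postFailed`, and `runCommand` are hypothetical stand-ins introduced for illustration; only `postFinished()` appears in the diff.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for executeHolder.events; it only records what was posted.
final class Events {
  val posted: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def postFinished(): Unit = posted += "finished"
  // Assumed failure counterpart, named here for illustration only.
  def postFailed(message: String): Unit = posted += s"failed: $message"
}

object CommandDispatch {
  // Hypothetical wrapper sitting above the per-command handlers.
  // The handler body is passed by name and runs inside the try block.
  def runCommand(events: Events)(handler: => Unit): Unit = {
    try {
      handler
    } catch {
      case NonFatal(e) =>
        // The handler failed (e.g. the query did not start): record it and rethrow.
        events.postFailed(e.getMessage)
        throw e
    }
    // Reached only when the handler returned normally.
    events.postFinished()
  }
}
```

   With this shape, each handler would no longer need its own trailing `postFinished()` call, and a handler that throws produces a failure event rather than no event at all.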



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2995,6 +3020,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
         throw new IllegalArgumentException("Missing command in StreamingQueryManagerCommand")
     }
 
+    executeHolder.events.postFinished()

Review Comment:
   Same comment as above.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2852,6 +2877,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
         throw new IllegalArgumentException("Missing command in StreamingQueryCommand")
     }
 
+    executeHolder.events.postFinished()

Review Comment:
   Again, it is not clear to me why we want to do this separately in each
   handler. But if you think this is the right way to do it, go ahead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

