rangadi commented on code in PR #40586:
URL: https://github.com/apache/spark/pull/40586#discussion_r1156583318


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1969,6 +2017,142 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
   }
 
+  def handleWriteStreamOperationStart(
+      writeOp: WriteStreamOperationStart,
+      sessionId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    val plan = transformRelation(writeOp.getInput)
+    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+    val writer = dataset.writeStream
+
+    if (writeOp.getFormat.nonEmpty) {
+      writer.format(writeOp.getFormat)
+    }
+
+    writer.options(writeOp.getOptionsMap)
+
+    if (writeOp.getPartitioningColumnNamesCount > 0) {
+      writer.partitionBy(writeOp.getPartitioningColumnNamesList.asScala.toList: _*)
+    }
+
+    writeOp.getTriggerCase match {
+      case TriggerCase.PROCESSING_TIME_INTERVAL =>
+        writer.trigger(Trigger.ProcessingTime(writeOp.getProcessingTimeInterval))
+      case TriggerCase.AVAILABLE_NOW =>
+        writer.trigger(Trigger.AvailableNow())
+      case TriggerCase.ONE_TIME =>
+        writer.trigger(Trigger.Once())
+      case TriggerCase.CONTINUOUS_CHECKPOINT_INTERVAL =>
+        writer.trigger(Trigger.Continuous(writeOp.getContinuousCheckpointInterval))
+      case TriggerCase.TRIGGER_NOT_SET =>
+    }
+
+    if (writeOp.getOutputMode.nonEmpty) {
+      writer.outputMode(writeOp.getOutputMode)
+    }
+
+    if (writeOp.getQueryName.nonEmpty) {
+      writer.queryName(writeOp.getQueryName)
+    }
+
+    val query = writeOp.getPath match {
+      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+      case "" => writer.start()
+      case path => writer.start(path)
+    }
+
+    val result = WriteStreamOperationStartResult
+      .newBuilder()
+      .setQueryId(query.id.toString)
+      .setRunId(query.runId.toString)
+      .setName(Option(query.name).getOrElse(""))
+      .build()
+
+    responseObserver.onNext(
+      ExecutePlanResponse
+        .newBuilder()
+        .setSessionId(sessionId)
+        .setWriteStreamOperationStartResult(result)
+        .build())
+  }
+
+  def handleStreamingQueryCommand(
+      command: StreamingQueryCommand,
+      sessionId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+
+    val queryId = command.getQueryId
+
+    val respBuilder = StreamingQueryCommandResult
+      .newBuilder()
+      .setQueryId(command.getQueryId)
+
+    val query = Option(session.streams.get(queryId)) match {
+      case Some(query) if query.runId.toString == command.getRunId =>
+        query
+      case Some(query) =>
+        throw new IllegalArgumentException(
+          s"Run id mismatch for query id $queryId. Run id in the request ${command.getRunId} " +
+            s"does not match one on the server ${query.runId}. The query might have restarted.")
+      case None =>
+        throw new IllegalArgumentException(s"Streaming query $queryId is not found")
+      // TODO(SPARK-42962): Handle this better. May be cache stopped queries for a few minutes.
+    }
+
+    command.getCommandTypeCase match {
+      case StreamingQueryCommand.CommandTypeCase.STATUS =>
+        val recentProgress: Seq[String] = command.getStatus.getRecentProgressLimit match {
+          case 0 => Seq.empty
+          case limit if limit < 0 =>
+            query.recentProgress.map(_.json) // All the cached progresses.
+          case limit => query.recentProgress.takeRight(limit).map(_.json) // Most recent
+        }
+
+        val queryStatus = query.status
+
+        val statusResult = StreamingQueryCommandResult.StatusResult
+          .newBuilder()
+          .setStatusMessage(queryStatus.message)
+          .setIsDataAvailable(queryStatus.isDataAvailable)
+          .setIsTriggerActive(queryStatus.isTriggerActive)
+          .setIsActive(query.isActive)
+          .addAllRecentProgressJson(recentProgress.asJava)
+          .build()
+
+        respBuilder.setStatus(statusResult)
+
+      case StreamingQueryCommand.CommandTypeCase.STOP =>
+        query.stop()
+
+      case StreamingQueryCommand.CommandTypeCase.PROCESS_ALL_AVAILABLE =>
+        query.processAllAvailable()

Review Comment:
   Left a comment. We need to handle this better as part of session management 
improvements. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to