pengzhon-db commented on code in PR #40586:
URL: https://github.com/apache/spark/pull/40586#discussion_r1157815748


##########
python/pyspark/sql/connect/streaming/query.py:
##########
@@ -0,0 +1,161 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import sys
+from typing import TYPE_CHECKING, Any, cast, Dict, List, Optional
+
+from pyspark.errors import StreamingQueryException
+import pyspark.sql.connect.proto as pb2
+from pyspark.sql.streaming.query import (
+    StreamingQuery as PySparkStreamingQuery,
+)
+
+__all__ = [
+    "StreamingQuery",  # TODO(SPARK-43032): "StreamingQueryManager"
+]
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.session import SparkSession
+
+
+class StreamingQuery:
+    def __init__(
+        self, session: "SparkSession", queryId: str, runId: str, name: 
Optional[str] = None
+    ) -> None:
+        self._session = session
+        self._query_id = queryId
+        self._run_id = runId
+        self._name = name
+
+    @property
+    def id(self) -> str:
+        return self._query_id
+
+    id.__doc__ = PySparkStreamingQuery.id.__doc__
+
+    @property
+    def runId(self) -> str:
+        return self._run_id
+
+    runId.__doc__ = PySparkStreamingQuery.runId.__doc__
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    name.__doc__ = PySparkStreamingQuery.name.__doc__
+
+    @property
+    def isActive(self) -> bool:
+        return self._fetch_status().is_active
+
+    isActive.__doc__ = PySparkStreamingQuery.isActive.__doc__
+
+    def awaitTermination(self, timeout: Optional[int] = None) -> 
Optional[bool]:
+        raise NotImplementedError()
+
+    awaitTermination.__doc__ = PySparkStreamingQuery.awaitTermination.__doc__
+
+    @property
+    def status(self) -> Dict[str, Any]:
+        proto = self._fetch_status()
+        return {
+            "message": proto.status_message,
+            "isDataAvailable": proto.is_data_available,
+            "isTriggerActive": proto.is_trigger_active,
+        }
+
+    status.__doc__ = PySparkStreamingQuery.status.__doc__
+
+    @property
+    def recentProgress(self) -> List[Dict[str, Any]]:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.recent_progress = True
+        progress = 
self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
+        return [json.loads(p) for p in progress]
+
+    recentProgress.__doc__ = PySparkStreamingQuery.recentProgress.__doc__
+
+    @property
+    def lastProgress(self) -> Optional[Dict[str, Any]]:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.last_progress = True
+        progress = 
self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
+        if len(progress) > 0:
+            return json.loads(progress[-1])
+        else:
+            return None
+
+    lastProgress.__doc__ = PySparkStreamingQuery.lastProgress.__doc__
+
+    def processAllAvailable(self) -> None:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.process_all_available = True
+        self._execute_streaming_query_cmd(cmd)
+
+    processAllAvailable.__doc__ = 
PySparkStreamingQuery.processAllAvailable.__doc__
+
+    def stop(self) -> None:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.stop = True
+        self._execute_streaming_query_cmd(cmd)
+
+    stop.__doc__ = PySparkStreamingQuery.stop.__doc__
+
+    def explain(self, extended: bool = False) -> None:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.explain.extended = extended
+        result = self._execute_streaming_query_cmd(cmd).explain.result
+        print(result)
+
+    explain.__doc__ = PySparkStreamingQuery.explain.__doc__
+
+    def exception(self) -> Optional[StreamingQueryException]:
+        raise NotImplementedError()
+
+    exception.__doc__ = PySparkStreamingQuery.exception.__doc__
+
+    def _fetch_status(
+        self, recent_progress_limit=0

Review Comment:
   `recent_progress_limit` is not used



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -177,3 +179,126 @@ message WriteOperationV2 {
   // (Optional) A condition for overwrite saving mode
   Expression overwrite_condition = 8;
 }
+
+// Starts write stream operation as streaming query. Query ID and Run ID of 
the streaming
+// query are returned.
+message WriteStreamOperationStart {
+
+  // (Required) The output of the `input` streaming relation will be written.
+  Relation input = 1;
+
+  // The following fields directly map to API for DataStreamWriter().
+  // Consult API documentation unless explicitly documented here.
+
+  string format = 2;
+  map<string, string> options = 3;
+  repeated string partitioning_column_names = 4;
+
+  oneof trigger {
+    string processing_time_interval = 5;
+    bool available_now = 6;
+    bool once = 7;
+    string continuous_checkpoint_interval = 8;
+  }
+
+  string output_mode = 9;
+  string query_name = 10;
+
+  // The destination is optional. When set, it can be a path or a table name.
+  oneof sink_destination {
+    string path = 11;
+    string table_name = 12;
+  }
+}
+
+message WriteStreamOperationStartResult {
+
+  // (Required) Query instance. See `StreamingQueryInstanceId`.
+  StreamingQueryInstanceId query_id = 1;
+
+  // An optional query name.
+  string name = 2;
+
+  // TODO: How do we indicate errors?
+  // TODO: Consider adding status, last progress etc here.
+}
+
+// A tuple that uniquely identifies an instance of streaming query run. It 
consists of `id` that
+// persists across the streaming runs and `run_id` that changes between each 
run of the
+// streaming query that resumes from the checkpoint.
+message StreamingQueryInstanceId {
+
+  // (Required) The unique id of this query that persists across restarts from 
checkpoint data.
+  // That is, this id is generated when a query is started for the first time, 
and
+  // will be the same every time it is restarted from checkpoint data.
+  string id = 1;
+
+  // (Required) The unique id of this run of the query. That is, every 
start/restart of a query
+  // will generate a unique run_id. Therefore, every time a query is restarted 
from
+  // checkpoint, it will have the same `id` but different `run_id`s.
+  string run_id = 2;
+}
+
+// Commands for a streaming query.
+message StreamingQueryCommand {
+
+  // (Required) Query instance. See `StreamingQueryInstanceId`.
+  StreamingQueryInstanceId query_id = 1;
+
+  oneof command {
+    // See documentation for the corresponding API method in StreamingQuery.
+
+    // status() API.
+    bool status = 2;
+    // lastProgress() API.
+    bool last_progress = 3;
+    // recentProgress() API.
+    bool recent_progress = 4;
+    // stop() API. Stops the query.
+    bool stop = 5;
+    // processAllAvailable() API. Waits till all the available data is 
processed
+    bool process_all_available = 6;
+    // explain() API. Returns logical and physical plans.
+    ExplainCommand explain = 7;
+
+    // TODO(SPARK-42960) Add more commands: await_termination(), exception() 
etc.
+  }
+
+  message ExplainCommand {
+    // TODO: Consider reusing Explain from AnalyzePlanRequest message.
+    //       We can not do this right now since it base.proto imports this 
file.
+    bool extended = 1;
+  }
+

Review Comment:
   nit: remove empty line



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -723,6 +724,33 @@ def to_jcols(
 
     melt = unpivot
 
+    def withWatermark(self, eventTime: str, delayThreshold: str) -> 
"DataFrame":
+        # TODO: reuse error handling code in sql.DataFrame.withWatermark()
+        if not eventTime or type(eventTime) is not str:

Review Comment:
   nit: why not use `isinstance(eventTime, str)` as other place



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to