rangadi commented on code in PR #40586:
URL: https://github.com/apache/spark/pull/40586#discussion_r1156569175


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -177,3 +179,118 @@ message WriteOperationV2 {
   // (Optional) A condition for overwrite saving mode
   Expression overwrite_condition = 8;
 }
+
+// Starts write stream operation as streaming query. Query ID and Run ID of 
the streaming
+// query are returned.
+message WriteStreamOperationStart {
+
+  // (Required) The output of the `input` streaming relation will be written.
+  Relation input = 1;
+
+  // The following fields directly map to API for DataStreamWriter().
+  // Consult API documentation unless explicitly documented here.
+
+  string format = 2;
+  map<string, string> options = 3;
+  repeated string partitioning_column_names = 4;
+
+  oneof trigger {
+    string processing_time_interval = 5;
+    bool available_now = 6;
+    bool one_time = 7;
+    string continuous_checkpoint_interval = 8;
+  }
+
+  string output_mode = 9;
+  string query_name = 10;
+
+  // The destination is optional. When set, it can be a path or a table name.
+  oneof sink_destination {
+    string path = 11;
+    string table_name = 12;
+  }
+}
+
+message WriteStreamOperationStartResult {
+
+  // (Required)
+  string query_id = 1;
+
+  // (Required)
+  string run_id = 2;
+
+  // An optional query name.
+  string name = 3;
+
+  // TODO: How do we indicate errors?
+  // TODO: Consider adding StreamingQueryStatusResult here.
+}
+
+// Commands for a streaming query.
+message StreamingQueryCommand {
+
+  // (Required) query id of the streaming query.
+  string query_id = 1;
+  // (Required) run id of the streaming query.
+  string run_id = 2;
+
+  // A running query is identified by both run_id and query_id.
+
+  oneof command_type {
+    // Status of the query. Used to support multiple status related API like 
lastProgress().
+    StatusCommand status = 3;
+    // Stops the query.
+    bool stop = 4;
+    // Waits till all the available data is processed. See 
processAllAvailable() API doc.
+    bool process_all_available = 5;
+    // Returns logical and physical plans.
+    ExplainCommand explain = 6;
+
+    // TODO(SPARK-42960) Add more commands: await_termination(), exception() 
etc.
+  }
+
+  message StatusCommand {
+    // A limit on how many progress reports to return.
+    int32 recent_progress_limit = 1;
+  }
+
+  message ExplainCommand {
+    // TODO: Consider reusing Explain from AnalyzePlanRequest message.
+    //       We can not do this right now since it base.proto imports this 
file.
+    bool extended = 1;
+  }
+
+}
+
+// Response for commands on a streaming query.
+message StreamingQueryCommandResult {
+  // (Required)
+  string query_id = 1;
+
+  oneof result_type {
+    StatusResult status = 2;
+    ExplainResult explain = 3;
+  }
+
+  message StatusResult {
+    // This status includes all the available to status, including progress 
messages.
+
+    // Fields from Scala 'StreamingQueryStatus' struct
+    string status_message = 1;
+    bool is_data_available = 2;
+    bool is_trigger_active = 3;
+
+    bool is_active = 4;
+
+    // Progress reports as an array of json strings.
+    repeated string recent_progress_json = 5;

Review Comment:
   Progress is sent only when requested (this keeps the number of RPCs lower).
See `recent_progress_limit` in `StatusCommand` above.
   Let me know if you prefer to split these into multiple commands in this PR.



##########
python/pyspark/sql/connect/streaming/query.py:
##########
@@ -0,0 +1,175 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import sys
+from typing import TYPE_CHECKING, Any, cast, Dict, List, Optional
+
+from pyspark.errors import StreamingQueryException
+import pyspark.sql.connect.proto as pb2
+from pyspark.sql.streaming.query import (
+    StreamingQuery as PySparkStreamingQuery,
+)
+
+__all__ = [
+    "StreamingQuery",  # TODO(WIP): "StreamingQueryManager"
+]
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.session import SparkSession
+
+
+class StreamingQuery:
+
+    def __init__(self, session: "SparkSession", queryId: str, runId: str, 
name: Optional[str] = None) -> None:
+        self._session = session
+        self._query_id = queryId
+        self._run_id = runId
+        self._name = name
+
+    @property
+    def id(self) -> str:
+        return self._query_id
+
+    id.__doc__ = PySparkStreamingQuery.id.__doc__
+
+    @property
+    def runId(self) -> str:
+        return self._run_id
+
+    runId.__doc__ = PySparkStreamingQuery.runId.__doc__
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    name.__doc__ = PySparkStreamingQuery.name.__doc__
+
+    @property
+    def isActive(self) -> bool:
+        return self._fetch_status().is_active
+
+    isActive.__doc__ = PySparkStreamingQuery.isActive.__doc__
+
+    def awaitTermination(self, timeout: Optional[int] = None) -> 
Optional[bool]:
+        raise NotImplementedError()
+
+    awaitTermination.__doc__ = PySparkStreamingQuery.awaitTermination.__doc__
+
+    @property
+    def status(self) -> Dict[str, Any]:
+        proto = self._fetch_status()
+        return {
+            "message": proto.status_message,
+            "isDataAvailable": proto.is_data_available,
+            "isTriggerActive": proto.is_trigger_active
+        }
+
+    status.__doc__ = PySparkStreamingQuery.status.__doc__
+
+    @property
+    def recentProgress(self) -> List[Dict[str, Any]]:
+        progress = 
list(self._fetch_status(recent_progress_limit=-1).recent_progress_json)
+        return [json.loads(p) for p in progress]
+
+    recentProgress.__doc__ = PySparkStreamingQuery.recentProgress.__doc__
+
+    @property
+    def lastProgress(self) -> Optional[Dict[str, Any]]:
+        progress = 
list(self._fetch_status(recent_progress_limit=1).recent_progress_json)
+        if len(progress) > 0:
+            return json.loads(progress[-1])
+        else:
+            return None
+
+    lastProgress.__doc__ = PySparkStreamingQuery.lastProgress.__doc__
+
+    def processAllAvailable(self) -> None:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.process_all_available = True
+        self._execute_streaming_query_cmd(cmd)
+
+    processAllAvailable.__doc__ = 
PySparkStreamingQuery.processAllAvailable.__doc__
+
+    def stop(self) -> None:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.stop = True
+        self._execute_streaming_query_cmd(cmd)
+
+    stop.__doc__ = PySparkStreamingQuery.stop.__doc__
+
+    def explain(self, extended: bool = False) -> None:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.explain.extended = extended
+        result = self._execute_streaming_query_cmd(cmd).explain.result
+        print(result)
+
+    processAllAvailable.__doc__ = 
PySparkStreamingQuery.processAllAvailable.__doc__
+
+    def exception(self) -> Optional[StreamingQueryException]:
+        raise NotImplementedError()
+
+    processAllAvailable.__doc__ = 
PySparkStreamingQuery.processAllAvailable.__doc__

Review Comment:
   yes. Fixed. 



##########
python/pyspark/sql/connect/streaming/query.py:
##########
@@ -0,0 +1,175 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import sys
+from typing import TYPE_CHECKING, Any, cast, Dict, List, Optional
+
+from pyspark.errors import StreamingQueryException
+import pyspark.sql.connect.proto as pb2
+from pyspark.sql.streaming.query import (
+    StreamingQuery as PySparkStreamingQuery,
+)
+
+__all__ = [
+    "StreamingQuery",  # TODO(WIP): "StreamingQueryManager"
+]
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.session import SparkSession
+
+
+class StreamingQuery:
+
+    def __init__(self, session: "SparkSession", queryId: str, runId: str, 
name: Optional[str] = None) -> None:
+        self._session = session
+        self._query_id = queryId
+        self._run_id = runId
+        self._name = name
+
+    @property
+    def id(self) -> str:
+        return self._query_id
+
+    id.__doc__ = PySparkStreamingQuery.id.__doc__
+
+    @property
+    def runId(self) -> str:
+        return self._run_id
+
+    runId.__doc__ = PySparkStreamingQuery.runId.__doc__
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    name.__doc__ = PySparkStreamingQuery.name.__doc__
+
+    @property
+    def isActive(self) -> bool:
+        return self._fetch_status().is_active
+
+    isActive.__doc__ = PySparkStreamingQuery.isActive.__doc__
+
+    def awaitTermination(self, timeout: Optional[int] = None) -> 
Optional[bool]:
+        raise NotImplementedError()
+
+    awaitTermination.__doc__ = PySparkStreamingQuery.awaitTermination.__doc__
+
+    @property
+    def status(self) -> Dict[str, Any]:
+        proto = self._fetch_status()
+        return {
+            "message": proto.status_message,
+            "isDataAvailable": proto.is_data_available,
+            "isTriggerActive": proto.is_trigger_active
+        }
+
+    status.__doc__ = PySparkStreamingQuery.status.__doc__
+
+    @property
+    def recentProgress(self) -> List[Dict[str, Any]]:
+        progress = 
list(self._fetch_status(recent_progress_limit=-1).recent_progress_json)

Review Comment:
   Changed it to 10000 and added a comment. The server only keeps 100 progress reports.



##########
python/pyspark/sql/connect/streaming/query.py:
##########
@@ -0,0 +1,175 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import sys
+from typing import TYPE_CHECKING, Any, cast, Dict, List, Optional
+
+from pyspark.errors import StreamingQueryException
+import pyspark.sql.connect.proto as pb2
+from pyspark.sql.streaming.query import (
+    StreamingQuery as PySparkStreamingQuery,
+)
+
+__all__ = [
+    "StreamingQuery",  # TODO(WIP): "StreamingQueryManager"
+]
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.session import SparkSession
+
+
+class StreamingQuery:
+
+    def __init__(self, session: "SparkSession", queryId: str, runId: str, 
name: Optional[str] = None) -> None:
+        self._session = session
+        self._query_id = queryId
+        self._run_id = runId
+        self._name = name
+
+    @property
+    def id(self) -> str:
+        return self._query_id
+
+    id.__doc__ = PySparkStreamingQuery.id.__doc__
+
+    @property
+    def runId(self) -> str:
+        return self._run_id
+
+    runId.__doc__ = PySparkStreamingQuery.runId.__doc__
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    name.__doc__ = PySparkStreamingQuery.name.__doc__
+
+    @property
+    def isActive(self) -> bool:
+        return self._fetch_status().is_active
+
+    isActive.__doc__ = PySparkStreamingQuery.isActive.__doc__
+
+    def awaitTermination(self, timeout: Optional[int] = None) -> 
Optional[bool]:
+        raise NotImplementedError()
+
+    awaitTermination.__doc__ = PySparkStreamingQuery.awaitTermination.__doc__
+
+    @property
+    def status(self) -> Dict[str, Any]:
+        proto = self._fetch_status()
+        return {
+            "message": proto.status_message,
+            "isDataAvailable": proto.is_data_available,
+            "isTriggerActive": proto.is_trigger_active
+        }
+
+    status.__doc__ = PySparkStreamingQuery.status.__doc__
+
+    @property
+    def recentProgress(self) -> List[Dict[str, Any]]:
+        progress = 
list(self._fetch_status(recent_progress_limit=-1).recent_progress_json)
+        return [json.loads(p) for p in progress]
+
+    recentProgress.__doc__ = PySparkStreamingQuery.recentProgress.__doc__
+
+    @property
+    def lastProgress(self) -> Optional[Dict[str, Any]]:
+        progress = 
list(self._fetch_status(recent_progress_limit=1).recent_progress_json)
+        if len(progress) > 0:
+            return json.loads(progress[-1])
+        else:
+            return None
+
+    lastProgress.__doc__ = PySparkStreamingQuery.lastProgress.__doc__
+
+    def processAllAvailable(self) -> None:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.process_all_available = True
+        self._execute_streaming_query_cmd(cmd)
+
+    processAllAvailable.__doc__ = 
PySparkStreamingQuery.processAllAvailable.__doc__
+
+    def stop(self) -> None:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.stop = True
+        self._execute_streaming_query_cmd(cmd)
+
+    stop.__doc__ = PySparkStreamingQuery.stop.__doc__
+
+    def explain(self, extended: bool = False) -> None:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.explain.extended = extended
+        result = self._execute_streaming_query_cmd(cmd).explain.result
+        print(result)
+
+    processAllAvailable.__doc__ = 
PySparkStreamingQuery.processAllAvailable.__doc__

Review Comment:
   yes. Fixed both. 



##########
python/pyspark/sql/connect/streaming/readwriter.py:
##########
@@ -0,0 +1,485 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.sql.connect.utils import check_dependencies
+
+check_dependencies(__name__)
+
+import sys
+from typing import cast, overload, Callable, Dict, List, Optional, 
TYPE_CHECKING, Union
+
+from pyspark.sql.connect.plan import DataSource, LogicalPlan, 
WriteStreamOperation
+import pyspark.sql.connect.proto as pb2
+from pyspark.sql.connect.readwriter import OptionUtils, to_str
+from pyspark.sql.connect.streaming.query import StreamingQuery
+from pyspark.sql.streaming.readwriter import (
+    DataStreamReader as PySparkDataStreamReader,
+    DataStreamWriter as PySparkDataStreamWriter,
+)
+from pyspark.sql.types import Row, StructType
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.session import SparkSession
+    from pyspark.sql.connect._typing import OptionalPrimitiveType
+    from pyspark.sql.connect.dataframe import DataFrame
+
+__all__ = ["DataStreamReader", "DataStreamWriter"]
+
+
+class DataStreamReader(OptionUtils):
+
+    def __init__(self, client: "SparkSession") -> None:
+        self._format: Optional[str] = None
+        self._schema = ""
+        self._client = client
+        self._options: Dict[str, str] = {}
+
+    def _df(self, plan: LogicalPlan) -> "DataFrame":
+        from pyspark.sql.connect.dataframe import DataFrame
+
+        return DataFrame.withPlan(plan, self._client)
+
+    def format(self, source: str) -> "DataStreamReader":
+        self._format = source
+        return self
+
+    format.__doc__ = PySparkDataStreamReader.format.__doc__
+
+    def schema(self, schema: Union[StructType, str]) -> "DataStreamReader":
+        if isinstance(schema, StructType):
+            self._schema = schema.json()
+        elif isinstance(schema, str):
+            self._schema = schema
+        else:
+            raise TypeError("schema should be StructType or string")
+        return self
+
+    schema.__doc__ = PySparkDataStreamReader.schema.__doc__
+
+    def option(self, key: str, value: "OptionalPrimitiveType") -> 
"DataStreamReader":
+        self._options[key] = str(value)
+        return self
+
+    option.__doc__ = PySparkDataStreamReader.option.__doc__
+
+    def options(self, **options: "OptionalPrimitiveType") -> 
"DataStreamReader":
+        for k in options:
+            self.option(k, to_str(options[k]))
+        return self
+
+    options.__doc__ = PySparkDataStreamReader.options.__doc__
+
+    def load(
+        self,
+        path: Optional[str] = None,
+        format: Optional[str] = None,
+        schema: Optional[Union[StructType, str]] = None,
+        **options: "OptionalPrimitiveType",
+    ) -> "DataFrame":
+        if format is not None:
+            self.format(format)
+        if schema is not None:
+            self.schema(schema)
+        self.options(**options)
+        if path is not None:
+            if type(path) != str or len(path.strip()) == 0:
+                raise ValueError(
+                    "If the path is provided for stream, it needs to be a "
+                    + "non-empty string. List of paths are not supported."
+                )
+
+        plan = DataSource(
+            format=self._format,
+            schema=self._schema,
+            options=self._options,
+            paths=[path] if path else None,
+            is_streaming=True
+        )
+
+        return self._df(plan)
+
+    load.__doc__ = PySparkDataStreamReader.load.__doc__
+
+    def json(
+        self,
+        path: str,
+        schema: Optional[Union[StructType, str]] = None,
+        primitivesAsString: Optional[Union[bool, str]] = None,
+        prefersDecimal: Optional[Union[bool, str]] = None,
+        allowComments: Optional[Union[bool, str]] = None,
+        allowUnquotedFieldNames: Optional[Union[bool, str]] = None,
+        allowSingleQuotes: Optional[Union[bool, str]] = None,
+        allowNumericLeadingZero: Optional[Union[bool, str]] = None,
+        allowBackslashEscapingAnyCharacter: Optional[Union[bool, str]] = None,
+        mode: Optional[str] = None,
+        columnNameOfCorruptRecord: Optional[str] = None,
+        dateFormat: Optional[str] = None,
+        timestampFormat: Optional[str] = None,
+        multiLine: Optional[Union[bool, str]] = None,
+        allowUnquotedControlChars: Optional[Union[bool, str]] = None,
+        lineSep: Optional[str] = None,
+        locale: Optional[str] = None,
+        dropFieldIfAllNull: Optional[Union[bool, str]] = None,
+        encoding: Optional[str] = None,
+        pathGlobFilter: Optional[Union[bool, str]] = None,
+        recursiveFileLookup: Optional[Union[bool, str]] = None,
+        allowNonNumericNumbers: Optional[Union[bool, str]] = None,
+    ) -> "DataFrame":
+        self._set_opts(
+            schema=schema,
+            primitivesAsString=primitivesAsString,
+            prefersDecimal=prefersDecimal,
+            allowComments=allowComments,
+            allowUnquotedFieldNames=allowUnquotedFieldNames,
+            allowSingleQuotes=allowSingleQuotes,
+            allowNumericLeadingZero=allowNumericLeadingZero,
+            
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
+            mode=mode,
+            columnNameOfCorruptRecord=columnNameOfCorruptRecord,
+            dateFormat=dateFormat,
+            timestampFormat=timestampFormat,
+            multiLine=multiLine,
+            allowUnquotedControlChars=allowUnquotedControlChars,
+            lineSep=lineSep,
+            locale=locale,
+            dropFieldIfAllNull=dropFieldIfAllNull,
+            encoding=encoding,
+            pathGlobFilter=pathGlobFilter,
+            recursiveFileLookup=recursiveFileLookup,
+            allowNonNumericNumbers=allowNonNumericNumbers,
+        )
+        if isinstance(path, str):
+            return self.load(path=path, format="json")
+        else:
+            raise TypeError("path can be only a single string")
+
+    json.__doc__ = PySparkDataStreamReader.json.__doc__
+
+    # def orc() TODO
+    # def parquet() TODO
+    # def text() TODO
+
+    def csv(
+        self,
+        path: str,
+        schema: Optional[Union[StructType, str]] = None,
+        sep: Optional[str] = None,
+        encoding: Optional[str] = None,
+        quote: Optional[str] = None,
+        escape: Optional[str] = None,
+        comment: Optional[str] = None,
+        header: Optional[Union[bool, str]] = None,
+        inferSchema: Optional[Union[bool, str]] = None,
+        ignoreLeadingWhiteSpace: Optional[Union[bool, str]] = None,
+        ignoreTrailingWhiteSpace: Optional[Union[bool, str]] = None,
+        nullValue: Optional[str] = None,
+        nanValue: Optional[str] = None,
+        positiveInf: Optional[str] = None,
+        negativeInf: Optional[str] = None,
+        dateFormat: Optional[str] = None,
+        timestampFormat: Optional[str] = None,
+        maxColumns: Optional[Union[int, str]] = None,
+        maxCharsPerColumn: Optional[Union[int, str]] = None,
+        maxMalformedLogPerPartition: Optional[Union[int, str]] = None,
+        mode: Optional[str] = None,
+        columnNameOfCorruptRecord: Optional[str] = None,
+        multiLine: Optional[Union[bool, str]] = None,
+        charToEscapeQuoteEscaping: Optional[Union[bool, str]] = None,
+        enforceSchema: Optional[Union[bool, str]] = None,
+        emptyValue: Optional[str] = None,
+        locale: Optional[str] = None,
+        lineSep: Optional[str] = None,
+        pathGlobFilter: Optional[Union[bool, str]] = None,
+        recursiveFileLookup: Optional[Union[bool, str]] = None,
+        unescapedQuoteHandling: Optional[str] = None,
+    ) -> "DataFrame":
+        self._set_opts(
+            schema=schema,
+            sep=sep,
+            encoding=encoding,
+            quote=quote,
+            escape=escape,
+            comment=comment,
+            header=header,
+            inferSchema=inferSchema,
+            ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+            ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
+            nullValue=nullValue,
+            nanValue=nanValue,
+            positiveInf=positiveInf,
+            negativeInf=negativeInf,
+            dateFormat=dateFormat,
+            timestampFormat=timestampFormat,
+            maxColumns=maxColumns,
+            maxCharsPerColumn=maxCharsPerColumn,
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition,
+            mode=mode,
+            columnNameOfCorruptRecord=columnNameOfCorruptRecord,
+            multiLine=multiLine,
+            charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
+            enforceSchema=enforceSchema,
+            emptyValue=emptyValue,
+            locale=locale,
+            lineSep=lineSep,
+            pathGlobFilter=pathGlobFilter,
+            recursiveFileLookup=recursiveFileLookup,
+            unescapedQuoteHandling=unescapedQuoteHandling,
+        )
+        if isinstance(path, str):
+            return self.load(path=path, format="csv")
+        else:
+            raise TypeError("path can be only a single string")
+
+    csv.__doc__ = PySparkDataStreamReader.csv.__doc__
+
+    # def table() TODO
+
+
+DataStreamReader.__doc__ = PySparkDataStreamReader.__doc__
+
+
+class DataStreamWriter:
+
+    def __init__(self, plan: "LogicalPlan", session: "SparkSession") -> None:
+        self._session = session
+        self._write_stream = WriteStreamOperation(plan)
+        self._write_proto = self._write_stream.write_op
+
+    def outputMode(self, outputMode: str) -> "DataStreamWriter":
+        self._write_proto.output_mode = outputMode
+        return self
+
+    outputMode.__doc__ = PySparkDataStreamWriter.outputMode.__doc__
+
+    def format(self, source: str) -> "DataStreamWriter":
+        self._write_proto.format = source
+        return self
+
+    format.__doc__ = PySparkDataStreamWriter.format.__doc__
+
+    def option(self, key: str, value: "OptionalPrimitiveType") -> 
"DataStreamWriter":
+        self._write_proto.options[key] = to_str(value)
+        return self
+
+    option.__doc__ = PySparkDataStreamWriter.option.__doc__
+    
+    def options(self, **options: "OptionalPrimitiveType") -> 
"DataStreamWriter":
+        for k in options:
+            self.option(k, options[k])        
+        return self
+
+    options.__doc__ = PySparkDataStreamWriter.options.__doc__
+    
+    @overload
+    def partitionBy(self, *cols: str) -> "DataStreamWriter":
+        ...
+
+    @overload
+    def partitionBy(self, __cols: List[str]) -> "DataStreamWriter":
+        ...
+
+    def partitionBy(self, *cols: str) -> "DataStreamWriter":  # type: 
ignore[misc]
+        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+            cols = cols[0]
+        self._write_proto.partitioning_cols = cast(List[str], cols)
+        return self
+
+    partitionBy.__doc__ = PySparkDataStreamWriter.partitionBy.__doc__
+
+    def queryName(self, queryName: str) -> "DataStreamWriter":
+        self._write_proto.query_name = queryName
+        return self
+
+    queryName.__doc__ = PySparkDataStreamWriter.queryName.__doc__
+
+    @overload
+    def trigger(self, *, processingTime: str) -> "DataStreamWriter":
+        ...
+
+    @overload
+    def trigger(self, *, once: bool) -> "DataStreamWriter":
+        ...
+
+    @overload
+    def trigger(self, *, continuous: str) -> "DataStreamWriter":
+        ...
+
+    @overload
+    def trigger(self, *, availableNow: bool) -> "DataStreamWriter":
+        ...
+
+    def trigger(
+        self,
+        *,
+        processingTime: Optional[str] = None,
+        once: Optional[bool] = None,
+        continuous: Optional[str] = None,
+        availableNow: Optional[bool] = None,
+    ) -> "DataStreamWriter":
+        params = [processingTime, once, continuous, availableNow]
+
+        if params.count(None) == 4:
+            raise ValueError("No trigger provided")
+        elif params.count(None) < 3:
+            raise ValueError("Multiple triggers not allowed.")
+
+        if processingTime is not None:
+            if type(processingTime) != str or len(processingTime.strip()) == 0:
+                raise ValueError(
+                    "Value for processingTime must be a non empty string. Got: 
%s" % processingTime
+                )
+            self._write_proto.processing_time_interval = processingTime.strip()
+
+        elif once is not None:
+            if once is not True:
+                raise ValueError("Value for once must be True. Got: %s" % once)

Review Comment:
   This is copied from the existing implementation. The interface is only for
setting a trigger, so `once=False` is not needed and can't be supported.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to