allisonwang-db commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1531233108
##########
python/pyspark/sql/datasource.py:
##########
@@ -513,6 +536,71 @@ def abort(self, messages: List["WriterCommitMessage"]) ->
None:
...
+class DataSourceStreamWriter(ABC):
+ """
+ A base class for data stream writers. Data stream writers are responsible
for writing
+ the data to the streaming sink.
+
+ .. versionadded: 4.0.0
+ """
+
+ @abstractmethod
+ def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
+ """
+ Writes data into the streaming sink.
+
+ This method is called on executors to write data to the streaming data
sink in
+ each microbatch. It accepts an iterator of input data and returns a
single row
+ representing a commit message, or None if there is no commit message.
+
+ The driver collects commit messages, if any, from all executors and
passes them
+ to the ``commit`` method if all tasks run successfully. If any task
fails, the
+ ``abort`` method will be called with the collected commit messages.
+
+ Parameters
+ ----------
+ iterator : Iterator[Row]
+ An iterator of input data.
+
+ Returns
+ -------
+ WriterCommitMessage : a serializable commit message
+ """
+ ...
+
+ def commit(self, messages: List["WriterCommitMessage"], batchId: int) ->
None:
+ """
+ Commits this microbatch with a list of commit messages.
+
+ This method is invoked on the driver when all tasks run successfully.
The
+ commit messages are collected from the ``write`` method call from each
task,
+ and are passed to this method. The implementation should use the
commit messages
+ to commit the microbatch in the streaming sink.
+
+ Parameters
+ ----------
+ messages : List[WriterCommitMessage]
+ A list of commit messages.
+ """
Review Comment:
How about `batchId`? It is in the `commit` signature but is not documented in the Parameters section.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream,
DataOutputStream}
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker,
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
+import org.apache.spark.sql.connector.write.WriterCommitMessage
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.types.StructType
+
+class PythonStreamingSinkCommitRunner(
Review Comment:
Can we add a scaladoc for this class?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingWrite.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.python
+
+import org.apache.spark.JobArtifactSet
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write._
+import
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory,
StreamingWrite}
+import org.apache.spark.sql.types.StructType
+
+class PythonStreamingWrite(
Review Comment:
Ditto: can we add a scaladoc for this class as well?
##########
python/pyspark/sql/worker/python_streaming_sink_runner.py:
##########
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+from typing import IO
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+ read_bool,
+ read_int,
+ read_long,
+ write_int,
+ SpecialLengths,
+)
+from pyspark.sql.datasource import DataSource, WriterCommitMessage
+from pyspark.sql.types import (
+ _parse_datatype_json_string,
+ StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+ check_python_version,
+ read_command,
+ pickleSer,
+ send_accumulator_updates,
+ setup_memory_limits,
+ setup_spark_files,
+ utf8_deserializer,
+)
+
+
+def main(infile: IO, outfile: IO) -> None:
+ try:
+ check_python_version(infile)
+ setup_spark_files(infile)
+
+ memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB",
"-1"))
Review Comment:
nit: you might want to use a different config for streaming workers.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -230,4 +273,220 @@ class PythonStreamingDataSourceSuite extends
PythonDataSourceSuiteBase {
stream => stream.commit(offset)
}
}
+
+ Seq("append", "complete").foreach { mode =>
+ test(s"data source stream write - $mode mode") {
+ assume(shouldTestPandasUDFs)
+ val dataSource =
+ createUserDefinedPythonDataSource(dataSourceName,
simpleDataStreamWriterScript)
+ spark.dataSource.registerPython(dataSourceName, dataSource)
+ val inputData = MemoryStream[Int]
+ withTempDir { dir =>
+ val path = dir.getAbsolutePath
+ val checkpointDir = new File(path, "checkpoint")
+ checkpointDir.mkdir()
+ val outputDir = new File(path, "output")
+ outputDir.mkdir()
+ val streamDF = if (mode == "append") {
+ inputData.toDF()
+ } else {
+ // Complete mode only supports stateful aggregation
Review Comment:
Forgive my limited knowledge of streaming, but what does 'complete' mode
mean here? Is it the same as setting 'overwrite=true'?
##########
python/pyspark/sql/datasource.py:
##########
@@ -513,6 +536,71 @@ def abort(self, messages: List["WriterCommitMessage"]) ->
None:
...
+class DataSourceStreamWriter(ABC):
+ """
+ A base class for data stream writers. Data stream writers are responsible
for writing
+ the data to the streaming sink.
+
+ .. versionadded: 4.0.0
+ """
+
+ @abstractmethod
+ def write(self, iterator: Iterator[Row]) -> "WriterCommitMessage":
+ """
+ Writes data into the streaming sink.
+
+ This method is called on executors to write data to the streaming data
sink in
+ each microbatch. It accepts an iterator of input data and returns a
single row
+ representing a commit message, or None if there is no commit message.
+
+ The driver collects commit messages, if any, from all executors and
passes them
+ to the ``commit`` method if all tasks run successfully. If any task
fails, the
+ ``abort`` method will be called with the collected commit messages.
+
+ Parameters
+ ----------
+ iterator : Iterator[Row]
+ An iterator of input data.
+
+ Returns
+ -------
+ WriterCommitMessage : a serializable commit message
+ """
+ ...
+
+ def commit(self, messages: List["WriterCommitMessage"], batchId: int) ->
None:
+ """
+ Commits this microbatch with a list of commit messages.
+
+ This method is invoked on the driver when all tasks run successfully.
The
+ commit messages are collected from the ``write`` method call from each
task,
+ and are passed to this method. The implementation should use the
commit messages
+ to commit the microbatch in the streaming sink.
+
+ Parameters
+ ----------
+ messages : List[WriterCommitMessage]
+ A list of commit messages.
+ """
+ ...
+
+ def abort(self, messages: List["WriterCommitMessage"], batchId: int) ->
None:
+ """
+ Aborts this microbatch due to task failures.
+
+ This method is invoked on the driver when one or more tasks failed.
The commit
+ messages are collected from the ``write`` method call from each task,
and are
+ passed to this method. The implementation should use the commit
messages to
+ abort the microbatch in the streaming sink.
+
+ Parameters
+ ----------
+ messages : List[WriterCommitMessage]
+ A list of commit messages.
Review Comment:
Ditto for `batchId`: it is in the `abort` signature but is not documented in the Parameters section.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]