HeartSaVioR commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1538869646


##########
python/pyspark/sql/datasource.py:
##########
@@ -160,6 +160,29 @@ def writer(self, schema: StructType, overwrite: bool) -> "DataSourceWriter":
             message_parameters={"feature": "writer"},
         )
 
+    def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStreamWriter":

Review Comment:
   What's the purpose of the `overwrite` flag? Will it be matched with complete mode (i.e., truncate)?
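
   To illustrate the semantics in question, here is a minimal sketch (plain Python, no PySpark dependency; the `ToySink` class and its `truncate` flag are hypothetical, not part of the API in this PR) of how an overwrite/truncate flag could change per-microbatch sink behavior:

   ```python
   # Simplified model of a streaming sink. `truncate=True` models
   # complete-mode semantics: each batch carries the full result, so the
   # sink discards prior output before writing. Names are illustrative only.
   class ToySink:
       def __init__(self, truncate: bool):
           self.truncate = truncate
           self.rows = []

       def write_batch(self, batch):
           if self.truncate:
               # Complete mode: replace previous output with this batch.
               self.rows = []
           self.rows.extend(batch)

   append_sink = ToySink(truncate=False)
   complete_sink = ToySink(truncate=True)
   for batch in ([1, 2], [1, 2, 3]):
       append_sink.write_batch(batch)
       complete_sink.write_batch(batch)

   print(append_sink.rows)    # append keeps everything: [1, 2, 1, 2, 3]
   print(complete_sink.rows)  # complete keeps only the last full result: [1, 2, 3]
   ```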



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -230,4 +273,220 @@ class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
       stream => stream.commit(offset)
     }
   }
+
+  Seq("append", "complete").foreach { mode =>
+    test(s"data source stream write - $mode mode") {
+      assume(shouldTestPandasUDFs)
+      val dataSource =
+        createUserDefinedPythonDataSource(dataSourceName, simpleDataStreamWriterScript)
+      spark.dataSource.registerPython(dataSourceName, dataSource)
+      val inputData = MemoryStream[Int]
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val checkpointDir = new File(path, "checkpoint")
+        checkpointDir.mkdir()
+        val outputDir = new File(path, "output")
+        outputDir.mkdir()
+        val streamDF = if (mode == "append") {
+          inputData.toDF()
+        } else {
+          // Complete mode only supports stateful aggregation

Review Comment:
   Complete mode requires the sink to support truncate.
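
   As a toy illustration (plain Python, hypothetical helper name, not the test script in this PR) of why complete mode pairs with stateful aggregation: each trigger re-emits the entire aggregated result over all input so far, which is why the sink must truncate rather than append.

   ```python
   # Model complete-mode output: after every microbatch, emit the full
   # running aggregate (here, a word count), not just the delta.
   from collections import Counter

   def complete_mode_batches(microbatches):
       counts = Counter()
       for batch in microbatches:
           counts.update(batch)
           yield dict(counts)  # the entire result each trigger

   outputs = list(complete_mode_batches([["a", "b"], ["b", "c"]]))
   print(outputs)  # [{'a': 1, 'b': 1}, {'a': 1, 'b': 2, 'c': 1}]
   ```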



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
