HeartSaVioR commented on code in PR #45305:
URL: https://github.com/apache/spark/pull/45305#discussion_r1538869646
##########
python/pyspark/sql/datasource.py:
##########
@@ -160,6 +160,29 @@ def writer(self, schema: StructType, overwrite: bool) ->
"DataSourceWriter":
message_parameters={"feature": "writer"},
)
+ def streamWriter(self, schema: StructType, overwrite: bool) ->
"DataSourceStreamWriter":
Review Comment:
What's the purpose of the overwrite flag? Will it be matched with
complete mode (i.e. truncate)?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -230,4 +273,220 @@ class PythonStreamingDataSourceSuite extends
PythonDataSourceSuiteBase {
stream => stream.commit(offset)
}
}
+
+ Seq("append", "complete").foreach { mode =>
+ test(s"data source stream write - $mode mode") {
+ assume(shouldTestPandasUDFs)
+ val dataSource =
+ createUserDefinedPythonDataSource(dataSourceName,
simpleDataStreamWriterScript)
+ spark.dataSource.registerPython(dataSourceName, dataSource)
+ val inputData = MemoryStream[Int]
+ withTempDir { dir =>
+ val path = dir.getAbsolutePath
+ val checkpointDir = new File(path, "checkpoint")
+ checkpointDir.mkdir()
+ val outputDir = new File(path, "output")
+ outputDir.mkdir()
+ val streamDF = if (mode == "append") {
+ inputData.toDF()
+ } else {
+ // Complete mode only supports stateful aggregation
Review Comment:
Complete mode requires the sink to support truncate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]