HeartSaVioR commented on code in PR #46651:
URL: https://github.com/apache/spark/pull/46651#discussion_r1606169725


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -763,6 +799,36 @@ class PythonStreamingDataSourceSuite extends 
PythonDataSourceSuiteBase {
     }
   }
 
+  // Verify that commit runner work correctly with large timeout interval.
+  test(s"data source stream write, trigger interval=20 seconds") {
+    assume(shouldTestPandasUDFs)
+    val dataSource =
+      createUserDefinedPythonDataSource(dataSourceName, 
simpleDataStreamWriterScript)
+    spark.dataSource.registerPython(dataSourceName, dataSource)
+    val inputData = MemoryStream[Int](numPartitions = 3)
+    val df = inputData.toDF()
+    withTempDir { dir =>
+      val path = dir.getAbsolutePath
+      val checkpointDir = new File(path, "checkpoint")
+      checkpointDir.mkdir()
+      val outputDir = new File(path, "output")
+      outputDir.mkdir()
+      val q = df
+        .writeStream
+        .format(dataSourceName)
+        .option("checkpointLocation", checkpointDir.getAbsolutePath)
+        .trigger(ProcessingTimeTrigger(20 * 1000))
+        .start(outputDir.getAbsolutePath)
+      eventually(timeout(waitTimeout * 5)) {
+        inputData.addData(1 to 3)
+        assert(q.lastProgress.batchId >= 2)

Review Comment:
   Since we have control over the input data once you apply my suggestion, why 
not check the output as well?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##########
@@ -763,6 +799,36 @@ class PythonStreamingDataSourceSuite extends 
PythonDataSourceSuiteBase {
     }
   }
 
+  // Verify that commit runner work correctly with large timeout interval.
+  test(s"data source stream write, trigger interval=20 seconds") {
+    assume(shouldTestPandasUDFs)
+    val dataSource =
+      createUserDefinedPythonDataSource(dataSourceName, 
simpleDataStreamWriterScript)
+    spark.dataSource.registerPython(dataSourceName, dataSource)
+    val inputData = MemoryStream[Int](numPartitions = 3)
+    val df = inputData.toDF()
+    withTempDir { dir =>
+      val path = dir.getAbsolutePath
+      val checkpointDir = new File(path, "checkpoint")
+      checkpointDir.mkdir()
+      val outputDir = new File(path, "output")
+      outputDir.mkdir()
+      val q = df
+        .writeStream
+        .format(dataSourceName)
+        .option("checkpointLocation", checkpointDir.getAbsolutePath)
+        .trigger(ProcessingTimeTrigger(20 * 1000))
+        .start(outputDir.getAbsolutePath)
+      eventually(timeout(waitTimeout * 5)) {
+        inputData.addData(1 to 3)

Review Comment:
   Do we intentionally add 1 to 3 into the source every 15 ms (default)? You can 
just call addData a few times before eventually, as MemoryStream would produce 
the data for a single call as a single microbatch. (2 times = 2 batches)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingWrite.scala:
##########
@@ -60,11 +57,23 @@ class PythonStreamingWrite(
   }
 
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {
-    pythonStreamingSinkCommitRunner.commitOrAbort(messages, epochId, false)
+    new PythonStreamingSinkCommitRunner(
+      createDataSourceFunc,
+      info.schema(),
+      messages,
+      batchId = epochId,
+      overwrite = isTruncate,
+      abort = false).runInPython()
   }
 
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {
-    pythonStreamingSinkCommitRunner.commitOrAbort(messages, epochId, true)
+    new PythonStreamingSinkCommitRunner(

Review Comment:
   Same comment as on the `commit` change below.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingWrite.scala:
##########
@@ -60,11 +57,23 @@ class PythonStreamingWrite(
   }
 
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {
-    pythonStreamingSinkCommitRunner.commitOrAbort(messages, epochId, false)
+    new PythonStreamingSinkCommitRunner(

Review Comment:
   Does this mean we can't re-use the Python process, or is that handled in a 
different way? It's not a blocker, I just wanted to understand. (This wouldn't 
be really good for workloads with a shorter batch interval. If it is the 
former, we'd need to file a TODO JIRA ticket to make it better.)



##########
python/pyspark/sql/worker/python_streaming_sink_runner.py:
##########
@@ -82,36 +91,36 @@ def main(infile: IO, outfile: IO) -> None:
         overwrite = read_bool(infile)
         # Instantiate data source reader.
         try:
+            # Create the data source writer instance.
             writer = data_source.streamWriter(schema=schema, 
overwrite=overwrite)
-            # Initialization succeed.
+
+            # Receive the commit messages.
+            num_messages = read_int(infile)
+            commit_messages = []
+            for _ in range(num_messages):
+                message = pickleSer._read_with_length(infile)
+                if message is not None and not isinstance(message, 
WriterCommitMessage):
+                    raise PySparkAssertionError(
+                        error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+                        message_parameters={
+                            "expected": "an instance of WriterCommitMessage",
+                            "actual": f"'{type(message).__name__}'",
+                        },
+                    )
+                commit_messages.append(message)
+
+            batch_id = read_long(infile)
+            abort = read_bool(infile)
+
+            # Commit or abort the Python data source write.
+            # Note the commit messages can be None if there are failed tasks.
+            if abort:
+                writer.abort(commit_messages, batch_id)  # type: 
ignore[arg-type]
+            else:
+                writer.commit(commit_messages, batch_id)  # type: 
ignore[arg-type]
+                # Send a status code back to JVM.

Review Comment:
   nit: indentation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to