This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e359f210c45 [SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for StreamingForeachBatchHelper
e359f210c45 is described below

commit e359f210c45abc17f0bcd32c9a86faf678caff75
Author: Raghu Angadi <raghu.ang...@databricks.com>
AuthorDate: Thu Oct 19 14:15:43 2023 +0900

    [SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for StreamingForeachBatchHelper
    
    ### What changes were proposed in this pull request?
    
    A couple of minor improvements to `StreamingForeachBatchHelper`:
    
      * Make `RunnerCleaner` private and add ScalaDoc.
      * Update the contract for `pythonForeachBatchWrapper()` to document that the caller should eventually `close()` the `AutoCloseable` it returns, as sketched below.
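    
    A minimal sketch of that contract, using hypothetical caller code that is not part of this patch (`ForeachBatchContractSketch` and `runQueryWith` are placeholders):
    
    ```scala
    import org.apache.spark.api.python.SimplePythonFunction
    import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper
    import org.apache.spark.sql.connect.service.SessionHolder
    
    object ForeachBatchContractSketch {
      // Hypothetical caller: the wrapper starts a Python worker and returns the
      // foreachBatch function together with an AutoCloseable that owns the worker.
      def runForeachBatchQuery(pythonFn: SimplePythonFunction, sessionHolder: SessionHolder): Unit = {
        val (fn, cleaner) = StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
        try {
          // `fn` would be wired into the streaming query as its foreachBatch callback.
          // runQueryWith(fn)
        } finally {
          cleaner.close() // releases the Python worker and related resources
        }
      }
    }
    ```
    
    In `SparkConnectPlanner` the cleaner is instead registered with the session's cleaner cache and closed on query termination, but the caller's obligation is the same.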
    
    In addition, it fixes a flake in a Protobuf unit test.
    
    ### Why are the changes needed?
      - Code readability improvement.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
     - Existing tests.
     - For the Protobuf suite, verified with the seed set to '399': the test fails before this PR and passes after.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #43424 from rangadi/feb-scaladoc.
    
    Authored-by: Raghu Angadi <raghu.ang...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../apache/spark/sql/connect/planner/SparkConnectPlanner.scala   | 2 +-
 .../spark/sql/connect/planner/StreamingForeachBatchHelper.scala  | 9 ++++++---
 .../sql/connect/service/SparkConnectSessionHodlerSuite.scala     | 4 +++-
 .../spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala | 2 +-
 4 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index fa964c02a25..299f4f8830a 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2927,7 +2927,7 @@ class SparkConnectPlanner(
     }
 
     // This is filled when a foreach batch runner is started for Python.
-    var foreachBatchRunnerCleaner: Option[StreamingForeachBatchHelper.RunnerCleaner] = None
+    var foreachBatchRunnerCleaner: Option[AutoCloseable] = None
 
     if (writeOp.hasForeachBatch) {
       val foreachBatchFn = writeOp.getForeachBatch.getFunctionCase match {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
index b8097b23550..ce75ba3eb59 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
@@ -40,7 +40,9 @@ object StreamingForeachBatchHelper extends Logging {
 
   type ForeachBatchFnType = (DataFrame, Long) => Unit
 
-  case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable {
+  // Visible for testing.
+  /** An AutoCloseable to clean up resources on query termination. Stops the Python worker. */
+  private[connect] case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable {
     override def close(): Unit = {
       try runner.stop()
       catch {
@@ -98,11 +100,12 @@ object StreamingForeachBatchHelper extends Logging {
   /**
   * Starts up Python worker and initializes it with Python function. Returns a foreachBatch
   * function that sets up the session and Dataframe cache and interacts with the Python
-   * worker to execute user's function.
+   * worker to execute user's function. In addition, it returns an AutoCloseable. The caller must
+   * ensure it is closed so that the worker process and related resources are released.
    */
   def pythonForeachBatchWrapper(
       pythonFn: SimplePythonFunction,
-      sessionHolder: SessionHolder): (ForeachBatchFnType, RunnerCleaner) = {
+      sessionHolder: SessionHolder): (ForeachBatchFnType, AutoCloseable) = {
 
     val port = SparkConnectService.localPort
     val connectUrl = s"sc://localhost:$port/;user_id=${sessionHolder.userId}"
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
index a6451de8fc2..910c2a2650c 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.api.python.SimplePythonFunction
 import org.apache.spark.sql.IntegratedUDFTestUtils
 import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.planner.{PythonStreamingQueryListener, StreamingForeachBatchHelper}
+import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper.RunnerCleaner
 import org.apache.spark.sql.test.SharedSparkSession
 
 class SparkConnectSessionHolderSuite extends SharedSparkSession {
@@ -206,7 +207,8 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.streamingForeachBatchRunnerCleanerCache
         .registerCleanerForQuery(query2, cleaner2)
 
-      val (runner1, runner2) = (cleaner1.runner, cleaner2.runner)
+      val (runner1, runner2) =
+        (cleaner1.asInstanceOf[RunnerCleaner].runner, cleaner2.asInstanceOf[RunnerCleaner].runner)
 
       // assert both python processes are running
       assert(!runner1.isWorkerStopped().get)
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index d3e63a11a66..6135cb2d592 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -137,7 +137,7 @@ class ProtobufCatalystDataConversionSuite
       while (
         data != null &&
         (data.get(0) == defaultValue ||
-          (dt == BinaryType &&
+          (dt.fields(0).dataType == BinaryType &&
             data.get(0).asInstanceOf[Array[Byte]].isEmpty)))
         data = generator().asInstanceOf[Row]
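
For context on the Protobuf flake fixed in the last hunk: `dt` there is the `StructType` wrapping the field under test, so the old check `dt == BinaryType` could never be true and empty random byte arrays were never regenerated. A minimal, self-contained sketch of the corrected retry predicate (simplified from the suite; the `defaultValue` comparison is omitted and `FlakeFixSketch` is a placeholder name):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}

object FlakeFixSketch {
  // The suite retries random generation while the sampled value is degenerate;
  // for binary columns an empty byte array must also trigger a retry.
  val dt: StructType = StructType(StructField("col", BinaryType) :: Nil)

  def needsRegeneration(data: Row): Boolean =
    data != null &&
      dt.fields(0).dataType == BinaryType &&
      data.get(0).asInstanceOf[Array[Byte]].isEmpty

  def main(args: Array[String]): Unit =
    println(needsRegeneration(Row(Array.emptyByteArray))) // prints: true
}
```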
 

