allisonwang-db commented on code in PR #46651:
URL: https://github.com/apache/spark/pull/46651#discussion_r1607067727


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingSinkCommitRunner.scala:
##########
@@ -39,78 +35,22 @@ import org.apache.spark.sql.types.StructType
  * from the socket, then commit or abort a microbatch.
  */
 class PythonStreamingSinkCommitRunner(

Review Comment:
   After this change, is the PythonStreamingSinkCommitRunner the same as the batch one now?



##########
python/pyspark/sql/worker/python_streaming_sink_runner.py:
##########
@@ -34,22 +33,32 @@
     _parse_datatype_json_string,
     StructType,
 )
-from pyspark.util import handle_worker_exception
+from pyspark.util import handle_worker_exception, local_connect_and_auth
 from pyspark.worker_util import (
     check_python_version,
     read_command,
     pickleSer,
     send_accumulator_updates,
+    setup_broadcasts,
     setup_memory_limits,
     setup_spark_files,
     utf8_deserializer,
 )
 
 
 def main(infile: IO, outfile: IO) -> None:
+    """
+    Main method for committing or aborting a data source streaming write operation.
+
+    This process is invoked from the `PythonStreamingSinkCommitRunner.runInPython`
+    method in the StreamingWrite implementation of the PythonTableProvider. It is

Review Comment:
   We don't have a PythonTableProvider. Do you mean PythonTable or PythonDataSourceV2?
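The docstring in this hunk says the worker's job is to commit or abort a data source streaming write. A minimal sketch of that final dispatch step, assuming the writer exposes `commit(messages, batchId)` and `abort(messages, batchId)` in the shape of PySpark's `DataSourceStreamWriter`. `FakeCommitMessage`, `RecordingStreamWriter` and `commit_or_abort` are hypothetical names for illustration only, and the socket protocol that delivers the messages, batch id and abort flag is deliberately left out.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeCommitMessage:
    # Hypothetical stand-in for the per-task message returned by write().
    partition_id: int


@dataclass
class RecordingStreamWriter:
    # Hypothetical writer with commit/abort signatures modeled on
    # DataSourceStreamWriter; it only records which method was called.
    events: List[str] = field(default_factory=list)

    def commit(self, messages: List[Optional[FakeCommitMessage]], batchId: int) -> None:
        self.events.append(f"commit batch={batchId} n={len(messages)}")

    def abort(self, messages: List[Optional[FakeCommitMessage]], batchId: int) -> None:
        self.events.append(f"abort batch={batchId} n={len(messages)}")


def commit_or_abort(writer, messages, batch_id: int, abort: bool) -> None:
    """Hypothetical core of the worker: run exactly one of abort/commit per microbatch."""
    if abort:
        writer.abort(messages, batch_id)
    else:
        writer.commit(messages, batch_id)
```

For example, `commit_or_abort(w, [FakeCommitMessage(0), None], 7, abort=False)` on a fresh `RecordingStreamWriter` records a single commit for batch 7; a `None` entry stands in for a task that produced no message.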



##########
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##########
@@ -210,7 +210,8 @@ def main(infile: IO, outfile: IO) -> None:
     # Read information about how to connect back to the JVM from the environment.
     java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
     auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
-    (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
+    (sock_file, sock) = local_connect_and_auth(java_port, auth_secret)
+    sock.settimeout(None)

Review Comment:
   nit: can we add a short comment here on why we need to set this timeout?
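One plausible reason, offered as an assumption rather than a reading of this PR: `local_connect_and_auth` appears to leave on the returned socket the finite timeout it uses for the authentication handshake. A long-lived worker that then waits on the JVM would see `socket.timeout` during an idle period, and `sock.settimeout(None)` puts the socket back into blocking mode. The self-contained sketch below reproduces that behavior with `socket.socketpair()`; the function name and the 0.1 s timeout are hypothetical.

```python
import socket


def demo_timeout_behavior(auth_timeout: float = 0.1):
    # socketpair() returns two connected sockets with no network setup.
    worker_side, jvm_side = socket.socketpair()
    try:
        # Hypothetical stand-in for a timeout left over from the auth handshake.
        worker_side.settimeout(auth_timeout)
        timed_out = False
        try:
            worker_side.recv(4)  # The "JVM" side has not sent anything yet.
        except socket.timeout:
            timed_out = True
        # Equivalent of sock.settimeout(None) in the diff: blocking mode, no deadline.
        worker_side.settimeout(None)
        jvm_side.sendall(b"ping")
        data = worker_side.recv(4)
        return timed_out, worker_side.gettimeout(), data
    finally:
        worker_side.close()
        jvm_side.close()
```

With the finite timeout the first `recv` fails while the peer is idle; after `settimeout(None)`, `gettimeout()` reports `None` and the worker simply waits until data arrives.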



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
