This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c11585ac296e [SPARK-47751][PYTHON][CONNECT] Make pyspark.worker_utils compatible with pyspark-connect
c11585ac296e is described below

commit c11585ac296eb726e6356bfcc7628a2c948e1d2f
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Sun Apr 7 18:11:12 2024 +0900

    [SPARK-47751][PYTHON][CONNECT] Make pyspark.worker_utils compatible with pyspark-connect

    ### What changes were proposed in this pull request?

    This PR proposes to make `pyspark.worker_utils` compatible with `pyspark-connect`.

    ### Why are the changes needed?

    So that `pyspark-connect` can work without the classic PySpark packages and dependencies. Spark Connect does not support `Broadcast` and `Accumulator`.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Tested at https://github.com/apache/spark/pull/45870. Once CI is set up there, it will be tested properly.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #45914 from HyukjinKwon/SPARK-47751.

    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/worker_util.py | 31 ++++++++++++++++++++++---------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/worker_util.py b/python/pyspark/worker_util.py
index f3c59c91ea2c..22389decac2f 100644
--- a/python/pyspark/worker_util.py
+++ b/python/pyspark/worker_util.py
@@ -32,10 +32,8 @@ try:
 except ImportError:
     has_resource_module = False
 
-from pyspark.accumulators import _accumulatorRegistry
-from pyspark.core.broadcast import Broadcast, _broadcastRegistry
+from pyspark.util import is_remote_only
 from pyspark.errors import PySparkRuntimeError
-from pyspark.core.files import SparkFiles
 from pyspark.util import local_connect_and_auth
 from pyspark.serializers import (
     read_bool,
@@ -59,8 +57,11 @@ def add_path(path: str) -> None:
 
 
 def read_command(serializer: FramedSerializer, file: IO) -> Any:
+    if not is_remote_only():
+        from pyspark.core.broadcast import Broadcast
+
     command = serializer._read_with_length(file)
-    if isinstance(command, Broadcast):
+    if not is_remote_only() and isinstance(command, Broadcast):
         command = serializer.loads(command.value)
     return command
 
@@ -125,8 +126,12 @@ def setup_spark_files(infile: IO) -> None:
     """
     # fetch name of workdir
     spark_files_dir = utf8_deserializer.loads(infile)
-    SparkFiles._root_directory = spark_files_dir
-    SparkFiles._is_running_on_worker = True
+
+    if not is_remote_only():
+        from pyspark.core.files import SparkFiles
+
+        SparkFiles._root_directory = spark_files_dir
+        SparkFiles._is_running_on_worker = True
 
     # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
     add_path(spark_files_dir)  # *.py files that were added will be copied here
@@ -142,6 +147,9 @@ def setup_broadcasts(infile: IO) -> None:
     """
     Set up broadcasted variables.
     """
+    if not is_remote_only():
+        from pyspark.core.broadcast import Broadcast, _broadcastRegistry
+
     # fetch names and values of broadcast variables
     needs_broadcast_decryption_server = read_bool(infile)
     num_broadcast_variables = read_int(infile)
@@ -175,6 +183,11 @@ def send_accumulator_updates(outfile: IO) -> None:
     """
     Send the accumulator updates back to JVM.
     """
-    write_int(len(_accumulatorRegistry), outfile)
-    for aid, accum in _accumulatorRegistry.items():
-        pickleSer._write_with_length((aid, accum._value), outfile)
+    if not is_remote_only():
+        from pyspark.accumulators import _accumulatorRegistry
+
+        write_int(len(_accumulatorRegistry), outfile)
+        for aid, accum in _accumulatorRegistry.items():
+            pickleSer._write_with_length((aid, accum._value), outfile)
+    else:
+        write_int(0, outfile)
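[Editor's note] For readers following along, below is a minimal, runnable sketch (not PySpark source) of the lazy-import guard pattern the diff applies: classic-only imports move from module level into the functions that need them, gated on is_remote_only(), so merely importing the module never pulls in packages that pyspark-connect does not ship. The find_spec probe, the registry dict, and the pickle framing are illustrative assumptions standing in for the actual PySpark internals.

import functools
import importlib.util
import io
import pickle
import struct
from typing import IO


@functools.lru_cache(maxsize=1)
def is_remote_only() -> bool:
    # Sketch assumption: treat the install as Connect-only when the classic
    # pyspark.core package cannot be found. The real pyspark.util helper
    # may decide this differently.
    try:
        return importlib.util.find_spec("pyspark.core") is None
    except ModuleNotFoundError:
        # pyspark itself is absent; treat as remote-only for this sketch.
        return True


# Illustrative stand-in for pyspark.accumulators._accumulatorRegistry.
_accumulator_registry = {1: 42, 2: 7}


def write_int(value: int, outfile: IO) -> None:
    # 4-byte big-endian int, mirroring the framing PySpark's serializers use.
    outfile.write(struct.pack("!i", value))


def send_accumulator_updates(outfile: IO) -> None:
    # Classic install: report every registered accumulator update.
    # Connect-only install: no registry exists, so report a count of zero,
    # keeping the wire protocol well-formed for the JVM side.
    if not is_remote_only():
        write_int(len(_accumulator_registry), outfile)
        for aid, value in _accumulator_registry.items():
            payload = pickle.dumps((aid, value))
            write_int(len(payload), outfile)
            outfile.write(payload)
    else:
        write_int(0, outfile)


if __name__ == "__main__":
    buf = io.BytesIO()
    send_accumulator_updates(buf)
    print("wrote", buf.tell(), "bytes; remote_only =", is_remote_only())

The design point mirrored from the diff: the remote-only branch still writes a zero count rather than writing nothing, so the reader on the other end of the socket sees a well-formed response either way.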
""" - write_int(len(_accumulatorRegistry), outfile) - for aid, accum in _accumulatorRegistry.items(): - pickleSer._write_with_length((aid, accum._value), outfile) + if not is_remote_only(): + from pyspark.accumulators import _accumulatorRegistry + + write_int(len(_accumulatorRegistry), outfile) + for aid, accum in _accumulatorRegistry.items(): + pickleSer._write_with_length((aid, accum._value), outfile) + else: + write_int(0, outfile) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org