This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new ef8773a Revert "[SPARK-38563][PYTHON] Upgrade to Py4J 0.10.9.4" ef8773a is described below commit ef8773ad5e90738a00408de87d2fe8566dc4acdc Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Thu Mar 17 17:58:40 2022 -0700 Revert "[SPARK-38563][PYTHON] Upgrade to Py4J 0.10.9.4" ### What changes were proposed in this pull request? This reverts commit 3bbf346d9ca984faa0c3e67cd1387a13b2bd1e37 from branch-3.3 to recover Apache Spark 3.3 on Python 3.10. ### Why are the changes needed? Py4J 0.10.9.4 has a regression which doesn't support Python 3.10. ### Does this PR introduce _any_ user-facing change? No. This is not released yet. ### How was this patch tested? Python UT with Python 3.10. Closes #35904 from dongjoon-hyun/SPARK-38563-3.3. Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- bin/pyspark | 2 +- bin/pyspark2.cmd | 2 +- core/pom.xml | 2 +- .../org/apache/spark/api/python/PythonUtils.scala | 2 +- dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 +- dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +- docs/job-scheduling.md | 2 +- python/docs/Makefile | 2 +- python/docs/make2.bat | 2 +- python/docs/source/getting_started/install.rst | 2 +- python/lib/py4j-0.10.9.3-src.zip | Bin 0 -> 42021 bytes python/lib/py4j-0.10.9.4-src.zip | Bin 42404 -> 0 bytes python/pyspark/context.py | 6 ++-- python/pyspark/util.py | 35 ++++++++++++++++++--- python/setup.py | 2 +- sbin/spark-config.sh | 2 +- 16 files changed, 45 insertions(+), 20 deletions(-) diff --git a/bin/pyspark b/bin/pyspark index 1e16c56..4840589 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -50,7 +50,7 @@ export PYSPARK_DRIVER_PYTHON_OPTS # Add the PySpark classes to the Python path: export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH" -export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.4-src.zip:$PYTHONPATH" +export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:$PYTHONPATH" # Load the PySpark shell.py script when ./pyspark is used interactively: export OLD_PYTHONSTARTUP="$PYTHONSTARTUP" diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd index f20c320..a19627a 100644 --- a/bin/pyspark2.cmd +++ b/bin/pyspark2.cmd @@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" ( ) set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH% -set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.4-src.zip;%PYTHONPATH% +set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.3-src.zip;%PYTHONPATH% set OLD_PYTHONSTARTUP=%PYTHONSTARTUP% set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py diff --git a/core/pom.xml b/core/pom.xml index 953c76b..9d3b170 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -423,7 +423,7 @@ <dependency> <groupId>net.sf.py4j</groupId> <artifactId>py4j</artifactId> - <version>0.10.9.4</version> + <version>0.10.9.3</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index a9c35369..8daba86 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -27,7 +27,7 @@ import org.apache.spark.SparkContext import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} private[spark] object PythonUtils { - val PY4J_ZIP_NAME = "py4j-0.10.9.4-src.zip" + val PY4J_ZIP_NAME = "py4j-0.10.9.3-src.zip" /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */ def sparkPythonPath: String = { diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index f2db663..bcbf8b9 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -233,7 +233,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar pickle/1.2//pickle-1.2.jar protobuf-java/2.5.0//protobuf-java-2.5.0.jar -py4j/0.10.9.4//py4j-0.10.9.4.jar +py4j/0.10.9.3//py4j-0.10.9.3.jar remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index c56b4c9..8ca7880 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -221,7 +221,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar pickle/1.2//pickle-1.2.jar protobuf-java/2.5.0//protobuf-java-2.5.0.jar -py4j/0.10.9.4//py4j-0.10.9.4.jar +py4j/0.10.9.3//py4j-0.10.9.3.jar remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md index f44ed82..4ed2aa9 100644 --- a/docs/job-scheduling.md +++ b/docs/job-scheduling.md @@ -304,5 +304,5 @@ via `sc.setJobGroup` in a separate PVM thread, which also disallows to cancel th later. `pyspark.InheritableThread` is recommended to use together for a PVM thread to inherit the inheritable attributes - such as local properties in a JVM thread. + such as local properties in a JVM thread, and to avoid resource leak. diff --git a/python/docs/Makefile b/python/docs/Makefile index 2628530..9cb1a17 100644 --- a/python/docs/Makefile +++ b/python/docs/Makefile @@ -21,7 +21,7 @@ SPHINXBUILD ?= sphinx-build SOURCEDIR ?= source BUILDDIR ?= build -export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.4-src.zip) +export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.3-src.zip) # Put it first so that "make" without argument is like "make help". help: diff --git a/python/docs/make2.bat b/python/docs/make2.bat index 26ef220..2e4e2b5 100644 --- a/python/docs/make2.bat +++ b/python/docs/make2.bat @@ -25,7 +25,7 @@ if "%SPHINXBUILD%" == "" ( set SOURCEDIR=source set BUILDDIR=build -set PYTHONPATH=..;..\lib\py4j-0.10.9.4-src.zip +set PYTHONPATH=..;..\lib\py4j-0.10.9.3-src.zip if "%1" == "" goto help diff --git a/python/docs/source/getting_started/install.rst b/python/docs/source/getting_started/install.rst index 3503be0..15a1240 100644 --- a/python/docs/source/getting_started/install.rst +++ b/python/docs/source/getting_started/install.rst @@ -157,7 +157,7 @@ Package Minimum supported version Note `pandas` 1.0.5 Optional for Spark SQL `NumPy` 1.7 Required for MLlib DataFrame-based API `pyarrow` 1.0.0 Optional for Spark SQL -`Py4J` 0.10.9.4 Required +`Py4J` 0.10.9.3 Required `pandas` 1.0.5 Required for pandas API on Spark `pyarrow` 1.0.0 Required for pandas API on Spark `Numpy` 1.14 Required for pandas API on Spark diff --git a/python/lib/py4j-0.10.9.3-src.zip b/python/lib/py4j-0.10.9.3-src.zip new file mode 100644 index 0000000..428f3ac Binary files /dev/null and b/python/lib/py4j-0.10.9.3-src.zip differ diff --git a/python/lib/py4j-0.10.9.4-src.zip b/python/lib/py4j-0.10.9.4-src.zip deleted file mode 100644 index 51b3404..0000000 Binary files a/python/lib/py4j-0.10.9.4-src.zip and /dev/null differ diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 59b5fa7..e47f162 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -1365,7 +1365,7 @@ class SparkContext: to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead. If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread - local inheritance. + local inheritance, and preventing resource leak. Examples -------- @@ -1405,7 +1405,7 @@ class SparkContext: Notes ----- If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread - local inheritance. + local inheritance, and preventing resource leak. """ self._jsc.setLocalProperty(key, value) @@ -1423,7 +1423,7 @@ class SparkContext: Notes ----- If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread - local inheritance. + local inheritance, and preventing resource leak. """ self._jsc.setJobDescription(value) diff --git a/python/pyspark/util.py b/python/pyspark/util.py index b7b972a..5abbbb9 100644 --- a/python/pyspark/util.py +++ b/python/pyspark/util.py @@ -331,10 +331,13 @@ def inheritable_thread_target(f: Callable) -> Callable: @functools.wraps(f) def wrapped(*args: Any, **kwargs: Any) -> Any: - # Set local properties in child thread. - assert SparkContext._active_spark_context is not None - SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties) - return f(*args, **kwargs) + try: + # Set local properties in child thread. + assert SparkContext._active_spark_context is not None + SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties) + return f(*args, **kwargs) + finally: + InheritableThread._clean_py4j_conn_for_current_thread() return wrapped else: @@ -374,7 +377,10 @@ class InheritableThread(threading.Thread): assert hasattr(self, "_props") assert SparkContext._active_spark_context is not None SparkContext._active_spark_context._jsc.sc().setLocalProperties(self._props) - return target(*a, **k) + try: + return target(*a, **k) + finally: + InheritableThread._clean_py4j_conn_for_current_thread() super(InheritableThread, self).__init__( target=copy_local_properties, *args, **kwargs # type: ignore[misc] @@ -395,6 +401,25 @@ class InheritableThread(threading.Thread): self._props = SparkContext._active_spark_context._jsc.sc().getLocalProperties().clone() return super(InheritableThread, self).start() + @staticmethod + def _clean_py4j_conn_for_current_thread() -> None: + from pyspark import SparkContext + + jvm = SparkContext._jvm + assert jvm is not None + thread_connection = jvm._gateway_client.get_thread_connection() + if thread_connection is not None: + try: + # Dequeue is shared across other threads but it's thread-safe. + # If this function has to be invoked one more time in the same thead + # Py4J will create a new connection automatically. + jvm._gateway_client.deque.remove(thread_connection) + except ValueError: + # Should never reach this point + return + finally: + thread_connection.close() + if __name__ == "__main__": if "pypy" not in platform.python_implementation().lower() and sys.version_info[:2] >= (3, 7): diff --git a/python/setup.py b/python/setup.py index ab9b64f..673b146 100755 --- a/python/setup.py +++ b/python/setup.py @@ -258,7 +258,7 @@ try: license='http://www.apache.org/licenses/LICENSE-2.0', # Don't forget to update python/docs/source/getting_started/install.rst # if you're updating the versions or dependencies. - install_requires=['py4j==0.10.9.4'], + install_requires=['py4j==0.10.9.3'], extras_require={ 'ml': ['numpy>=1.15'], 'mllib': ['numpy>=1.15'], diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh index 341eb05..f27b6fe 100755 --- a/sbin/spark-config.sh +++ b/sbin/spark-config.sh @@ -28,6 +28,6 @@ export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}" # Add the PySpark classes to the PYTHONPATH: if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}" - export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.4-src.zip:${PYTHONPATH}" + export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:${PYTHONPATH}" export PYSPARK_PYTHONPATH_SET=1 fi --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org