HyukjinKwon commented on code in PR #53188:
URL: https://github.com/apache/spark/pull/53188#discussion_r2554660367
##########
python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py:
##########
@@ -25,6 +26,7 @@
from pyspark.errors.exceptions.connect import StreamingPythonRunnerInitializationException
+@unittest.skipIf(os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "To be reenabled")
Review Comment:
```
======================================================================
ERROR [10.037s]: test_accessing_spark_session
(pyspark.sql.tests.connect.streaming.test_parity_foreach_batch.StreamingForeachBatchParityTests.test_accessing_spark_session)
----------------------------------------------------------------------
Traceback (most recent call last):
File
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py",
line 117, in test_accessing_spark_session
q = df.writeStream.foreachBatch(func).start()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
line 656, in start
return self._start_internal(
^^^^^^^^^^^^^^^^^^^^^
File
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
line 625, in _start_internal
(_, properties, _) = self._session.client.execute_command(cmd)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py",
line 1148, in execute_command
data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py",
line 1560, in _execute_and_fetch
for response in self._execute_and_fetch_as_iterator(
File
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py",
line 1537, in _execute_and_fetch_as_iterator
self._handle_error(error)
File
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py",
line 1811, in _handle_error
self._handle_rpc_error(error)
File
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py",
line 1882, in _handle_rpc_error
raise convert_exception(
pyspark.errors.exceptions.connect.SparkException: Python worker failed to
connect back.
JVM stacktrace:
org.apache.spark.SparkException
at
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:281)
at
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
at
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
at
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
at
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
at
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
at
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
at
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
at
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
at
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
at
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
at
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
Caused by: java.net.SocketTimeoutException: Timed out while waiting for the
Python worker to connect back
at
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:263)
at
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
at
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
at
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
at
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
at
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
at
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
at
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
at
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
at
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
at
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
at
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py",
line 121, in test_accessing_spark_session
if q:
^
UnboundLocalError: cannot access local variable 'q' where it is not
associated with a value
======================================================================
ERROR [10.027s]: test_accessing_spark_session_through_df
(pyspark.sql.tests.connect.streaming.test_parity_foreach_batch.StreamingForeachBatchParityTests.test_accessing_spark_session_through_df)
----------------------------------------------------------------------
Traceback (most recent call last):
File
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py",
line 132, in test_accessing_spark_session_through_df
q = df.writeStream.foreachBatch(func).start()
at
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
at
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
at
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
at
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
at
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
at
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
at
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
at
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
Caused by: java.net.SocketTimeoutException: Timed out while waiting for the
Python worker to connect back
at
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:263)
at
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
at
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
at
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
at
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
at
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
at
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
at
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
at
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
at
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
at
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
at
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
at
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
----------------------------------------------------------------------
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]