HyukjinKwon commented on code in PR #53188:
URL: https://github.com/apache/spark/pull/53188#discussion_r2554569560


##########
python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py:
##########
@@ -15,13 +15,15 @@
 # limitations under the License.
 #
 import unittest
+import os
 
 from pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state import (
     GroupedApplyInPandasWithStateTestsMixin,
 )
 from pyspark.testing.connectutils import ReusedConnectTestCase
 
 
+@unittest.skipIf(os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "To be reenabled")

Review Comment:
   ```
   ======================================================================
    ERROR [10.122s]: test_apply_in_pandas_with_state_basic (pyspark.sql.tests.connect.pandas.test_parity_pandas_grouped_map_with_state.GroupedApplyInPandasWithStateTests.test_apply_in_pandas_with_state_basic)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 117, in test_apply_in_pandas_with_state_basic
       self._test_apply_in_pandas_with_state_basic(func, check_results)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 91, in _test_apply_in_pandas_with_state_basic
       .start()
        ^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 656, in start
       return self._start_internal(
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 625, in _start_internal
       (_, properties, _) = self._session.client.execute_command(cmd)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1148, in execute_command
       data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
                                                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1560, in _execute_and_fetch
       for response in self._execute_and_fetch_as_iterator(
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1537, in _execute_and_fetch_as_iterator
       self._handle_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1811, in _handle_error
       self._handle_rpc_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1882, in _handle_rpc_error
       raise convert_exception(
    pyspark.errors.exceptions.connect.SparkException: Python worker failed to connect back.
   
   JVM stacktrace:
   org.apache.spark.SparkException
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:281)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
    Caused by: java.net.SocketTimeoutException: Timed out while waiting for the Python worker to connect back
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:263)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
   
   ======================================================================
    ERROR [10.032s]: test_apply_in_pandas_with_state_basic_fewer_data (pyspark.sql.tests.connect.pandas.test_parity_pandas_grouped_map_with_state.GroupedApplyInPandasWithStateTests.test_apply_in_pandas_with_state_basic_fewer_data)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 176, in test_apply_in_pandas_with_state_basic_fewer_data
       self._test_apply_in_pandas_with_state_basic(func, check_results)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 91, in _test_apply_in_pandas_with_state_basic
       .start()
        ^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 656, in start
       return self._start_internal(
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 625, in _start_internal
       (_, properties, _) = self._session.client.execute_command(cmd)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1148, in execute_command
       data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
                                                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1560, in _execute_and_fetch
       for response in self._execute_and_fetch_as_iterator(
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1537, in _execute_and_fetch_as_iterator
       self._handle_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1811, in _handle_error
       self._handle_rpc_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1882, in _handle_rpc_error
       raise convert_exception(
    pyspark.errors.exceptions.connect.SparkException: Python worker failed to connect back.
   
   JVM stacktrace:
   org.apache.spark.SparkException
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:281)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
    Caused by: java.net.SocketTimeoutException: Timed out while waiting for the Python worker to connect back
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:263)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
   
   ======================================================================
    ERROR [10.032s]: test_apply_in_pandas_with_state_basic_more_data (pyspark.sql.tests.connect.pandas.test_parity_pandas_grouped_map_with_state.GroupedApplyInPandasWithStateTests.test_apply_in_pandas_with_state_basic_more_data)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 164, in test_apply_in_pandas_with_state_basic_more_data
       self._test_apply_in_pandas_with_state_basic(func, check_results)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 91, in _test_apply_in_pandas_with_state_basic
       .start()
        ^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 656, in start
       return self._start_internal(
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 625, in _start_internal
       (_, properties, _) = self._session.client.execute_command(cmd)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1148, in execute_command
       data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
                                                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1560, in _execute_and_fetch
       for response in self._execute_and_fetch_as_iterator(
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1537, in _execute_and_fetch_as_iterator
       self._handle_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1811, in _handle_error
       self._handle_rpc_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1882, in _handle_rpc_error
       raise convert_exception(
    pyspark.errors.exceptions.connect.SparkException: Python worker failed to connect back.
   
   JVM stacktrace:
   org.apache.spark.SparkException
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:281)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
    Caused by: java.net.SocketTimeoutException: Timed out while waiting for the Python worker to connect back
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:263)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
   
   ======================================================================
    ERROR [10.035s]: test_apply_in_pandas_with_state_basic_no_state (pyspark.sql.tests.connect.pandas.test_parity_pandas_grouped_map_with_state.GroupedApplyInPandasWithStateTests.test_apply_in_pandas_with_state_basic_no_state)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 132, in test_apply_in_pandas_with_state_basic_no_state
       self._test_apply_in_pandas_with_state_basic(func, check_results)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 91, in _test_apply_in_pandas_with_state_basic
       .start()
        ^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 656, in start
       return self._start_internal(
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 625, in _start_internal
       (_, properties, _) = self._session.client.execute_command(cmd)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1148, in execute_command
       data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
                                                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1560, in _execute_and_fetch
       for response in self._execute_and_fetch_as_iterator(
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1537, in _execute_and_fetch_as_iterator
       self._handle_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1811, in _handle_error
       self._handle_rpc_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1882, in _handle_rpc_error
       raise convert_exception(
    pyspark.errors.exceptions.connect.SparkException: Python worker failed to connect back.
   
   JVM stacktrace:
   org.apache.spark.SparkException
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:281)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
    Caused by: java.net.SocketTimeoutException: Timed out while waiting for the Python worker to connect back
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:263)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
   
   ======================================================================
    ERROR [10.032s]: test_apply_in_pandas_with_state_basic_no_state_no_data (pyspark.sql.tests.connect.pandas.test_parity_pandas_grouped_map_with_state.GroupedApplyInPandasWithStateTests.test_apply_in_pandas_with_state_basic_no_state_no_data)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 143, in test_apply_in_pandas_with_state_basic_no_state_no_data
       self._test_apply_in_pandas_with_state_basic(func, check_results)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 91, in _test_apply_in_pandas_with_state_basic
       .start()
        ^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 656, in start
       return self._start_internal(
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 625, in _start_internal
       (_, properties, _) = self._session.client.execute_command(cmd)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1148, in execute_command
       data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
                                                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1560, in _execute_and_fetch
       for response in self._execute_and_fetch_as_iterator(
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1537, in _execute_and_fetch_as_iterator
       self._handle_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1811, in _handle_error
       self._handle_rpc_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1882, in _handle_rpc_error
       raise convert_exception(
    pyspark.errors.exceptions.connect.SparkException: Python worker failed to connect back.
   
   JVM stacktrace:
   org.apache.spark.SparkException
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:281)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
    Caused by: java.net.SocketTimeoutException: Timed out while waiting for the Python worker to connect back
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:263)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
   
   ======================================================================
    ERROR [10.030s]: test_apply_in_pandas_with_state_basic_with_null (pyspark.sql.tests.connect.pandas.test_parity_pandas_grouped_map_with_state.GroupedApplyInPandasWithStateTests.test_apply_in_pandas_with_state_basic_with_null)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 193, in test_apply_in_pandas_with_state_basic_with_null
       self._test_apply_in_pandas_with_state_basic(func, check_results)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/tests/pandas/test_pandas_grouped_map_with_state.py",
 line 91, in _test_apply_in_pandas_with_state_basic
       .start()
        ^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 656, in start
       return self._start_internal(
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/streaming/readwriter.py",
 line 625, in _start_internal
       (_, properties, _) = self._session.client.execute_command(cmd)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1148, in execute_command
       data, _, metrics, observed_metrics, properties = self._execute_and_fetch(
                                                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1560, in _execute_and_fetch
       for response in self._execute_and_fetch_as_iterator(
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1537, in _execute_and_fetch_as_iterator
       self._handle_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1811, in _handle_error
       self._handle_rpc_error(error)
     File 
"/home/runner/work/spark/spark-4.0/python/pyspark/sql/connect/client/core.py", 
line 1882, in _handle_rpc_error
       raise convert_exception(
    pyspark.errors.exceptions.connect.SparkException: Python worker failed to connect back.
   
   JVM stacktrace:
   org.apache.spark.SparkException
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:281)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
    Caused by: java.net.SocketTimeoutException: Timed out while waiting for the Python worker to connect back
        at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:263)
        at 
org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:79)
        at 
org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:154)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3497)
        at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2844)
        at 
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:95)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)
   
   ----------------------------------------------------------------------
   Ran 9 tests in 77.762s
   
   FAILED (errors=6)
   ```


