sarutak opened a new pull request, #52575:
URL: https://github.com/apache/spark/pull/52575

   ### What changes were proposed in this pull request?
   In a Spark Connect environment, `QueryExecution#observedMetrics` can be called by two threads concurrently:
   
   * Thread 1 (`ObservationManager`)
   ```
   private def tryComplete(qe: QueryExecution): Unit = {
     val allMetrics = qe.observedMetrics
     qe.logical.foreach {
       case c: CollectMetrics =>
         allMetrics.get(c.name).foreach { metrics =>
           val observation = observations.remove((c.name, c.dataframeId))
           if (observation != null) {
             observation.setMetricsAndNotify(metrics)
           }
         }
       case _ =>
     }
   }
   ```
   
   * Thread 2 (`SparkConnectPlanExecution`)
   ```
   private def createObservedMetricsResponse(
       sessionId: String,
       observationAndPlanIds: Map[String, Long],
       dataframe: DataFrame): Option[ExecutePlanResponse] = {
     val observedMetrics = dataframe.queryExecution.observedMetrics.collect {
       case (name, row) if !executeHolder.observations.contains(name) =>
         val values = SparkConnectPlanExecution.toObservedMetricsValues(row)
         name -> values
     } 
   ```
   
   This can cause race conditions. The following CI failure was caused by this issue:
   https://github.com/apache/spark/actions/runs/18422173471/job/52497913985
   
   ```
   ======================================================================
   ERROR [0.181s]: test_observe_with_map_type (pyspark.sql.tests.connect.test_parity_observation.DataFrameObservationParityTests.test_observe_with_map_type)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/__w/spark/spark/python/pyspark/testing/utils.py", line 228, in wrapper
       lastValue = condition(*args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/tests/test_observation.py", line 226, in test_observe_with_map_type
       assertDataFrameEqual(df, [Row(id=id) for id in range(10)])
     File "/__w/spark/spark/python/pyspark/testing/utils.py", line 1098, in assertDataFrameEqual
       actual_list = actual.collect()
                     ^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1817, in collect
       table, schema = self._to_table()
                       ^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1830, in _to_table
       table, schema, self._execution_info = self._session.client.to_table(
                                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 946, in to_table
       table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)
                                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1642, in _execute_and_fetch
       for response in self._execute_and_fetch_as_iterator(
     File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1619, in _execute_and_fetch_as_iterator
       self._handle_error(error)
     File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1893, in _handle_error
       self._handle_rpc_error(error)
     File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1966, in _handle_rpc_error
       raise convert_exception(
   pyspark.errors.exceptions.connect.IllegalArgumentException: requirement failed
   
   JVM stacktrace:
   java.lang.IllegalArgumentException
        at scala.Predef$.require(Predef.scala:324)
        at org.apache.spark.sql.catalyst.util.ArrayBasedMapData.<init>(ArrayBasedMapData.scala:31)
        at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.build(ArrayBasedMapBuilder.scala:130)
        at org.apache.spark.sql.catalyst.expressions.CreateMap.eval(complexTypeCreator.scala:260)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:162)
        at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:84)
        at org.apache.spark.sql.execution.AggregatingAccumulator.$anonfun$value$2(AggregatingAccumulator.scala:199)
        at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:162)
        at org.apache.spark.sql.execution.AggregatingAccumulator.withSQLConf(AggregatingAccumulator.scala:106)
        at org.apache.spark.sql.execution.AggregatingAccumulator.value(AggregatingAccumulator.scala:188)
        at org.apache.spark.sql.execution.CollectMetricsExec.collectedMetrics(CollectMetricsExec.scala:59)
        at org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:111)
        at org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:109)
        at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
        at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1(AdaptiveSparkPlanHelper.scala:86)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1$adapted(AdaptiveSparkPlanHelper.scala:86)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach(AdaptiveSparkPlanHelper.scala:45)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach$(AdaptiveSparkPlanHelper.scala:44)
        at org.apache.spark.sql.execution.CollectMetricsExec$.foreach(CollectMetricsExec.scala:101)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect(AdaptiveSparkPlanHelper.scala:86)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect$(AdaptiveSparkPlanHelper.scala:83)
        at org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:101)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collectWithSubqueries$1(AdaptiveSparkPlanHelper.scala:113)
        at scala.collection.immutable.List.flatMap(List.scala:294)
        at scala.collection.immutable.List.flatMap(List.scala:79)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries(AdaptiveSparkPlanHelper.scala:113)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries$(AdaptiveSparkPlanHelper.scala:112)
        at org.apache.spark.sql.execution.CollectMetricsExec$.collectWithSubqueries(CollectMetricsExec.scala:101)
        at org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:109)
        at org.apache.spark.sql.execution.QueryExecution.observedMetrics(QueryExecution.scala:276)
        at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.createObservedMetricsResponse(SparkConnectPlanExecution.scala:322)
        at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:82)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
        at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:394)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:394)
        at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:113)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:184)
        at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:103)
        at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:112)
        at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:393)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:333)
   ```
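   
   The `requirement failed` above is the size-consistency check in `ArrayBasedMapData`: when two threads drive the same map builder through `build()`/`reset()` concurrently, the key and value buffers can be observed with different lengths. A simplified, hypothetical illustration of that failure mode (plain Scala, not Spark code; all names below are made up):
   ```
   import scala.collection.mutable.ArrayBuffer
   
   object RaceIllustration {
     // Two buffers that must always have the same length, like the key/value
     // arrays behind ArrayBasedMapBuilder.
     private val keys = ArrayBuffer.empty[Int]
     private val values = ArrayBuffer.empty[Int]
   
     // Clears the buffers non-atomically, mirroring the reset() shown below.
     private def reset(): Unit = {
       keys.clear()
       Thread.sleep(1) // widens the race window, like the injected sleep below
       values.clear()
     }
   
     // Appends one entry to each buffer, snapshots both sizes, then resets.
     private def buildSizes(): (Int, Int) = {
       keys += 1
       values += 1
       val sizes = (keys.size, values.size)
       reset()
       sizes
     }
   
     def main(args: Array[String]): Unit = {
       val threads = Seq.fill(2)(new Thread(() => (1 to 1000).foreach { _ =>
         val (k, v) = buildSizes()
         require(k == v, s"keys ($k) and values ($v) diverged") // fails intermittently
       }))
       threads.foreach(_.start())
       threads.foreach(_.join())
     }
   }
   ```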
   
   This test failure can be reproduced by inserting a sleep into `ArrayBasedMapBuilder.scala` as follows.
   ```
      private def reset(): Unit = {
        keyToIndex.clear()
        keys.clear()
   +    Thread.sleep(10)
        values.clear()
      }
   ```
   
   Then, run the test as follows.
   ```
   $ python/run-tests --modules=pyspark-connect --parallelism=1 --testnames pyspark.sql.tests.connect.test_parity_observation --python-executables=python3.11
   ```
   
   To fix this issue, this PR proposes to compute `QueryExecution#observedMetrics` within a synchronized block.
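   
   A minimal sketch of that direction (assuming, as the stack trace above suggests, that the method currently delegates to `CollectMetricsExec.collect(executedPlan)`; the exact lock and granularity in the actual change may differ):
   ```
   // Sketch only: serialize concurrent callers of observedMetrics so that the
   // AggregatingAccumulator/ArrayBasedMapBuilder state is not evaluated by two
   // threads at the same time.
   def observedMetrics: Map[String, Row] = synchronized {
     CollectMetricsExec.collect(executedPlan)
   }
   ```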
   
   ### Why are the changes needed?
   Bug fix.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Ran the problematic test with the sleep inserted as described above, and confirmed that it passed.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   

