sarutak opened a new pull request, #52575: URL: https://github.com/apache/spark/pull/52575
### What changes were proposed in this pull request?

In a Spark Connect environment, `QueryExecution#observedMetrics` can be called by two threads concurrently.

* Thread 1 (`ObservationManager`)
```
private def tryComplete(qe: QueryExecution): Unit = {
  val allMetrics = qe.observedMetrics
  qe.logical.foreach {
    case c: CollectMetrics =>
      allMetrics.get(c.name).foreach { metrics =>
        val observation = observations.remove((c.name, c.dataframeId))
        if (observation != null) {
          observation.setMetricsAndNotify(metrics)
        }
      }
    case _ =>
  }
}
```
* Thread 2 (`SparkConnectPlanExecution`)
```
private def createObservedMetricsResponse(
    sessionId: String,
    observationAndPlanIds: Map[String, Long],
    dataframe: DataFrame): Option[ExecutePlanResponse] = {
  val observedMetrics = dataframe.queryExecution.observedMetrics.collect {
    case (name, row) if !executeHolder.observations.contains(name) =>
      val values = SparkConnectPlanExecution.toObservedMetricsValues(row)
      name -> values
  }
```

This can cause a race condition. A CI failure caused by this issue can be seen here:
https://github.com/apache/spark/actions/runs/18422173471/job/52497913985

```
======================================================================
ERROR [0.181s]: test_observe_with_map_type (pyspark.sql.tests.connect.test_parity_observation.DataFrameObservationParityTests.test_observe_with_map_type)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/testing/utils.py", line 228, in wrapper
    lastValue = condition(*args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/tests/test_observation.py", line 226, in test_observe_with_map_type
    assertDataFrameEqual(df, [Row(id=id) for id in range(10)])
  File "/__w/spark/spark/python/pyspark/testing/utils.py", line 1098, in assertDataFrameEqual
    actual_list = actual.collect()
                  ^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1817, in collect
    table, schema = self._to_table()
                    ^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1830, in _to_table
    table, schema, self._execution_info = self._session.client.to_table(
                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 946, in to_table
    table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)
                                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1642, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1619, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1893, in _handle_error
    self._handle_rpc_error(error)
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1966, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.IllegalArgumentException: requirement failed

JVM stacktrace:
java.lang.IllegalArgumentException
    at scala.Predef$.require(Predef.scala:324)
    at org.apache.spark.sql.catalyst.util.ArrayBasedMapData.<init>(ArrayBasedMapData.scala:31)
    at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.build(ArrayBasedMapBuilder.scala:130)
    at org.apache.spark.sql.catalyst.expressions.CreateMap.eval(complexTypeCreator.scala:260)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:162)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:84)
    at org.apache.spark.sql.execution.AggregatingAccumulator.$anonfun$value$2(AggregatingAccumulator.scala:199)
    at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:162)
    at org.apache.spark.sql.execution.AggregatingAccumulator.withSQLConf(AggregatingAccumulator.scala:106)
    at org.apache.spark.sql.execution.AggregatingAccumulator.value(AggregatingAccumulator.scala:188)
    at org.apache.spark.sql.execution.CollectMetricsExec.collectedMetrics(CollectMetricsExec.scala:59)
    at org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:111)
    at org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:109)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
    at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1(AdaptiveSparkPlanHelper.scala:86)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1$adapted(AdaptiveSparkPlanHelper.scala:86)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach(AdaptiveSparkPlanHelper.scala:45)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach$(AdaptiveSparkPlanHelper.scala:44)
    at org.apache.spark.sql.execution.CollectMetricsExec$.foreach(CollectMetricsExec.scala:101)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect(AdaptiveSparkPlanHelper.scala:86)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect$(AdaptiveSparkPlanHelper.scala:83)
    at org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:101)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collectWithSubqueries$1(AdaptiveSparkPlanHelper.scala:113)
    at scala.collection.immutable.List.flatMap(List.scala:294)
    at scala.collection.immutable.List.flatMap(List.scala:79)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries(AdaptiveSparkPlanHelper.scala:113)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries$(AdaptiveSparkPlanHelper.scala:112)
    at org.apache.spark.sql.execution.CollectMetricsExec$.collectWithSubqueries(CollectMetricsExec.scala:101)
    at org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:109)
    at org.apache.spark.sql.execution.QueryExecution.observedMetrics(QueryExecution.scala:276)
    at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.createObservedMetricsResponse(SparkConnectPlanExecution.scala:322)
    at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:82)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:394)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:394)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:113)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:184)
    at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:103)
    at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:112)
    at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:393)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:333)
```

This test failure can be reproduced by inserting a sleep into `ArrayBasedMapBuilder.scala` as follows:
```
 private def reset(): Unit = {
   keyToIndex.clear()
   keys.clear()
+  Thread.sleep(10)
   values.clear()
 }
```
And then running the test as follows:
```
$ python/run-tests --modules=pyspark-connect --parallelism=1 --testnames pyspark.sql.tests.connect.test_parity_observation --python-executables=python3.11
```

To fix this issue, this PR proposes to compute `QueryExecution#observedMetrics` within a synchronized block, so that the two threads cannot evaluate the underlying metrics accumulators concurrently.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Ran the problematic test with the sleep inserted as mentioned above, and confirmed that the test passed.

### Was this patch authored or co-authored using generative AI tooling?

No.
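For illustration, here is a minimal sketch of the direction described above, based on the call path visible in the stack trace (`QueryExecution.observedMetrics` delegating to `CollectMetricsExec.collect`); the actual diff in this PR may differ:

```
// Sketch only, not necessarily the exact patch. The stack trace shows
// observedMetrics evaluating AggregatingAccumulator.value via
// CollectMetricsExec.collect, and that evaluation mutates shared builder
// state (e.g. ArrayBasedMapBuilder's keys/values buffers), so two
// concurrent callers can leave keys and values out of sync and trip the
// require() in ArrayBasedMapData. Serializing the callers on the
// QueryExecution instance avoids the race:
def observedMetrics: Map[String, Row] = synchronized {
  CollectMetricsExec.collect(executedPlan)
}
```

With such a guard, `ObservationManager#tryComplete` and `SparkConnectPlanExecution#createObservedMetricsResponse` can no longer evaluate the same accumulators at the same time, which is what the sleep-based reproduction above exercises.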