This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 1d43683c5f0 [SPARK-44637][CONNECT] Synchronize accesses to ExecuteResponseObserver 1d43683c5f0 is described below commit 1d43683c5f0ad1aed25cfd9d4361fed866b3d1af Author: Juliusz Sompolski <ju...@databricks.com> AuthorDate: Wed Aug 2 15:07:12 2023 -0400 [SPARK-44637][CONNECT] Synchronize accesses to ExecuteResponseObserver ### What changes were proposed in this pull request? Function `removeResponsesUntilId` is used by `ReleaseExecute` RPC, and that needs to be synchronized against `removeCachedResponses` running from `consumeResponse` for `ExecutePlan` or `ReattachExecute` RPC. In general, all public accesses to ExecuteResponseObserver should best be synchronized. ### Why are the changes needed? Fix synchronization bug caught by testing of python client. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Caught in https://github.com/apache/spark/pull/42235, but want to fix separately because this is a server side change. Closes #42299 from juliuszsompolski/SPARK-44637. Authored-by: Juliusz Sompolski <ju...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> (cherry picked from commit 26c7e55f19993ef265b8730503c1ffa4ee697347) Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../apache/spark/sql/connect/execution/ExecuteResponseObserver.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala index 5966e6cf0fc..8af0f72b8da 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala @@ -179,7 +179,7 @@ private[connect] class ExecuteResponseObserver[T <: MessageLite](val executeHold } /** Get the index in the stream for given response id. */ - def getResponseIndexById(responseId: String): Long = { + def getResponseIndexById(responseId: String): Long = synchronized { responseIdToIndex.getOrElse( responseId, throw new SparkSQLException( @@ -188,7 +188,7 @@ private[connect] class ExecuteResponseObserver[T <: MessageLite](val executeHold } /** Remove cached responses up to and including response with given id. */ - def removeResponsesUntilId(responseId: String): Unit = { + def removeResponsesUntilId(responseId: String): Unit = synchronized { val index = getResponseIndexById(responseId) removeResponsesUntilIndex(index) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org