This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 767b2b5a8dc [SPARK-43429][CONNECT] Deflake SparkSessionSuite 767b2b5a8dc is described below commit 767b2b5a8dc8d655ab6787845a87556f15456aaa Author: Herman van Hovell <her...@databricks.com> AuthorDate: Wed Aug 9 20:42:20 2023 +0900 [SPARK-43429][CONNECT] Deflake SparkSessionSuite ### What changes were proposed in this pull request? This PR tries to fix flakiness in the `SparkSessionSuite.active session in multiple threads` test. There was a chance that modification could happen before the other thread could check the state. This PR decouples modifications from checks. ### Why are the changes needed? Flaky tests are no bueno. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? It is a test. Closes #42406 from hvanhovell/SPARK-43429-deflake. Authored-by: Herman van Hovell <her...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 27c5a1f9f0e322fad0da300afdb75eadd8224b15) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../org/apache/spark/sql/SparkSessionSuite.scala | 32 ++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala index f06744399f8..2d7ded2d688 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala @@ -171,42 +171,74 @@ class SparkSessionSuite extends ConnectFunSuite { try { val script1 = execute { phaser => + // Step 0 - check initial state phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(session2)) + // Step 1 - new active session in script 2 + phaser.arriveAndAwaitAdvance() + + 
// Step2 - script 1 is unchanged, script 2 has new active session phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(session2)) + + // Step 3 - close session 1, no more default session in both scripts + phaser.arriveAndAwaitAdvance() session1.close() + // Step 4 - no default session, same active session. phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.contains(session2)) + + // Step 5 - clear active session in script 1 + phaser.arriveAndAwaitAdvance() SparkSession.clearActiveSession() + // Step 6 - no default/no active session in script 1, script2 unchanged. phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.isEmpty) + + // Step 7 - close active session in script2 + phaser.arriveAndAwaitAdvance() } val script2 = execute { phaser => + // Step 0 - check initial state phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(session2)) + + // Step 1 - new active session in script 2 + phaser.arriveAndAwaitAdvance() SparkSession.clearActiveSession() val internalSession = SparkSession.builder().remote(connectionString3).getOrCreate() + // Step2 - script 1 is unchanged, script 2 has new active session phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(internalSession)) + // Step 3 - close session 1, no more default session in both scripts + phaser.arriveAndAwaitAdvance() + + // Step 4 - no default session, same active session. 
phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.contains(internalSession)) + // Step 5 - clear active session in script 1 + phaser.arriveAndAwaitAdvance() + + // Step 6 - no default/no active session in script 1, script2 unchanged. phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.contains(internalSession)) + + // Step 7 - close active session in script2 + phaser.arriveAndAwaitAdvance() internalSession.close() assert(SparkSession.getActiveSession.isEmpty) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org