This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new aff9eab9039  [SPARK-45511][SS] Fix state reader suite flakiness by cleaning up resources after each test run
aff9eab9039 is described below

commit aff9eab90392f22c0037abdf50e6894615e4dbf9
Author: Chaoqin Li <chaoqin...@databricks.com>
AuthorDate: Fri Nov 17 07:27:28 2023 +0900

    [SPARK-45511][SS] Fix state reader suite flakiness by cleaning up resources after each test run

    ### What changes were proposed in this pull request?
    Fix state reader suite flakiness by cleaning up resources after each test.

    The reason we have to clean up StateStore per test is the maintenance task. When we run a streaming query, the state store is initialized in the executor and registered with the coordinator in the driver. The lifecycle of the state store provider is not strictly tied to the lifecycle of the streaming query - the executor closes the state store provider when the coordinator indicates that the provider is no longer valid, which is not [...]

    This means a maintenance task against the provider can still run after test A has completed. We clear test A's temp directory once the test finishes, which can break an operation still being performed against the state store provider used in test A, e.g. the directory no longer exists while the maintenance task is running. This is not an issue in practice because we do not expect the checkpoint location to be temporary, but it is an issue for how we set up and clean up the environment for tests.

    ### Why are the changes needed?
    To deflake the test.

    Closes #43831 from chaoqin-li1123/fix_state_reader_suite.

    Authored-by: Chaoqin Li <chaoqin...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../datasources/v2/state/StateDataSourceTestBase.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
index 890a716bbef..f5392cc823f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
@@ -20,6 +20,7 @@ import java.sql.Timestamp
 
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
@@ -28,6 +29,17 @@ import org.apache.spark.sql.streaming.util.StreamManualClock
 trait StateDataSourceTestBase extends StreamTest with StateStoreMetricsTest {
   import testImplicits._
 
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+  }
+
+  override def afterEach(): Unit = {
+    // Stop maintenance tasks because they may access already deleted checkpoint.
+    StateStore.stop()
+    super.afterEach()
+  }
+
   protected def runCompositeKeyStreamingAggregationQuery(checkpointRoot: String): Unit = {
     val inputData = MemoryStream[Int]
     val aggregated = getCompositeKeyStreamingAggregationQuery(inputData)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
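For illustration, here is a minimal sketch (not part of the commit) of a suite that mixes in StateDataSourceTestBase and would hit the cleanup race without the new afterEach() hook. The suite name, the test name, the availability of the withTempDir helper, and the "statestore" data source short name are assumptions made for this example, not details taken from the patch.

// Hypothetical example only -- not part of this commit. It assumes the suite
// inherits Spark's withTempDir test helper through StreamTest / SharedSparkSession
// and that the state data source is registered under the short name "statestore".
class StateDataSourceExampleSuite extends StateDataSourceTestBase {

  test("composite key aggregation state is readable after the query stops") {
    withTempDir { checkpointDir =>
      // Run a streaming aggregation whose state is checkpointed under checkpointDir.
      runCompositeKeyStreamingAggregationQuery(checkpointDir.getAbsolutePath)

      // Read the state back through the state data source.
      val stateDf = spark.read
        .format("statestore")
        .load(checkpointDir.getAbsolutePath)
      assert(stateDf.count() > 0)
    } // checkpointDir is deleted here, as soon as the block returns.
    // Without StateStore.stop() in afterEach(), a background maintenance task could
    // still touch the provider rooted at the now-deleted checkpointDir and fail,
    // which is the flakiness this commit removes.
  }
}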