This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 988439d  [SPARK-37987][SS] Fix flaky test StreamingAggregationSuite.changing schema of state when restarting query
988439d is described below

commit 988439d7287482c465f7da6c8e9c14303488158f
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Mon Jan 24 17:33:24 2022 +0900

    [SPARK-37987][SS] Fix flaky test StreamingAggregationSuite.changing schema of state when restarting query

    ### What changes were proposed in this pull request?

    This PR fixes a flaky test `StreamingAggregationSuite.changing schema of state when restarting query`, via adjusting the number of shuffle partition to 1.

    The flakiness was due to the optimization on schema verification - we only verify it in partition 0 since it is costly and redundant to verify the schema for all partitions. Other partitions are still possible to provide other errors which are considered as unexpected.

    ### Why are the changes needed?

    This PR fixes a flaky test.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Ran test suite 10 times locally.

    Closes #35298 from HeartSaVioR/SPARK-37987.

    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 3b540ad822a53a8cb94159dc8aa3c66d34085e3e)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/streaming/StreamingAggregationSuite.scala      | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 77334ad..8a7bb8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -766,7 +766,11 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   }
 
   testQuietlyWithAllStateVersions("changing schema of state when restarting query",
-    (SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED.key, "false")) {
+    (SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED.key, "false"),
+    // Since we only do the check in partition 0 and other partitions still may fail with
+    // different errors, we change the number of shuffle partitions to 1 to make the test
+    // result to be deterministic.
+    (SQLConf.SHUFFLE_PARTITIONS.key, "1")) {
     withTempDir { tempDir =>
       val (inputData, aggregated) = prepareTestForChangingSchemaOfState(tempDir)
 
@@ -790,7 +794,11 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   testQuietlyWithAllStateVersions("changing schema of state when restarting query -" +
     " schema check off",
     (SQLConf.STATE_SCHEMA_CHECK_ENABLED.key, "false"),
-    (SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED.key, "false")) {
+    (SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED.key, "false"),
+    // Since we only do the check in partition 0 and other partitions still may fail with
+    // different errors, we change the number of shuffle partitions to 1 to make the test
+    // result to be deterministic.
+    (SQLConf.SHUFFLE_PARTITIONS.key, "1")) {
     withTempDir { tempDir =>
       val (inputData, aggregated) = prepareTestForChangingSchemaOfState(tempDir)
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org