This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3d7a5fdf1dd [SPARK-41996][SQL][SS] Fix kafka test to verify lost partitions to account for slow Kafka operations 3d7a5fdf1dd is described below commit 3d7a5fdf1ddf1e9748b568d66ab366f3c0ff5e55 Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Thu Jan 12 12:49:57 2023 +0900 [SPARK-41996][SQL][SS] Fix kafka test to verify lost partitions to account for slow Kafka operations ### What changes were proposed in this pull request? Fix kafka test to verify lost partitions to account for slow Kafka operations Basically its possible that kafka operations around topic deletion, partition creation etc can exceed the streaming query timeout thereby failing the query and test incorrectly. This change updates the exit timeout. ### Why are the changes needed? Change is required to avoid test flakiness in the event of Kafka operations becoming slower ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test only change Reran the tests multiple times: ``` [info] Assembly jar up to date: /Users/anish.shrigondekar/spark/spark/connector/protobuf/target/scala-2.12/spark-protobuf-assembly-3.4.0-SNAPSHOT.jar [info] - Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches (6 seconds, 440 milliseconds) [info] - Query with Trigger.AvailableNow should throw error when offset(s) in planned topic partitions got unavailable during subsequent batches (6 seconds, 331 milliseconds) [info] Run completed in 17 seconds, 269 milliseconds. [info] Total number of tests run: 2 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. [success] Total time: 52 s, completed Jan 11, 2023, 6:05:24 ``` Closes #39520 from anishshri-db/SPARK-41996. Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 18d63a9a4ef..d63b9805e55 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -358,10 +358,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with .start() } + // SPARK-41996 - Increase query termination timeout to ensure that + // Kafka operations can be completed + val queryTimeout = 300.seconds val exc = intercept[Exception] { val query = startTriggerAvailableNowQuery() try { - assert(query.awaitTermination(streamingTimeout.toMillis)) + assert(query.awaitTermination(queryTimeout.toMillis)) } finally { query.stop() } @@ -409,10 +412,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with .start() } + // SPARK-41996 - Increase query termination timeout to ensure that + // Kafka operations can be completed + val queryTimeout = 300.seconds val exc = intercept[StreamingQueryException] { val query = startTriggerAvailableNowQuery() try { - assert(query.awaitTermination(streamingTimeout.toMillis)) + assert(query.awaitTermination(queryTimeout.toMillis)) } finally { query.stop() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org