This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d7a5fdf1dd [SPARK-41996][SQL][SS] Fix kafka test to verify lost partitions to account for slow Kafka operations
3d7a5fdf1dd is described below

commit 3d7a5fdf1ddf1e9748b568d66ab366f3c0ff5e55
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Thu Jan 12 12:49:57 2023 +0900

    [SPARK-41996][SQL][SS] Fix kafka test to verify lost partitions to account for slow Kafka operations
    
    ### What changes were proposed in this pull request?
    Fix the Kafka tests that verify lost partitions so that they account for slow Kafka operations.
    
    Kafka operations such as topic deletion and partition creation can take longer than the streaming query timeout, so the query and the test fail incorrectly. This change raises the timeout passed to `awaitTermination` from `streamingTimeout` to 300 seconds.
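    
    As a rough illustration of the failure mode (a sketch, not the Spark test itself): `FakeQuery` and `TimeoutDemo` below are hypothetical names, and `FakeQuery.awaitTermination` only mimics the behaviour relied on here, returning `false` when the timeout elapses first and rethrowing the failure once the query has terminated with an error. With a timeout shorter than the simulated delay, the call returns `false`, so an `assert` around it would fail before the expected error is ever seen.
    
    ```scala
    import java.util.concurrent.{CountDownLatch, TimeUnit}
    import scala.concurrent.duration._
    
    // Hypothetical stand-in for a streaming query; not Spark's StreamingQuery.
    // It "fails" after `failAfter`, simulating slow Kafka operations.
    final class FakeQuery(failAfter: FiniteDuration) {
      private val done = new CountDownLatch(1)
      @volatile private var error: Option[Throwable] = None
    
      private val worker = new Thread(() => {
        Thread.sleep(failAfter.toMillis)
        error = Some(new IllegalStateException("partitions lost"))
        done.countDown()
      })
      worker.setDaemon(true)
      worker.start()
    
      // Returns false if the timeout elapses first; rethrows the failure
      // once the simulated query has terminated with an error.
      def awaitTermination(timeoutMs: Long): Boolean = {
        val finished = done.await(timeoutMs, TimeUnit.MILLISECONDS)
        if (finished) error.foreach(e => throw e)
        finished
      }
    }
    
    object TimeoutDemo {
      def main(args: Array[String]): Unit = {
        val query = new FakeQuery(failAfter = 500.millis)
        // Timeout shorter than the delay: no exception yet, just `false`.
        println(query.awaitTermination(50))
        // Generous timeout: the expected failure surfaces.
        try query.awaitTermination(5000)
        catch {
          case e: IllegalStateException =>
            println(s"failed as expected: ${e.getMessage}")
        }
      }
    }
    ```
    
    The delays in the sketch are arbitrary; the point is only that the timeout has to comfortably exceed the slowest expected operation for the intended exception to be observed.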
    
    ### Why are the changes needed?
    The change is required to avoid test flakiness when Kafka operations become slower.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Test-only change.
    Reran the tests multiple times:
    ```
    [info] Assembly jar up to date: /Users/anish.shrigondekar/spark/spark/connector/protobuf/target/scala-2.12/spark-protobuf-assembly-3.4.0-SNAPSHOT.jar
    [info] - Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches (6 seconds, 440 milliseconds)
    [info] - Query with Trigger.AvailableNow should throw error when offset(s) in planned topic partitions got unavailable during subsequent batches (6 seconds, 331 milliseconds)
    [info] Run completed in 17 seconds, 269 milliseconds.
    [info] Total number of tests run: 2
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    [success] Total time: 52 s, completed Jan 11, 2023, 6:05:24
    ```
    
    Closes #39520 from anishshri-db/SPARK-41996.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 18d63a9a4ef..d63b9805e55 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -358,10 +358,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
         .start()
     }
 
+    // SPARK-41996 - Increase query termination timeout to ensure that
+    // Kafka operations can be completed
+    val queryTimeout = 300.seconds
     val exc = intercept[Exception] {
       val query = startTriggerAvailableNowQuery()
       try {
-        assert(query.awaitTermination(streamingTimeout.toMillis))
+        assert(query.awaitTermination(queryTimeout.toMillis))
       } finally {
         query.stop()
       }
@@ -409,10 +412,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
         .start()
     }
 
+    // SPARK-41996 - Increase query termination timeout to ensure that
+    // Kafka operations can be completed
+    val queryTimeout = 300.seconds
     val exc = intercept[StreamingQueryException] {
       val query = startTriggerAvailableNowQuery()
       try {
-        assert(query.awaitTermination(streamingTimeout.toMillis))
+        assert(query.awaitTermination(queryTimeout.toMillis))
       } finally {
         query.stop()
       }

