sarutak opened a new pull request, #52766: URL: https://github.com/apache/spark/pull/52766
### What changes were proposed in this pull request?

This PR proposes to increase `kafka.metadata.max.age.ms` from `1` to `500` to stabilize the test `Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches` in `KafkaMicroBatchV1SourceWithConsumerSuite`. Recently, this test has been failing frequently:

https://github.com/apache/spark/actions/runs/18872699886/job/53854858890
https://github.com/apache/spark/actions/runs/18551681640/job/52880215577

```
[info] - Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches *** FAILED *** (1 minute)
[info] java.lang.AssertionError: assertion failed: Exception tree doesn't contain the expected exception with message: Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow.
[info] org.scalatest.exceptions.TestFailedException: isPropagated was false Partition [topic-41, 1] metadata not propagated after timeout
[info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$waitUntilMetadataIsPropagated$1(KafkaTestUtils.scala:614)
[info] at org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:184)
[info] at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:196)
[info] at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
[info] at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348)
[info] at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347)
[info] at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:457)
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils.waitUntilMetadataIsPropagated(KafkaTestUtils.scala:613)
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$createTopic$1(KafkaTestUtils.scala:378)
[info] at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:256)
[info] at org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:377)
[info] at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11(KafkaMicroBatchSourceSuite.scala:352)
[info] at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11$adapted(KafkaMicroBatchSourceSuite.scala:349)
[info] at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:56)
[info] at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:49)
[info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$19(MicroBatchExecution.scala:1063)
[info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:176)
[info] at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:284)
[info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:138)
[info] at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
[info] at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
[info] at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
[info] at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
[info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:138)
[info] at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:307)
[info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:137)
[info] at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
[info] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:91)
[info] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:249)
[info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1054)
[info] at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200)
[info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1054)
[info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:513)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info] at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200)
[info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:478)
[info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:458)
[info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:458)
[info] at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch(TriggerExecutor.scala:40)
[info] at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:38)
[info] at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:60)
[info] at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.execute(TriggerExecutor.scala:65)
[info] at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:458)
[info] at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:347)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info] at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
[info] at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:307)
[info] at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:230)
[info] at scala.Predef$.assert(Predef.scala:279)
[info] at org.apache.spark.TestUtils$.assertExceptionMsg(TestUtils.scala:198)
[info] at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$9(KafkaMicroBatchSourceSuite.scala:374)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info] at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info] at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info] at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info] at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68)
[info] at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68)
[info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:323)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
[info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
```

I found the cause: [getPartitionInfo](https://github.com/apache/spark/blob/4622e8584e37289b3e1c03d4694e6431cc10895d/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala#L604), which is invoked through [createTopic](https://github.com/apache/spark/blob/4622e8584e37289b3e1c03d4694e6431cc10895d/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L352), can consistently return `None`. This means the metadata unexpectedly expires. I tried increasing `kafka.metadata.max.age.ms` to `10`, `100`, and `500` while running this test repeatedly. With `500`, the test never failed within 20 minutes, so I chose that value.

### Why are the changes needed?

For test stability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

As mentioned above, the test never failed with this change in my environment, even when run repeatedly for 20 minutes.

### Was this patch authored or co-authored using generative AI tooling?

No.
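
For context, here is a minimal, hedged sketch of how an option like this is typically wired in these suites (this is not the actual diff; `spark`, `testUtils`, and `topic` stand in for the suite's fixtures). In the Spark Kafka source, options prefixed with `kafka.` have the prefix stripped and are passed through to the underlying Kafka consumer, so `kafka.metadata.max.age.ms` sets the consumer's `metadata.max.age.ms`:

```scala
// Hedged sketch, not the actual PR diff. `spark` is a SparkSession and
// `testUtils` a running KafkaTestUtils instance; `topic` is the test topic.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
  // With `1`, cached metadata goes stale almost instantly; `500` keeps it
  // valid long enough for the test's metadata lookups to succeed.
  .option("kafka.metadata.max.age.ms", "500") // previously "1"
  .option("subscribe", topic)
  .load()
```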

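To illustrate the failure mode, here is a rough sketch of the retry pattern behind `waitUntilMetadataIsPropagated` (an approximation under stated assumptions, not the actual `KafkaTestUtils` code): it polls partition metadata via ScalaTest's `eventually`, and the test fails with "metadata not propagated after timeout" when the lookup keeps returning `None`. `fetchPartitionInfo` is a hypothetical stand-in for the suite's `getPartitionInfo` helper:

```scala
// Rough sketch of the retry loop, assuming ScalaTest on the classpath.
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Millis, Seconds, Span}

// `fetchPartitionInfo` is a hypothetical stand-in for getPartitionInfo,
// which asks the broker for metadata about a (topic, partition) pair.
def waitUntilPropagated(fetchPartitionInfo: () => Option[AnyRef]): Unit = {
  eventually(timeout(Span(60, Seconds)), interval(Span(100, Millis))) {
    // If the consumer's metadata cache expires faster than it is refreshed
    // (e.g. metadata.max.age.ms = 1), this lookup can return None on every
    // attempt and the eventually block exhausts its timeout.
    assert(fetchPartitionInfo().isDefined,
      "Partition metadata not propagated after timeout")
  }
}
```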