sarutak opened a new pull request, #52766:
URL: https://github.com/apache/spark/pull/52766

   ### What changes were proposed in this pull request?
   This PR proposes to increase `kafka.metadata.max.age.ms` from `1` to `500` to stabilize the test `Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches` in `KafkaMicroBatchV1SourceWithConsumerSuite`.
   
   Recently, this test has been failing frequently:
   
   https://github.com/apache/spark/actions/runs/18872699886/job/53854858890
   https://github.com/apache/spark/actions/runs/18551681640/job/52880215577
   
   ```
   [info] - Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches *** FAILED *** (1 minute)
   [info]   java.lang.AssertionError: assertion failed: Exception tree doesn't contain the expected exception with message: Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow.
   [info] org.scalatest.exceptions.TestFailedException: isPropagated was false Partition [topic-41, 1] metadata not propagated after timeout
   [info]       at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
   [info]       at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
   [info]       at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
   [info]       at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
   [info]       at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$waitUntilMetadataIsPropagated$1(KafkaTestUtils.scala:614)
   [info]       at org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:184)
   [info]       at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:196)
   [info]       at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
   [info]       at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:348)
   [info]       at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:347)
   [info]       at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:457)
   [info]       at org.apache.spark.sql.kafka010.KafkaTestUtils.waitUntilMetadataIsPropagated(KafkaTestUtils.scala:613)
   [info]       at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$createTopic$1(KafkaTestUtils.scala:378)
   [info]       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:256)
   [info]       at org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:377)
   [info]       at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11(KafkaMicroBatchSourceSuite.scala:352)
   [info]       at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11$adapted(KafkaMicroBatchSourceSuite.scala:349)
   [info]       at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:56)
   [info]       at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:49)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$19(MicroBatchExecution.scala:1063)
   [info]       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:176)
   [info]       at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:284)
   [info]       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:138)
   [info]       at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
   [info]       at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
   [info]       at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
   [info]       at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
   [info]       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:138)
   [info]       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:307)
   [info]       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:137)
   [info]       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
   [info]       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:91)
   [info]       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:249)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1054)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1054)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:513)
   [info]       at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:478)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:458)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:458)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch(TriggerExecutor.scala:40)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:38)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:60)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.execute(TriggerExecutor.scala:65)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:458)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:347)
   [info]       at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   [info]       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:307)
   [info]       at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:230)
   [info]   at scala.Predef$.assert(Predef.scala:279)
   [info]   at org.apache.spark.TestUtils$.assertExceptionMsg(TestUtils.scala:198)
   [info]   at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$9(KafkaMicroBatchSourceSuite.scala:374)
   [info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   [info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
   [info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
   [info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
   [info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
   [info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68)
   [info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154)
   [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   [info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
   [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68)
   [info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
   [info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
   [info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
   [info]   at scala.collection.immutable.List.foreach(List.scala:323)
   [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
   [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.Suite.run(Suite.scala:1114)
   [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
   [info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
   [info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
   [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
   [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
   [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
   [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
   [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
   [info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   [info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
   [info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
   [info]   at java.base/java.lang.Thread.run(Thread.java:840)
   ```
   
   I found that the cause is that [getPartitionInfo](https://github.com/apache/spark/blob/4622e8584e37289b3e1c03d4694e6431cc10895d/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala#L604), which is invoked through [createTopic](https://github.com/apache/spark/blob/4622e8584e37289b3e1c03d4694e6431cc10895d/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L352), can consistently return `None`. This means that the client-side metadata has unexpectedly expired.
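   
   To illustrate the failure mode, here is a minimal sketch of such a metadata wait loop, assuming ScalaTest's `eventually`; `fetchPartitionInfo` below is a hypothetical stand-in for the real `getPartitionInfo`, hard-coded to simulate expired metadata:
   
   ```scala
   import org.apache.kafka.common.PartitionInfo
   import org.scalatest.concurrent.Eventually._
   import org.scalatest.time.SpanSugar._
   
   // Hypothetical stand-in for KafkaTestUtils.getPartitionInfo. With
   // kafka.metadata.max.age.ms = 1, the client-side metadata cache is treated
   // as stale after 1 ms, so lookups can keep coming back empty even after the
   // broker already knows about the partition. Returning None simulates that.
   def fetchPartitionInfo(topic: String, partition: Int): Option[PartitionInfo] = None
   
   def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit =
     eventually(timeout(1.minute)) {
       val isPropagated = fetchPartitionInfo(topic, partition).isDefined
       // This is the assertion behind the "metadata not propagated" message above.
       assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
     }
   ```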
   
   I tried increasing `kafka.metadata.max.age.ms` to `10`, `100`, and `500`, and ran this test repeatedly at each value. With `500`, the test never failed within 20 minutes, so I chose that value.
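   
   For reference, here is a minimal sketch of how this knob reaches the Kafka client through Spark's Kafka source options (illustrative only; `spark`, `brokerAddress`, and `topic` are assumed names, and the actual change is made in the test suite's options):
   
   ```scala
   // Kafka client configs are forwarded to the Spark Kafka source with a
   // "kafka." prefix. Raising metadata.max.age.ms from 1 ms to 500 ms lets
   // the consumer reuse cached cluster metadata instead of expiring it
   // almost immediately.
   val df = spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", brokerAddress) // assumed broker address
     .option("subscribe", topic)                       // assumed topic name
     .option("kafka.metadata.max.age.ms", "500")       // previously "1"
     .load()
   ```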
   
   ### Why are the changes needed?
   For test stability.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   As mentioned above, with this change the test never failed in my environment, even when I kept rerunning it for 20 minutes.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   

