[ 
https://issues.apache.org/jira/browse/SPARK-32520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-32520.
----------------------------------
    Fix Version/s: 3.1.0
         Assignee: Jungtaek Lim
       Resolution: Fixed

This was fixed in https://github.com/apache/spark/pull/29343 together.

> Flaky Test: KafkaSourceStressSuite.stress test with multiple topics and 
> partitions
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-32520
>                 URL: https://issues.apache.org/jira/browse/SPARK-32520
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming, Tests
>    Affects Versions: 3.1.0
>            Reporter: Hyukjin Kwon
>            Assignee: Jungtaek Lim
>            Priority: Major
>             Fix For: 3.1.0
>
>
> {{KafkaSourceStressSuite.stress test with multiple topics and partitions}} 
> seems flaky in GitHub Actions build:
> https://github.com/apache/spark/pull/29335/checks?check_run_id=940205463
> {code}
> KafkaSourceStressSuite:
> - stress test with multiple topics and partitions *** FAILED *** (2 minutes, 
> 7 seconds)
>   Timed out waiting for stream: The code passed to failAfter did not complete 
> within 30 seconds.
>   java.lang.Thread.getStackTrace(Thread.java:1559)
>       org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234)
>       org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
>       
> org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53)
>       org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
>       org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
>       
> org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
>       
> org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:471)
>       
> org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:470)
>       scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>       Caused by:      null
>       
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2156)
>               
> org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:483)
>               
> org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:472)
>               
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>               
> org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
>               
> org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
>               
> org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
>               
> org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53)
>               
> org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
>               
> org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
>   == Progress ==
>      AssertOnQuery(<condition>, )
>      AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), 
> data = empty Range 0 until 0, message = )
>      CheckAnswer:
>      StopStream
>      AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), 
> data = Range 0 until 8, message = )
>      
> StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7dce5824,Map(),null)
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8]
>      CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8]
>      StopStream
>      
> StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7255955e,Map(),null)
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
> stress3), data = Range 8 until 9, message = Add topic stress7)
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
> stress3), data = Range 9 until 10, message = )
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
> stress3), data = Range 10 until 15, message = Add partition)
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, 
> stress5, stress3), data = empty Range 15 until 15, message = Add topic 
> stress9)
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, 
> stress3), data = Range 15 until 16, message = Delete topic stress5)
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, 
> stress3, stress10), data = Range 16 until 23, message = Add topic stress11)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23]
>      StopStream
>      AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, 
> stress3, stress10), data = Range 23 until 32, message = Add partition)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, 
> stress1, stress3, stress10), data = Range 32 until 37, message = Add topic 
> stress13)
>      
> StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@73a476e3,Map(),null)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, 
> stress1, stress3, stress10), data = Range 37 until 38, message = )
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, 
> stress3, stress10), data = Range 38 until 41, message = Delete topic stress8)
>   => CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41]
>      StopStream
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, 
> stress1, stress3, stress10), data = empty Range 41 until 41, message = Add 
> topic stress15)
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, 
> stress1, stress3, stress10), data = Range 41 until 43, message = )
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, 
> stress1, stress3, stress10, stress16), data = Range 43 until 48, message = 
> Add topic stress17)
>      
> StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@46dd79a7,Map(),null)
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, 
> stress1, stress18, stress3, stress10, stress16), data = Range 48 until 52, 
> message = Add topic stress19)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52]
>      StopStream
>      
> StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6b1792a,Map(),null)
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, 
> stress1, stress18, stress3, stress10, stress16), data = Range 52 until 55, 
> message = )
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, 
> stress1, stress18, stress3, stress10, stress16), data = Range 55 until 57, 
> message = )
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, 
> stress1, stress18, stress3, stress10), data = empty Range 57 until 57, 
> message = Delete topic stress16)
>      AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, 
> stress1, stress18, stress3, stress10), data = Range 57 until 66, message = )
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66]
>      StopStream
>      
> StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c98dafc,Map(),null)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66]
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, 
> stress18, stress3, stress10), data = Range 66 until 71, message = Delete 
> topic stress14)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71]
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, 
> stress18, stress3, stress10), data = Range 71 until 75, message = Delete 
> topic stress1)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, 
> stress18, stress3, stress10), data = Range 75 until 77, message = )
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, 
> stress18, stress3, stress10), data = Range 77 until 86, message = Add 
> partition)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86]
>      StopStream
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, 
> stress1, stress18, stress3, stress10), data = Range 86 until 93, message = 
> Add topic stress21)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, 
> stress1, stress18, stress22, stress3, stress10), data = Range 93 until 102, 
> message = Add topic stress23)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, 
> stress1, stress18, stress22, stress3, stress10), data = Range 102 until 103, 
> message = Add partition)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, 
> stress1, stress18, stress22, stress3, stress10), data = Range 103 until 104, 
> message = )
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, 
> stress1, stress18, stress22, stress3, stress10), data = Range 104 until 113, 
> message = )
>      
> StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7d64fb6b,Map(),null)
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, 
> stress1, stress18, stress22, stress3, stress10), data = Range 113 until 116, 
> message = Add partition)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116]
>      StopStream
>      AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, 
> stress1, stress18, stress22, stress3, stress10), data = Range 116 until 123, 
> message = )
>      AddKafkaData(topics = Set(stress24, stress4, stress6, stress12, stress2, 
> stress20, stress1, stress18, stress22, stress3, stress10), data = Range 123 
> until 125, message = Add topic stress25)
>      
> StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@50d9b521,Map(),null)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125]
>   == Stream ==
>   Output Mode: Append
>   Stream state: {KafkaV2[SubscribePattern[stress.*]]: 
> {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}}
>   Thread state: alive
>   Thread stack trace: scala.runtime.Statics.anyHash(Statics.java:122)
>   scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87)
>   scala.collection.immutable.HashMap.computeHash(HashMap.scala:96)
>   scala.collection.immutable.HashMap.$plus(HashMap.scala:65)
>   scala.collection.immutable.HashMap.$plus(HashMap.scala:39)
>   scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32)
>   scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
>   
> scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
>   scala.collection.generic.Growable$$Lambda$8/681384962.apply(Unknown Source)
>   scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>   scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>   scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
>   scala.collection.generic.GenMapFactory.apply(GenMapFactory.scala:51)
>   scala.sys.package$.env(package.scala:64)
>   org.apache.spark.util.Utils$.isTesting(Utils.scala:1908)
>   
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:134)
>   
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:133)
>   
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:29)
>   
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp(AnalysisHelper.scala:157)
>   
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp$(AnalysisHelper.scala:156)
>   
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
>   
> org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1802)
>   
> org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1797)
>   
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
>   
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1616/668661558.apply(Unknown
>  Source)
>   scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
>   scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
>   scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
>   
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
>   
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
>   
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1615/260639590.apply(Unknown
>  Source)
>   scala.collection.immutable.List.foreach(List.scala:392)
>   
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
>   
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
>   
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1704/154914836.apply(Unknown
>  Source)
>   
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
>   
> org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
>   
> org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
>   
> org.apache.spark.sql.execution.streaming.IncrementalExecution$$Lambda$4150/1689084884.apply(Unknown
>  Source)
>   
> org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>   
> org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:135)
>   
> org.apache.spark.sql.execution.QueryExecution$$Lambda$1693/650691670.apply(Unknown
>  Source)
>   org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
>   
> org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:135)
>   
> org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81)
>   
> org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
>   
> org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:87)
>   
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:105)
>   
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:102)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:563)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4148/1474636575.apply(Unknown
>  Source)
>   
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>   
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>   
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:553)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4069/138532033.apply$mcV$sp(Unknown
>  Source)
>   scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>   
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>   
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4067/940382466.apply$mcZ$sp(Unknown
>  Source)
>   
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
>   
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
>   
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
>   == Sink ==
>   0:
>   1: [1] [3] [5] [7] [2] [4] [6] [8]
>   2: [10]
>   3: [9]
>   4:
>   5: [11] [14] [12] [13] [15]
>   6: [16]
>   7: [19] [21] [17] [20] [22] [18] [23]
>   8: [30] [33] [25] [27] [32] [37] [35] [29] [34] [36] [28] [31] [24] [26]
>   9: [38]
>   == Plan ==
>   == Parsed Logical Plan ==
>   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
>   +- SerializeFromObject [input[0, int, false] AS value#26274]
>      +- MapElements 
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533,
>  class scala.Tuple2, [StructField(_1,StringType,true), 
> StructField(_2,StringType,true)], obj#26273: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: 
> scala.Tuple2
>            +- Project [cast(key#26248 as string) AS key#26262, 
> cast(value#26249 as string) AS value#26263]
>               +- StreamingDataSourceV2Relation [key#26248, value#26249, 
> topic#26250, partition#26251, offset#26252L, timestamp#26253, 
> timestampType#26254], 
> org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, 
> KafkaV2[SubscribePattern[stress.*]], 
> {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}},
>  
> {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}
>   == Analyzed Logical Plan ==
>   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
>   +- SerializeFromObject [input[0, int, false] AS value#26274]
>      +- MapElements 
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533,
>  class scala.Tuple2, [StructField(_1,StringType,true), 
> StructField(_2,StringType,true)], obj#26273: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: 
> scala.Tuple2
>            +- Project [cast(key#26248 as string) AS key#26262, 
> cast(value#26249 as string) AS value#26263]
>               +- StreamingDataSourceV2Relation [key#26248, value#26249, 
> topic#26250, partition#26251, offset#26252L, timestamp#26253, 
> timestampType#26254], 
> org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, 
> KafkaV2[SubscribePattern[stress.*]], 
> {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}},
>  
> {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}
>   == Optimized Logical Plan ==
>   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
>   +- SerializeFromObject [input[0, int, false] AS value#26274]
>      +- MapElements 
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533,
>  class scala.Tuple2, [StructField(_1,StringType,true), 
> StructField(_2,StringType,true)], obj#26273: int
>         +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: 
> scala.Tuple2
>            +- Project [cast(key#26248 as string) AS key#26262, 
> cast(value#26249 as string) AS value#26263]
>               +- StreamingDataSourceV2Relation [key#26248, value#26249, 
> topic#26250, partition#26251, offset#26252L, timestamp#26253, 
> timestampType#26254], 
> org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, 
> KafkaV2[SubscribePattern[stress.*]], 
> {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}},
>  
> {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}
>   == Physical Plan ==
>   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
>   +- *(1) SerializeFromObject [input[0, int, false] AS value#26274]
>      +- *(1) MapElements 
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533,
>  obj#26273: int
>         +- *(1) DeserializeToObject newInstance(class scala.Tuple2), 
> obj#26272: scala.Tuple2
>            +- *(1) Project [cast(key#26248 as string) AS key#26262, 
> cast(value#26249 as string) AS value#26263]
>               +- *(1) Project [key#26248, value#26249, topic#26250, 
> partition#26251, offset#26252L, timestamp#26253, timestampType#26254]
>                  +- MicroBatchScan[key#26248, value#26249, topic#26250, 
> partition#26251, offset#26252L, timestamp#26253, timestampType#26254] class 
> org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan 
> (StreamTest.scala:452)
>   org.scalatest.exceptions.TestFailedException:
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
>   at 
> org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1562)
>   at org.scalatest.Assertions.fail(Assertions.scala:933)
>   at org.scalatest.Assertions.fail$(Assertions.scala:929)
>   at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1562)
>   at 
> org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:452)
>   at 
> org.apache.spark.sql.streaming.StreamTest.liftedTree1$1(StreamTest.scala:788)
>   at 
> org.apache.spark.sql.streaming.StreamTest.testStream(StreamTest.scala:764)
>   at 
> org.apache.spark.sql.streaming.StreamTest.testStream$(StreamTest.scala:334)
>   at 
> org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:53)
>   at 
> org.apache.spark.sql.streaming.StreamTest.runStressTest(StreamTest.scala:876)
>   at 
> org.apache.spark.sql.streaming.StreamTest.runStressTest$(StreamTest.scala:828)
>   at 
> org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaMicroBatchSourceSuite.scala:53)
>   at 
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite.$anonfun$new$157(KafkaMicroBatchSourceSuite.scala:1906)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at 
> org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
>   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158)
>   at 
> org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
>   at 
> org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
>   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
>   at 
> org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
>   at 
> org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:60)
>   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
>   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
>   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:60)
>   at 
> org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
>   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
>   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
>   at 
> org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
>   at 
> org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
>   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
>   at org.scalatest.Suite.run(Suite.scala:1112)
>   at org.scalatest.Suite.run$(Suite.scala:1094)
>   at 
> org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
>   at 
> org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
>   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
>   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
>   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
>   at 
> org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:60)
>   at 
> org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
>   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
>   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
>   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:60)
>   at 
> org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
>   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
>   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
>   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> This seems a bit frequent.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to