Hey all,

I've been using Structured Streaming in production for almost a year now and I want to share the bugs I've found in that time. I created a test for each of the issues and put them all here: https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
I split the issues into three groups: outer joins on event time, interval joins, and Spark SQL.

Issues related to outer joins:

- When joining three or more input streams on event time, if two or more of the streams don't contain an event for a join key (the join key being event time), no row is output even if the other streams do contain an event for that key. Tests that check for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86 and https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169 (a simplified sketch of this query shape is in the P.S. below)
- When joining a stream aggregated in Spark from raw events with a stream of already-aggregated events (aggregation done outside of Spark), no row is output if the second stream doesn't contain a corresponding event. Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
- When joining two streams that were both aggregated in Spark, no result is produced at all. Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341. I've already reported this one as https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been handled yet.

Issues related to interval joins:

- When joining three streams (A, B, C) with interval joins on event time, such that B.eventTime is conditioned on A.eventTime and C.eventTime is also conditioned on A.eventTime, and then doing a window aggregation based on A's event time, the result is only output after the watermark crosses window end + interval(A, B) + interval(A, C). I'd expect results to be output sooner, i.e. once the watermark crosses window end + MAX(interval(A, B), interval(A, C)). If event B can happen up to 3 minutes after event A and event C up to 5 minutes after A, there is no point in holding back the output for 8 minutes (3 + 5) after the end of the window when we know that no further event can be matched later than 5 minutes after the window end (assuming the window end is based on A's event time). Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32 (also sketched in the P.S. below)

SQL issues:

- A WITH clause (in contrast to a subquery) seems to create a static DataFrame that can't be used in streaming joins. Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31 (also sketched in the P.S. below)
- Two subqueries, each aggregating data with the window() function, break the output schema. Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122

I'm a beginner with Scala (I'm using Structured Streaming with PySpark), so I won't be able to provide fixes, but I hope the test cases I provided can be of some help.

Regards,
Andrzej
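P.S. In case it helps to see the query shapes without opening the repo, here are a few simplified sketches. They are not the actual tests from the repository: the rate source, object names and column names are illustrative stand-ins, compressed into a minimal Scala program. First, the three-stream outer join on event time:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ThreeStreamOuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("three-stream-outer-join-sketch")
      .master("local[2]")
      .getOrCreate()

    // Rate sources stand in for the real inputs. Each side gets a watermark,
    // and the event-time column itself serves as the join key.
    def side(name: String) = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
      .select(
        col("timestamp").as(s"${name}Time"),
        col("value").as(s"${name}Value"))
      .withWatermark(s"${name}Time", "10 seconds")

    val a = side("a")
    val b = side("b")
    val c = side("c")

    // The reported behaviour: if both b and c lack an event for a given aTime,
    // no left-outer row for a is ever emitted.
    val joined = a
      .join(b, expr("aTime = bTime"), "left_outer")
      .join(c, expr("aTime = cTime"), "left_outer")

    joined.writeStream
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}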
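The interval join setup looks roughly like this, reusing the a, b and c streams from the sketch above (the 3- and 5-minute intervals match the example numbers):

    // B may arrive up to 3 minutes after A, C up to 5 minutes after A.
    val intervalJoined = a
      .join(b, expr("bTime >= aTime AND bTime <= aTime + INTERVAL 3 MINUTES"))
      .join(c, expr("cTime >= aTime AND cTime <= aTime + INTERVAL 5 MINUTES"))

    // Window aggregation keyed on A's event time. Observed: a window's result
    // only appears once the watermark passes window end + 3 + 5 minutes;
    // expected: window end + MAX(3, 5) = 5 minutes.
    val counts = intervalJoined
      .groupBy(window(col("aTime"), "10 minutes"))
      .count()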
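And the WITH clause issue, sketched as SQL over temp views registered from the same streaming DataFrames (view and alias names are made up):

    a.createOrReplaceTempView("stream_a")
    b.createOrReplaceTempView("stream_b")

    // Subquery version: planned as a stream-stream join, works as expected.
    val viaSubquery = spark.sql("""
      SELECT *
      FROM (SELECT * FROM stream_a) sa
      JOIN stream_b sb
        ON sa.aTime = sb.bTime""")

    // WITH version: the CTE seems to be treated as a static DataFrame,
    // so the streaming join no longer works.
    val viaWith = spark.sql("""
      WITH sa AS (SELECT * FROM stream_a)
      SELECT *
      FROM sa
      JOIN stream_b sb
        ON sa.aTime = sb.bTime""")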