Hey all,

I've been using Structured Streaming in production for almost a year and
I want to share the bugs I've found during that time. I created a test for
each issue and put them all here:
https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala

I split the issues into three groups: outer joins on event time, interval
joins and Spark SQL.

Issues related to outer joins:

   - When joining three or more input streams on event time, if two or more
   streams don't contain an event for a given join key (the event time), no
   row is output even if the other streams do contain an event for that key.
   Tests that check for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
   and
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
   - When joining a stream that Spark aggregates from raw events with a
   stream of already aggregated events (aggregated outside of Spark), no row
   is output if the second stream doesn't contain a corresponding event.
   Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
   - When joining two aggregated streams (aggregated in Spark), no result
   is produced. Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341
   I've already reported this one here:
   https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been
   handled yet.
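
To make the expected behaviour of the first outer join case concrete, here is a
minimal pure-Python sketch. It is not Spark code: it assumes a full outer join,
models each stream as a plain dict keyed by event time, and the function name
full_outer_join_on_event_time is made up for illustration.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[int, Tuple[Optional[str], ...]]


def full_outer_join_on_event_time(*streams: Dict[int, str]) -> List[Row]:
    """Model of the result I'd expect from a full outer join on event time.

    Each stream is a dict {event_time: value} (an assumption made for this
    sketch). A row is emitted for every event time seen in ANY stream;
    streams without an event for that key contribute None.
    """
    # Iterating a dict yields its keys, so this collects every event time.
    all_keys = sorted(set().union(*streams))
    return [(key, tuple(s.get(key) for s in streams)) for key in all_keys]


# Event time 2 exists only in stream A, so it is missing from two streams.
stream_a = {1: "a1", 2: "a2"}
stream_b = {1: "b1"}
stream_c = {1: "c1"}

for row in full_outer_join_on_event_time(stream_a, stream_b, stream_c):
    print(row)
```

In this model the row for event time 2 is still emitted, with empty values for
B and C. That is the row the first two tests above expect but don't get.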

Issues related to interval joins:

   - When joining three streams (A, B, C) using an interval join on event
   time, such that B.eventTime is conditioned on A.eventTime and
   C.eventTime is also conditioned on A.eventTime, and then doing a window
   aggregation based on A's event time, the result is output only after the
   watermark crosses window end + interval(A, B) + interval(A, C).
   However, I'd expect results to be output sooner, i.e. when the watermark
   crosses window end + MAX(interval(A, B), interval(A, C)). If event B can
   happen up to 3 minutes after event A and event C can happen up to 5
   minutes after A, there is no point in holding back the output for 8
   minutes (3+5) after the end of the window when we know that no more
   events can be matched later than 5 minutes after the window end (assuming
   the window end is based on A's event time). Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
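
The arithmetic behind the interval join case can be sketched in a few lines of
plain Python. This is only an illustration of the two delays described above,
not of Spark's internals; the function names observed_delay and expected_delay
are made up for this sketch.

```python
from datetime import timedelta
from typing import Sequence


def observed_delay(intervals: Sequence[timedelta]) -> timedelta:
    """Delay after window end that I observe: the SUM of the join intervals."""
    return sum(intervals, timedelta())


def expected_delay(intervals: Sequence[timedelta]) -> timedelta:
    """Delay after window end that I'd expect: the MAX of the join intervals,
    since every interval is measured from A's event time."""
    return max(intervals, default=timedelta())


interval_a_b = timedelta(minutes=3)  # B may arrive up to 3 min after A
interval_a_c = timedelta(minutes=5)  # C may arrive up to 5 min after A

print(observed_delay([interval_a_b, interval_a_c]))  # 0:08:00
print(expected_delay([interval_a_b, interval_a_c]))  # 0:05:00
```

With the 3 and 5 minute intervals from the example, the output is held back
for 8 minutes instead of the 5 that should be enough.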

SQL issues:

   - A WITH clause (in contrast to a subquery) seems to create a static
   DataFrame that can't be used in streaming joins. Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
   - Two subqueries, each aggregating data using the window() function, break
   the output schema. Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122

I'm a beginner with Scala (I'm using Structured Streaming with PySpark), so
I won't be able to provide fixes. But I hope the test cases I provided can
be of some help.

Regards,
Andrzej
