Re: Bugs with joins and SQL in Structured Streaming

2024-02-26 Thread Mich Talebzadeh
Hi,

These are all on Spark 3.5, correct?

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand expert
opinions" (Wernher von Braun).



Bugs with joins and SQL in Structured Streaming

2024-02-26 Thread Andrzej Zera
Hey all,

I've been using Structured Streaming in production for almost a year now
and want to share the bugs I've found in that time. I created a test for
each of the issues and put them all here:
https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala

I split the issues into three groups: outer joins on event time, interval
joins and Spark SQL.

Issues related to outer joins:

   - When joining three or more input streams on event time, if two or more
   of the streams don't contain an event for a given join key (the key is
   the event time), no row is output even if the remaining streams do
   contain an event for that key (see the sketch after this list). Tests
   that check for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
   and
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
   - When joining a stream aggregated from raw events with a stream of
   already-aggregated events (aggregation done outside of Spark), no row is
   output if that second stream doesn't contain a corresponding event. Test
   that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
   - When joining two aggregated streams (aggregated in Spark), no result
   is produced. Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341.
   I've already reported this one here:
   https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been
   handled yet.
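
To make the first issue easier to picture, here is a minimal sketch of the
join shape it describes, in the spirit of the linked Scala tests. The rate
sources, the object name and the watermark value are illustrative stand-ins
rather than the exact setup from the tests; the point is only the chain of
full outer joins on the shared event-time key.

import org.apache.spark.sql.SparkSession

object OuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("OuterJoinSketch")
      .master("local[2]")
      .getOrCreate()

    // Three rate sources stand in for the real input streams (illustrative).
    def input(name: String) =
      spark.readStream.format("rate").option("rowsPerSecond", "1").load()
        .selectExpr("timestamp AS eventTime", s"value AS ${name}Value")
        .withWatermark("eventTime", "10 seconds")

    val a = input("a")
    val b = input("b")
    val c = input("c")

    // Chained full outer joins on the shared event-time key. Per the report,
    // when an eventTime key is missing from two of the three streams, no row
    // comes out at all, instead of a row with nulls for the missing sides.
    val joined = a
      .join(b, Seq("eventTime"), "full_outer")
      .join(c, Seq("eventTime"), "full_outer")

    joined.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}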

Issues related to interval joins:

   - When joining three streams (A, B, C) using an interval join on event
   time, such that B.eventTime is conditioned on A.eventTime and C.eventTime
   is also conditioned on A.eventTime, and then doing window aggregation
   based on A's event time, the result is output only after the watermark
   crosses the window end + interval(A, B) + interval(A, C). However, I'd
   expect results to be output sooner, i.e. when the watermark crosses the
   window end + MAX(interval(A, B), interval(A, C)). If, say, event B can
   happen up to 3 minutes after event A and event C up to 5 minutes after A,
   there is no point in suspending output for 8 minutes (3+5) after the end
   of the window when we know that no more events can be matched later than
   5 minutes after the window end (assuming the window end is based on A's
   event time). A sketch of this join shape follows below. Test that checks
   for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
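
For reference, a minimal sketch of that three-stream interval join followed
by window aggregation. The ids, the 10-second watermarks and the 1-minute
window are illustrative assumptions; only the 3- and 5-minute intervals
mirror the example above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, window}

object IntervalJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("IntervalJoinSketch")
      .master("local[2]")
      .getOrCreate()

    // Rate sources stand in for the real streams; the ids are illustrative.
    def input(name: String) =
      spark.readStream.format("rate").option("rowsPerSecond", "1").load()
        .selectExpr(s"timestamp AS ${name}Time", s"value AS ${name}Id")
        .withWatermark(s"${name}Time", "10 seconds")

    val a = input("a")
    val b = input("b")
    val c = input("c")

    // B may arrive up to 3 minutes after A, C up to 5 minutes after A.
    val joined = a
      .join(b, expr(
        "aId = bId AND bTime BETWEEN aTime AND aTime + INTERVAL 3 MINUTES"))
      .join(c, expr(
        "aId = cId AND cTime BETWEEN aTime AND aTime + INTERVAL 5 MINUTES"))

    // Window aggregation on A's event time. Per the report, output appears
    // only after the watermark passes window end + 3 + 5 minutes, where
    // window end + max(3, 5) minutes would be expected.
    val agg = joined
      .groupBy(window(col("aTime"), "1 minute"))
      .count()

    agg.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}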

SQL issues:

   - A WITH clause (in contrast to a subquery) seems to create a static
   DataFrame that can't be used in streaming joins (see the sketch after
   this list). Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
   - Two subqueries, each aggregating data using the window() function,
   break the output schema. Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
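
A minimal way to probe the WITH-clause issue, assuming a single rate stream
registered as a temp view (all names here are illustrative). Checking
isStreaming is just one quick way to see whether the CTE result is planned
as a static DataFrame:

import org.apache.spark.sql.SparkSession

object SqlSyntaxSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SqlSyntaxSketch")
      .master("local[2]")
      .getOrCreate()

    // One rate stream registered as a temp view (illustrative setup).
    spark.readStream.format("rate").option("rowsPerSecond", "1").load()
      .selectExpr("timestamp AS eventTime", "value AS id")
      .withWatermark("eventTime", "10 seconds")
      .createOrReplaceTempView("events")

    // The same filter written as a subquery and as a CTE.
    val viaSubquery = spark.sql(
      "SELECT * FROM (SELECT eventTime, id FROM events WHERE id % 2 = 0)")
    val viaCte = spark.sql(
      """WITH evens AS (SELECT eventTime, id FROM events WHERE id % 2 = 0)
        |SELECT * FROM evens""".stripMargin)

    // Per the report, the CTE form appears to lose its streaming nature.
    println(s"subquery isStreaming = ${viaSubquery.isStreaming}")
    println(s"CTE isStreaming = ${viaCte.isStreaming}")
  }
}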

I'm a beginner with Scala (I use Structured Streaming with PySpark), so I
won't be able to provide fixes. But I hope the test cases I've provided can
be of some help.

Regards,
Andrzej