[ 
https://issues.apache.org/jira/browse/SPARK-21977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Burak Yavuz resolved SPARK-21977.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 2.3.0

> SinglePartition optimizations break certain Streaming Stateful Aggregation 
> requirements
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-21977
>                 URL: https://issues.apache.org/jira/browse/SPARK-21977
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Burak Yavuz
>            Assignee: Burak Yavuz
>             Fix For: 2.3.0
>
>
> This is a bit hard to explain, as there are several issues at play; I'll try 
> my best. Here are the requirements:
>   1. A Structured Streaming source that can generate empty RDDs with 0 
> partitions
>   2. A Structured Streaming query that uses the above source, performs a 
> stateful aggregation (mapGroupsWithState, groupBy().count(), ...), and calls 
> `coalesce(1)`
> The crux of the problem is that when a dataset has a `coalesce(1)` call, it 
> receives a `SinglePartition` partitioning scheme. This scheme satisfies most 
> of the required distributions of aggregation operators such as 
> `HashAggregateExec`. This causes a world of problems:
>  Symptom 1. If the input RDD has 0 partitions, the whole lineage receives 0 
> partitions, nothing is executed, and the state store creates no delta files. 
> When this happens, the next trigger fails, because the StateStore cannot load 
> the delta file for the previous trigger.
>  Symptom 2. Let's say that there was data. In this case, if you stop your 
> stream, replace `coalesce(1)` with `coalesce(2)`, and then restart your 
> stream, the stream will fail, because `spark.sql.shuffle.partitions - 1` 
> StateStores will fail to find their delta files.
> To fix the issues above, we must check that the partitioning of the child of 
> a `StatefulOperator` satisfies:
>   If the grouping expressions are empty:
>     a) AllTuples distribution
>     b) a single physical partition
>   If the grouping expressions are non-empty:
>     a) ClusteredDistribution
>     b) spark.sql.shuffle.partitions # of partitions
> whether or not coalesce(1) exists in the plan, and whether or not the input 
> RDD for the trigger has any data.
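For concreteness, the requirement above can be sketched as follows. This is an illustrative, self-contained model only: the type names mirror Spark catalyst classes, but `requiredStateDistribution` and its signature are invented for this sketch and are not Spark's actual internal API.

```scala
// Illustrative model of the distribution requirement described above.
sealed trait Distribution
case object AllTuples extends Distribution
final case class ClusteredDistribution(keys: Seq[String]) extends Distribution

// numShufflePartitions stands in for the spark.sql.shuffle.partitions setting.
def requiredStateDistribution(
    groupingKeys: Seq[String],
    numShufflePartitions: Int): (Distribution, Int) =
  if (groupingKeys.isEmpty)
    (AllTuples, 1) // empty grouping: all tuples in a single physical partition
  else
    (ClusteredDistribution(groupingKeys), numShufflePartitions)
```

The point is that this requirement is a function of the grouping expressions alone, so it holds regardless of any `coalesce(1)` in the plan or whether the trigger's input RDD has data.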
> Once you fix the above problem by adding an Exchange to the plan, you come 
> across the following bug:
> If you call `coalesce(1).groupBy().count()` on a streaming DataFrame, and if 
> you have a trigger with no data, `StateStoreRestoreExec` doesn't return the 
> prior state. However, for this specific aggregation, the `HashAggregateExec` 
> after the restore returns a (0, 0) row, since we're performing a count and 
> there is no data. This row then gets stored by `StateStoreSaveExec`, causing 
> the previous counts to be overwritten and lost.
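A toy simulation of that overwrite, with a plain Map standing in for the StateStore; all names here are illustrative and nothing below is Spark's actual API:

```scala
// Toy simulation of the restore/save overwrite described above.
var stateStore: Map[String, Long] = Map("global" -> 42L) // count from prior triggers

// Trigger with no data: StateStoreRestoreExec fails to emit the prior state...
val restored: Seq[Long] = Seq.empty

// ...but the HashAggregateExec after it still emits a row: a count over zero
// input rows is 0, because the restored state was dropped.
val newCount: Long = restored.sum

// StateStoreSaveExec then stores the new row, overwriting the previous count.
stateStore = stateStore.updated("global", newCount)
// stateStore("global") is now 0; the prior count of 42 is lost.
```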



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
