[ https://issues.apache.org/jira/browse/SPARK-21977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Burak Yavuz resolved SPARK-21977.
---------------------------------
    Resolution: Fixed
    Fix Version/s: 2.3.0

> SinglePartition optimizations break certain Streaming Stateful Aggregation requirements
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-21977
>                 URL: https://issues.apache.org/jira/browse/SPARK-21977
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Burak Yavuz
>            Assignee: Burak Yavuz
>             Fix For: 2.3.0
>
>
> This is a bit hard to explain because there are several issues here, but
> I'll try my best. Here are the requirements:
>  1. A Structured Streaming source that can generate empty RDDs with 0
> partitions
>  2. A Structured Streaming query that uses the above source, performs a
> stateful aggregation (mapGroupsWithState, groupBy.count, ...), and coalesces
> to 1 partition
> The crux of the problem is that when a dataset has a `coalesce(1)` call, it
> receives a `SinglePartition` partitioning scheme. This scheme satisfies most
> of the distributions required by aggregation operators such as
> `HashAggregateExec`. This causes a world of problems:
>  Symptom 1. If the input RDD has 0 partitions, the whole lineage receives
> 0 partitions, nothing is executed, and the state store does not create any
> delta files. When this happens, the next trigger fails, because the
> StateStore cannot load the delta file for the previous trigger.
>  Symptom 2. Let's say that there was data. In this case, if you stop your
> stream, replace `coalesce(1)` with `coalesce(2)`, and then restart your
> stream, it will fail, because `spark.sql.shuffle.partitions - 1`
> StateStores will fail to find their delta files.
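The two symptoms above can be illustrated with a toy model of the state store. This is a minimal sketch, not Spark's implementation: `ToyStateStore`, `run_trigger`, `partitions_missing_state` and the `(partition, version)` bookkeeping are hypothetical names, and the value of 4 shuffle partitions is an assumed setting chosen for illustration. The only behaviour modelled is what the description states: a partition that runs writes one delta file per trigger, and loading state requires the previous trigger's delta file.

```python
class MissingDeltaFile(Exception):
    """Raised when a partition cannot find the delta file it must load."""


class ToyStateStore:
    """Hypothetical stand-in for a checkpoint directory of delta files."""

    def __init__(self):
        # One (partition_id, version) pair per delta file written.
        self.delta_files = set()

    def load(self, partition_id, version):
        # Version 0 is the empty initial state and needs no file.
        if version > 0 and (partition_id, version) not in self.delta_files:
            raise MissingDeltaFile(f"partition {partition_id}, version {version}")

    def commit(self, partition_id, version):
        self.delta_files.add((partition_id, version))


def run_trigger(store, version, num_partitions):
    """Each partition loads version - 1, then commits its delta for version."""
    for pid in range(num_partitions):
        store.load(pid, version - 1)
        store.commit(pid, version)


def partitions_missing_state(store, version, num_partitions):
    """Return the partition ids that cannot load the given version."""
    missing = []
    for pid in range(num_partitions):
        try:
            store.load(pid, version)
        except MissingDeltaFile:
            missing.append(pid)
    return missing


# Symptom 1: a trigger with 0 partitions writes no delta files,
# so the following trigger cannot load the previous version.
store = ToyStateStore()
run_trigger(store, version=1, num_partitions=0)
try:
    run_trigger(store, version=2, num_partitions=1)
    symptom_1 = "ok"
except MissingDeltaFile:
    symptom_1 = "failed"

# Symptom 2: state was written by a single partition, then the query
# restarts with the stateful operator spread over all shuffle partitions.
shuffle_partitions = 4  # assumed value, for illustration only
store2 = ToyStateStore()
run_trigger(store2, version=1, num_partitions=1)
missing = partitions_missing_state(store2, version=1, num_partitions=shuffle_partitions)

print(symptom_1)     # failed
print(len(missing))  # 3, i.e. shuffle_partitions - 1
```

In this model only partition 0 ever wrote a delta file, which is why every other partition fails on restart.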
> To fix the issues above, we must check that the partitioning of the child of
> a `StatefulOperator` satisfies:
> If the grouping expressions are empty:
>     a) AllTuples distribution
>     b) Single physical partition
> If the grouping expressions are non-empty:
>     a) Clustered distribution
>     b) spark.sql.shuffle.partitions # of partitions
> whether or not coalesce(1) exists in the plan, and whether or not the input
> RDD for the trigger has any data.
> Once you fix the above problem by adding an Exchange to the plan, you come
> across the following bug:
> If you call `coalesce(1).groupBy().count()` on a Streaming DataFrame, and
> you have a trigger with no data, `StateStoreRestoreExec` doesn't return the
> prior state. However, for this specific aggregation, the `HashAggregateExec`
> that runs after the restore returns a (0, 0) row, since we're performing a
> count and there is no data. This row then gets stored by
> `StateStoreSaveExec`, causing the previous counts to be overwritten and lost.
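The lost-count bug described last can be traced with plain Python data structures. This is a toy sketch under stated assumptions, not Spark code: `restore`, `aggregate_count` and `save` are hypothetical stand-ins for `StateStoreRestoreExec`, `HashAggregateExec` and `StateStoreSaveExec`, and the state is modelled as one running count under a single global key, as for `groupBy().count()`.

```python
GLOBAL_KEY = ()  # groupBy() with no grouping expressions has one global group


def restore(state, partial_rows):
    """Return prior state only for keys that appear in this trigger's input."""
    restored = [(key, state[key]) for key, _ in partial_rows if key in state]
    return restored + partial_rows


def aggregate_count(rows):
    """Sum counts per key; a global count emits a 0 row on empty input."""
    if not rows:
        return [(GLOBAL_KEY, 0)]
    totals = {}
    for key, count in rows:
        totals[key] = totals.get(key, 0) + count
    return list(totals.items())


def save(state, rows):
    """Write the aggregated rows back, replacing any previous value."""
    for key, count in rows:
        state[key] = count


def run_trigger(state, input_size):
    """Run restore -> aggregate -> save for one trigger; return the stored count."""
    partial = [(GLOBAL_KEY, input_size)] if input_size > 0 else []
    save(state, aggregate_count(restore(state, partial)))
    return state[GLOBAL_KEY]


state = {}
after_first = run_trigger(state, input_size=5)  # 5 rows arrive
after_empty = run_trigger(state, input_size=0)  # trigger with no data
after_third = run_trigger(state, input_size=3)  # 3 more rows arrive

print(after_first, after_empty, after_third)  # 5 0 3
```

The empty trigger restores nothing, so the 0 row produced by the aggregate replaces the stored 5, and the third trigger reports 3 where 8 rows have actually been seen.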