GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/21124
SPARK-23004

## What changes were proposed in this pull request?

A structured streaming query with a streaming aggregation can throw the following error in rare cases.

```
java.lang.IllegalStateException: Cannot remove after already committed or aborted
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at ...
```

This can happen when the following conditions are all hit (a hypothetical query shape hitting them is sketched below).

1. The streaming aggregation uses an aggregation function that is a subclass of [`TypedImperativeAggregate`](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473) (for example, `collect_set`, `collect_list`, `percentile`, etc.).
2. The query runs in `update` mode.
3. After the shuffle, a partition has exactly 128 records.

This happens because of the following.

1. The `StateStoreSaveExec` used in streaming aggregations has the [following logic](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359) when used in `update` mode.
   - There is an iterator that reads data from its parent iterator and updates the StateStore.
   - When the parent iterator is fully consumed (i.e. `baseIterator.hasNext` returns false), all state changes are committed by calling `StateStore.commit`.
   - The implementation of `StateStore.commit()` in `HDFSBackedStateStore` does not allow itself to be called twice. However, the logic is such that if `hasNext` is called again after `baseIterator.hasNext` has returned false, every such call invokes `StateStore.commit` again (the double-commit sketch below illustrates this).
   - For most aggregation functions this is okay, because `hasNext` is only called once. But that is not the case with `TypedImperativeAggregate`s.
2. `TypedImperativeAggregate`s are executed using `ObjectHashAggregateExec`, which tries to use two kinds of hashmaps for aggregation.
   - It first tries to use an unsorted hashmap. If the size of the hashmap grows beyond a certain threshold (default 128), it switches to using a sorted hashmap.
   - The [switching logic](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala) in `ObjectAggregationIterator` (used by `ObjectHashAggregateExec`) is such that when the number of records equals the threshold (i.e. 128), it ends up calling `iterator.hasNext` twice.

When these conditions are combined, they lead to the above error. This latent bug has existed since Spark 2.1, when `ObjectHashAggregateExec` was introduced.

The solution is to use `NextIterator` or `CompletionIterator`, each of which has a flag to prevent the "onCompletion" tasks from being called more than once. In this PR, I chose to implement it using `NextIterator` (a sketch of the flag-guard pattern is included below).
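For concreteness, here is a hypothetical query shape that can hit the three conditions above. This is not taken from the PR: the `rate` source, the `rowsPerSecond` setting, and the grouping expression are all illustrative, and condition 3 (a post-shuffle partition with exactly 128 records) is hit only by chance, which is why the failure is rare.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_set}
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder.appName("repro-sketch").getOrCreate()

// Condition 1: collect_set is a TypedImperativeAggregate
val aggregated = spark.readStream
  .format("rate")                  // illustrative source; any streaming source works
  .option("rowsPerSecond", "10")
  .load()
  .groupBy(col("value") % 100)
  .agg(collect_set(col("timestamp")))

val query = aggregated.writeStream
  .outputMode(OutputMode.Update()) // condition 2: update mode
  .format("console")
  .start()
// Condition 3 (exactly 128 records in a post-shuffle partition) depends on
// data volume and timing, so the error surfaces only intermittently.
```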
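The fragile pattern described in point 1 can be shown with a minimal standalone sketch (this is not Spark's actual code; `FakeStateStore` is a stand-in for `HDFSBackedStateStore`): the wrapping iterator commits on every `hasNext` call made after the parent is exhausted, so a second call fails.

```scala
// Minimal sketch of the double-commit bug: no guard around commit().
object DoubleCommitDemo {
  class FakeStateStore {
    private var committed = false
    def commit(): Unit = {
      if (committed) throw new IllegalStateException(
        "Cannot commit after already committed or aborted")
      committed = true
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new FakeStateStore
    val base  = Iterator(1, 2, 3)
    val wrapped = new Iterator[Int] {
      override def hasNext: Boolean = {
        if (base.hasNext) true
        else { store.commit(); false } // commits on EVERY call past exhaustion
      }
      override def next(): Int = base.next()
    }
    while (wrapped.hasNext) wrapped.next()
    wrapped.hasNext // second call after exhaustion -> IllegalStateException
  }
}
```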
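And here is a minimal sketch of the flag-guard pattern the fix relies on, modeled loosely on Spark's `NextIterator`/`CompletionIterator` utilities (simplified; not the PR's actual code). A `completed` flag makes the completion callback idempotent, so extra `hasNext` calls after exhaustion are harmless.

```scala
// Sketch of a completion-guarded iterator: onCompletion runs at most once.
class CompletionGuardedIterator[A](base: Iterator[A], onCompletion: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = base.hasNext
    if (!more && !completed) {
      completed = true  // guard: subsequent hasNext calls skip the callback
      onCompletion()
    }
    more
  }

  override def next(): A = base.next()
}

// Usage: wrapping the earlier example makes repeated hasNext calls safe.
// val it = new CompletionGuardedIterator(Iterator(1, 2, 3), () => store.commit())
```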
## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-23004

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21124.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21124

----

commit a78ba3769399165a39954046b304a36e4b9a6533
Author: Tathagata Das <tathagata.das1565@...>
Date: 2018-04-23T01:22:09Z

    SPARK-23004

----