GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/21124

    SPARK-23004

    ## What changes were proposed in this pull request?
    
    A structured streaming query with a streaming aggregation can throw the 
following error in rare cases. 
    ```
    java.lang.IllegalStateException: Cannot remove after already committed or aborted
      at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659)
      at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
      at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
      at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
      at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
      at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
      at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at ...
    ```
    This can happen when the following conditions are accidentally hit (a minimal query that satisfies the first two conditions is sketched below).
     1. Streaming aggregation with an aggregation function that is a subclass of [`TypedImperativeAggregate`](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473) (for example, `collect_set`, `collect_list`, `percentile`, etc.).
     2. Query running in `update` mode.
     3. After the shuffle, a partition has exactly 128 records.
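
    For illustration, a query along the following lines would satisfy the first two conditions. This sketch is not taken from the PR; the rate source, the grouping key, and the app name are arbitrary choices, and whether the third condition (exactly 128 records in a post-shuffle partition) is hit depends on the data.

    ```scala
    // Hypothetical reproduction sketch (not from this PR): a streaming aggregation
    // using a TypedImperativeAggregate function (collect_set) in update mode.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.collect_set

    val spark = SparkSession.builder().appName("spark-23004-repro").getOrCreate()
    import spark.implicits._

    val input = spark.readStream
      .format("rate")                  // any streaming source works; "rate" is just convenient
      .option("rowsPerSecond", 100)
      .load()

    val query = input
      .groupBy($"value" % 10)          // condition 1: streaming aggregation with collect_set below
      .agg(collect_set($"value"))      // collect_set is a TypedImperativeAggregate
      .writeStream
      .outputMode("update")            // condition 2: update mode
      .format("console")
      .start()
    ```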
    
    This happens because of the following. 
     1. The `StateStoreSaveExec` used in streaming aggregations has the [following logic](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359) when used in `update` mode (a condensed sketch follows this list).
        - There is an iterator that reads data from its parent iterator and updates the StateStore.
        - When the parent iterator is fully consumed (i.e. `baseIterator.hasNext` returns false), all state changes are committed by calling `StateStore.commit()`.
        - The implementation of `StateStore.commit()` in `HDFSBackedStateStore` does not allow commit to be called twice. However, the logic is such that every call to `hasNext` made after `baseIterator.hasNext` has returned false calls `StateStore.commit()` again.
        - For most aggregation functions this is okay because `hasNext` is only called once after the input is exhausted. But that's not the case with `TypedImperativeAggregate` functions.
    
     2. `TypedImperativeAggregate` functions are executed using `ObjectHashAggregateExec`, which tries to use two kinds of hashmaps for aggregation.
        - It first tries to use an unsorted hashmap. If the size of the hashmap increases beyond a certain threshold (default 128), it switches to using a sorted hashmap.
        - The [switching logic](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala) in `ObjectAggregationIterator` (used by `ObjectHashAggregateExec`) is such that when the number of records matches the threshold (i.e. 128), it ends up calling `iterator.hasNext` on its input twice.
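
    The following is a condensed sketch of the two pieces of logic above, not the actual Spark code; `stateSavingIterator` is a made-up stand-in for the update-mode iterator in `StateStoreSaveExec`, and `processInputs` is a heavily simplified version of the method in `ObjectAggregationIterator`. It shows how the extra `hasNext` call turns into a second `StateStore.commit()`.

    ```scala
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.streaming.state.StateStore

    // Simplified stand-in for the update-mode iterator in StateStoreSaveExec:
    // it commits on *every* hasNext call made after the parent is exhausted.
    def stateSavingIterator(base: Iterator[InternalRow], store: StateStore): Iterator[InternalRow] =
      new Iterator[InternalRow] {
        override def hasNext: Boolean = {
          if (base.hasNext) {
            true
          } else {
            store.commit()   // a second call after exhaustion throws IllegalStateException
            false
          }
        }
        override def next(): InternalRow = {
          val row = base.next()
          // ... put/remove entries in `store` for this row ...
          row
        }
      }

    // Simplified stand-in for ObjectAggregationIterator.processInputs: when the
    // in-memory map reaches the fallback threshold on the *last* input record,
    // `input.hasNext` is evaluated once by the while condition and once more by
    // the sort-based phase, i.e. twice after exhaustion, causing two commits.
    def processInputs(input: Iterator[InternalRow], fallbackCountThreshold: Int): Unit = {
      var sortBased = false
      var mapSize = 0
      while (input.hasNext && !sortBased) {          // first hasNext that sees exhaustion
        val row = input.next()
        // ... update the in-memory aggregation map with `row` ...
        mapSize += 1
        if (mapSize >= fallbackCountThreshold) {
          sortBased = true                           // switch to sort-based aggregation
        }
      }
      if (sortBased) {
        while (input.hasNext) {                      // second hasNext after exhaustion
          val row = input.next()
          // ... feed `row` into the external sorter ...
        }
      }
    }
    ```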
    
    When the above two behaviors are combined, they lead to the above error. This latent bug has existed since Spark 2.1, when `ObjectHashAggregateExec` was introduced.
    
    The solution is to use `NextIterator` or `CompletionIterator`, each of which has a flag to prevent the "onCompletion" task from being called more than once. In this PR, I chose to implement it using `NextIterator`.
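
    As a rough sketch of that approach (simplified, not the exact change in this PR), the state-updating iterator can be written as a `NextIterator` whose `close()` performs the commit. `NextIterator.closeIfNeeded()` guards `close()` behind a flag, so the commit runs at most once even if `hasNext` is called repeatedly after exhaustion. Note that `NextIterator` is a Spark-internal utility, so code like this only compiles inside Spark's own source tree.

    ```scala
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.streaming.state.StateStore
    import org.apache.spark.util.NextIterator

    // Simplified fixed version of the update-mode iterator: NextIterator calls
    // close() (and therefore commit()) at most once, no matter how many times
    // hasNext is invoked after the input is exhausted.
    def stateSavingIterator(base: Iterator[InternalRow], store: StateStore): Iterator[InternalRow] =
      new NextIterator[InternalRow] {
        override protected def getNext(): InternalRow = {
          if (base.hasNext) {
            val row = base.next()
            // ... put/remove entries in `store` for this row ...
            row
          } else {
            finished = true            // marks exhaustion; NextIterator then calls closeIfNeeded()
            null
          }
        }
        override protected def close(): Unit = {
          store.commit()               // guaranteed to run at most once
        }
      }
    ```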
    
    ## How was this patch tested?
    
    (Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-23004

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21124
    
----
commit a78ba3769399165a39954046b304a36e4b9a6533
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-04-23T01:22:09Z

    SPARK-23004

----


---
