[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-07-12  Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-23004:

Fix Version/s: 2.4.0  (was: 3.0.0)

> Structured Streaming raises "IllegalStateException: Cannot remove after 
> already committed or aborted"
> ---
>
> Key: SPARK-23004
> URL: https://issues.apache.org/jira/browse/SPARK-23004
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
> Environment: The exception is raised both on YARN and in local mode.
> Reporter: secfree
> Assignee: Tathagata Das
> Priority: Major
> Fix For: 2.3.1, 2.4.0
>
>
> A structured streaming query with a streaming aggregation can throw the 
> following error in rare cases. 
> {code:java}
> java.lang.IllegalStateException: Cannot remove after already committed or aborted
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
> at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
> at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
> at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
> at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
> at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 18-01-05 13:29:57 WARN TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, executor driver): java.lang.IllegalStateException: Cannot remove after already committed or aborted
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659){code}
>  
> This can happen when the following conditions are all met. 
>  # The streaming aggregation uses an aggregation function that is a subclass of 
> {{[TypedImperativeAggregate|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473]}}
>  (for example, {{collect_set}}, {{collect_list}}, {{percentile}}, etc.). 
>  # The query runs in {{update}} mode.
>  # After the shuffle, a partition has exactly 128 records (a minimal repro 
> sketch follows this list). 
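> 
> For illustration only, a query hitting all three conditions might look like the 
> sketch below. It is not from the original report; the rate source, the grouping 
> key, and the checkpoint path are illustrative assumptions.
> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.collect_set
> 
> object Spark23004Repro {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder()
>       .master("local[2]")
>       .appName("spark-23004-repro")
>       .getOrCreate()
>     import spark.implicits._
> 
>     // The rate source emits (timestamp, value) rows; with a small key space a
>     // shuffle partition can end up buffering exactly 128 records, the default
>     // unsorted-map fallback threshold of ObjectHashAggregateExec.
>     val input = spark.readStream
>       .format("rate")
>       .option("rowsPerSecond", "128")
>       .load()
> 
>     val aggregated = input
>       .groupBy($"value" % 10)       // streaming aggregation (condition 1)...
>       .agg(collect_set($"value"))   // ...with a TypedImperativeAggregate function
> 
>     aggregated.writeStream
>       .outputMode("update")         // condition 2: update output mode
>       .format("console")
>       .option("checkpointLocation", "/tmp/spark-23004-ckpt") // illustrative path
>       .start()
>       .awaitTermination()
>   }
> }
> {code}
> 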
> This happens because of the following. 
>  # The {{StateStoreSaveExec}} used in streaming aggregations has the 
> [following 
> logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359]
>  when used in {{update}} mode.
>  ## There is an iterator that reads data from its parent iterator and updates 
> the StateStore.
>  ## When the parent iterator is fully consumed (i.e., {{baseIterator.hasNext}} 
> returns false), all state changes are committed by calling 
> {{StateStore.commit}}. 
>  ## The implementation of {{StateStore.commit()}} in {{HDFSBackedStateStore}} 
> does not allow itself to be called twice. However, the logic is such that, if 
> {{hasNext}} is called again after {{baseIterator.hasNext}} has returned false, 
> each such call will invoke {{StateStore.commit}} on the already-committed 
> store.
>  ## For most aggregation functions this is harmless, because {{hasNext}} is 
> only called once after exhaustion. That is not the case for 
> {{TypedImperativeAggregate}} functions.
>  # {{TypedImperativeAggregate}} functions are executed by 
> {{ObjectHashAggregateExec}}, which tries two kinds of hashmaps for the 
> aggregation.
>  ## It will first try to use an unsorted hashmap. If the size of the hashmap 
> increases beyond a certain threshold (default 128), then it will switch to 
> using a sorted hashmap.
>  ## The [switching 
> logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala]
>  in {{ObjectAggregationIterator}} (used by {{ObjectHashAggregateExec}}) can end 
> up calling {{hasNext}} on the upstream iterator one more time after it has 
> already returned false. When a shuffle partition holds exactly 128 records, 
> that extra call lands after the state store has already been committed, and the 
> query fails with the {{IllegalStateException}} above. A minimal sketch of this 
> double-commit hazard follows.
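> 
> The hazard can be shown in isolation. Below is a minimal, self-contained sketch 
> in plain Scala with no Spark dependencies; the class names are illustrative 
> stand-ins, not Spark's actual implementation. An iterator commits state the 
> moment it observes its exhausted parent, and the consumer calls {{hasNext}} a 
> second time after exhaustion:
> {code:scala}
> object DoubleCommitSketch {
>   // Stand-in for HDFSBackedStateStore's verify() check: mutating or
>   // re-committing an already-committed store is illegal.
>   class StateStoreStub {
>     private var committed = false
>     def commit(): Unit = {
>       if (committed)
>         throw new IllegalStateException(
>           "Cannot commit after already committed or aborted")
>       committed = true
>     }
>   }
> 
>   // Analogous to StateStoreSaveExec's update-mode iterator: every hasNext
>   // call that observes an exhausted parent triggers a commit, with no guard
>   // against being asked again.
>   class CommitOnExhaustion[T](parent: Iterator[T], store: StateStoreStub)
>       extends Iterator[T] {
>     override def hasNext: Boolean = {
>       val more = parent.hasNext
>       if (!more) store.commit() // BUG: runs on every post-exhaustion hasNext
>       more
>     }
>     override def next(): T = parent.next()
>   }
> 
>   def main(args: Array[String]): Unit = {
>     val store = new StateStoreStub
>     val it = new CommitOnExhaustion((1 to 128).iterator, store)
>     while (it.hasNext) it.next() // the final hasNext commits the store
>     it.hasNext // a second post-exhaustion call throws IllegalStateException
>   }
> }
> {code}
> Such an extra post-exhaustion {{hasNext}} call is exactly what the 128-record 
> switching path in {{ObjectAggregationIterator}} produces, which is why the 
> failure is tied to that record count.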

[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22  Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Target Version/s: 2.3.1, 2.4.0  (was: 2.3.1)


[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22  Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Component/s: Structured Streaming  (was: Input/Output)


[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22  Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Target Version/s: 2.3.1


[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22  Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Affects Version/s: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.3.0


[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22  Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Description: 

[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22  Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Description: 

[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22  Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Description: 

[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22  Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Description: 