[jira] [Commented] (SPARK-21696) State Store can't handle corrupted snapshots
[ https://issues.apache.org/jira/browse/SPARK-21696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144009#comment-16144009 ] Apache Spark commented on SPARK-21696: -- User 'nonsleepr' has created a pull request for this issue: https://github.com/apache/spark/pull/18911 > State Store can't handle corrupted snapshots > > > Key: SPARK-21696 > URL: https://issues.apache.org/jira/browse/SPARK-21696 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0 >Reporter: Alexander Bessonov >Priority: Critical > Fix For: 2.2.1, 3.0.0 > > > State store's asynchronous maintenance task (generation of Snapshot files) is > not rescheduled if crashed which might lead to corrupted snapshots. > In our case, on multiple occasions, executors died during maintenance task > with Out Of Memory error which led to following error on recovery: > {code:none} > 17/08/07 20:12:24 WARN TaskSetManager: Lost task 3.1 in stage 102.0 (TID > 3314, dnj2-bach-r2n10.bloomberg.com, executor 94): java.io.EOFException > at java.io.DataInputStream.readInt(DataInputStream.java:392) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$readSnapshotFile(HDFSBackedStateStoreProvider.scala:436) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220) > at > org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at >
[jira] [Commented] (SPARK-21696) State Store can't handle corrupted snapshots
[ https://issues.apache.org/jira/browse/SPARK-21696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16122078#comment-16122078 ] Alexander Bessonov commented on SPARK-21696: {{HDFSBackedStateStoreProvider.doMaintenance()}} will supress any {{NonFatal}} exceptions. {{startMaintenanceIfNeeded.startMaintenanceIfNeeded()}} wouldn't restart maintenance if crashed. State Store still can function even when snapshot file is corrupted by simply falling back to deltas. > State Store can't handle corrupted snapshots > > > Key: SPARK-21696 > URL: https://issues.apache.org/jira/browse/SPARK-21696 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0 >Reporter: Alexander Bessonov >Priority: Critical > > State store's asynchronous maintenance task (generation of Snapshot files) is > not rescheduled if crashed which might lead to corrupted snapshots. > In our case, on multiple occasions, executors died during maintenance task > with Out Of Memory error which led to following error on recovery: > {code:none} > 17/08/07 20:12:24 WARN TaskSetManager: Lost task 3.1 in stage 102.0 (TID > 3314, dnj2-bach-r2n10.bloomberg.com, executor 94): java.io.EOFException > at java.io.DataInputStream.readInt(DataInputStream.java:392) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$readSnapshotFile(HDFSBackedStateStoreProvider.scala:436) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220) > at > org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at >