[ 
https://issues.apache.org/jira/browse/SPARK-22262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278629#comment-16278629
 ] 

Alban Hurtaud commented on SPARK-22262:
---------------------------------------

Hi, Can we have more info on why is this invalid ?
I am facing the exact same issue, I would expect that the query restarts from 
no checkpoints when the driver restarts.
But because the folder exist and has only for instance 2000.delta files and 
cannot find 1.delta, the query fails badly and crash for life ...
Is there something we miss ?

I looked at spark.streaming.stopGracefullyOnShutdown but it looks like it is 
not taken into account as it is not streaming...

> Failed to recover Spark Structured Streaming job from checkpoint location
> -------------------------------------------------------------------------
>
>                 Key: SPARK-22262
>                 URL: https://issues.apache.org/jira/browse/SPARK-22262
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: Ubuntu Server 16.04 LTS *strong text* (AMI AWS - EC2)
> servers : 3 x m4.2xlarge
>  
>            Reporter: rohit verma
>
> Hello Everyone,
> I am using Structured Streaming + Kafka for realtime data analytics in our 
> project. I am using Spark 2.2, kafka 0.10.2.
> We are facing an issue during streaming query recovery from checkpoint at 
> application startup. As there are multiple streaming queries derived from a 
> single kafka streaming point and there are different checkpint directories 
> for every streaming query. So in case of job failure, when we restart the job 
> there are some streaming queries which fails to recover from checkpoint 
> location hence throw an exception of  *Error reading delta file*. To make the 
> job working again I need to clear the checkpoint location and all the data 
> from hdfs, then it works fine. But this should not be the case. Here I am 
> losing all the captured data which I don't want. Here are the logs :
> ------------------------------------------------------------------------------------------------------------------------
> Job aborted due to stage failure: Task 2 in stage 13.0 failed 4 times, most 
> recent failure: Lost task 2.3 in stage 13.0 (TID 831, 
> ip-172-31-10-246.us-west-2.compute.internal, executor 3): 
> java.lang.IllegalStateException: Error reading delta file 
> /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta of 
> HDFSStateStoreProvider[id = (op=0, part=2), dir = 
> /checkpointing/wifiHealthPerUserPerMinute/state/0/2]: 
> /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta does not exist
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
> ----------------------------------------------------------------------------------------------------------------------------
> Please help me out for the same. There may be workarounds for this issue, 
> please suggest me if any, or may be it is a bug.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to