[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state

2017-10-30 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225273#comment-16225273 ]

Apache Spark commented on SPARK-22305:
--

User 'joseph-torres' has created a pull request for this issue:
https://github.com/apache/spark/pull/19611

> HDFSBackedStateStoreProvider fails with StackOverflowException when 
> attempting to recover state
> ---
>
> Key: SPARK-22305
> URL: https://issues.apache.org/jira/browse/SPARK-22305
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Yuval Itzchakov
>
> Environment:
> Spark: 2.2.0
> Java version: 1.8.0_112
> spark.sql.streaming.minBatchesToRetain: 100
> After an application failure due to OOM exceptions, restarting the 
> application with the existing state produces the following StackOverflowError:
> {code:java}
> java.io.IOException: com.google.protobuf.ServiceException: 
> java.lang.StackOverflowError
>   at 
> org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:260)
>   at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
>   at 
> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1227)
>   at 
> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1215)
>   at 
> org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:303)
>   at 
> org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:269)
>   at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:261)
>   at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1540)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$readSnapshotFile(HDFSBackedStateStoreProvider.scala:405)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:295)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:295)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:297)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:296)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:295)
>   at 

[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state

2017-10-27 Thread Shixiong Zhu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16223000#comment-16223000 ]

Shixiong Zhu commented on SPARK-22305:
--

Why not just delete the whole checkpoint dir? Dropping the state store will make 
Spark return wrong answers.


[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state

2017-10-27 Thread Yuval Itzchakov (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221982#comment-16221982 ]

Yuval Itzchakov commented on SPARK-22305:
-

[~zsxwing] A typical query takes ~0.5 seconds to execute, so about 120 batches 
per minute.

I discovered something else related to the StackOverflow. The way 
`HDFSBackedStateStoreProvider` is implemented, there is a correlation between 
the file name of the latest offset and the state store version Spark looks for.

But assume the following situation:

1. The state was modified in such a way that it is no longer backwards compatible with the state in the store.
2. Due to 1, we delete the state in the store but keep the offset files.
3. Spark starts, tries to recover the previous state, and takes the latest state version from the latest offset file.
4. The state store recursively searches for that state version (which no longer exists), ending in a StackOverflowError.

Would it be possible to separate the two (state and offsets), so that the query 
could automatically restart from the latest stored offsets but without the old 
state?
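
To make the recursion described above concrete, here is a minimal, hypothetical sketch (not the actual HDFSBackedStateStoreProvider source; `snapshotExists` and `readDelta` are stand-ins for reading snapshot/delta files from the checkpoint directory): a naive recursive walk back through state versions pushes one stack frame per version, so when the expected files are gone and many versions have accumulated, it overflows the stack, whereas an iterative loop keeps the stack depth constant.

{code:scala}
// Hypothetical sketch of the version-lookup pattern, NOT the real Spark code.
object StateRecoverySketch {
  type StateMap = Map[String, String]

  // Stand-ins: with the state files deleted, no snapshot is ever found.
  def snapshotExists(version: Long): Boolean = false
  def readDelta(version: Long): StateMap = Map.empty

  // Recursive walk: one stack frame per version. With ~120 batches per minute,
  // enough versions pile up for the walk to blow the stack.
  def loadMapRecursive(version: Long): StateMap = {
    if (version <= 0) Map.empty
    else if (snapshotExists(version)) readDelta(version)
    else loadMapRecursive(version - 1) ++ readDelta(version)
  }

  // Iterative rewrite: constant stack depth no matter how far back we search.
  def loadMapIterative(version: Long): StateMap = {
    var map: StateMap = Map.empty
    var v = 1L
    while (v <= version) {
      map = map ++ readDelta(v) // replay deltas oldest-to-newest
      v += 1
    }
    map
  }
}
{code}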


[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state

2017-10-26 Thread Shixiong Zhu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221432#comment-16221432 ]

Shixiong Zhu commented on SPARK-22305:
--

[~Yuval.Itzchakov] How many batches per minute does your query run? If there are 
a lot of batches, you can try running your application with `--conf 
spark.sql.streaming.stateStore.maintenanceInterval=10s` to set a smaller interval 
as a workaround.

However, we should definitely fix this by rewriting this code in a non-recursive 
way.
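
For reference, a minimal sketch of applying the suggested workaround programmatically instead of via the spark-submit flag; the config key is the one quoted above, while the builder usage and app name are only illustrative:

{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative only: same workaround as --conf on spark-submit. A smaller
// maintenance interval should make the background snapshot/cleanup task run
// more often, so fewer delta files need to be replayed on recovery.
val spark = SparkSession.builder()
  .appName("stateful-streaming-app") // hypothetical app name
  .config("spark.sql.streaming.stateStore.maintenanceInterval", "10s")
  .getOrCreate()
{code}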



[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state

2017-10-20 Thread Yuval Itzchakov (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212608#comment-16212608 ]

Yuval Itzchakov commented on SPARK-22305:
-

[~tdas] What do you think?
