[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state
[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225273#comment-16225273 ]

Apache Spark commented on SPARK-22305:
--------------------------------------

User 'joseph-torres' has created a pull request for this issue:
https://github.com/apache/spark/pull/19611

> HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22305
>                 URL: https://issues.apache.org/jira/browse/SPARK-22305
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yuval Itzchakov
>
> Environment:
> Spark: 2.2.0
> Java version: 1.8.0_112
> spark.sql.streaming.minBatchesToRetain: 100
>
> After an application failure due to OOM exceptions, restarting the application with the existing state produces the following OOM:
> {code:java}
> java.io.IOException: com.google.protobuf.ServiceException: java.lang.StackOverflowError
>     at org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:260)
>     at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1227)
>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1215)
>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:303)
>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:269)
>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:261)
>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1540)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$readSnapshotFile(HDFSBackedStateStoreProvider.scala:405)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:295)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:295)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:297)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:296)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:295)
>     at
[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state
[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223000#comment-16223000 ]

Shixiong Zhu commented on SPARK-22305:
--------------------------------------

Why not just delete the whole checkpoint dir? Dropping state store will make Spark return wrong answers.
[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state
[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221982#comment-16221982 ]

Yuval Itzchakov commented on SPARK-22305:
-----------------------------------------

[~zsxwing] A typical query takes ~0.5 seconds to execute, so about 120 batches per minute.

I discovered something else that relates to the stack overflow. The way `HDFSBackedStateStoreProvider` is implemented, there seems to be a correlation between the file name of the latest offset and the state store version Spark looks for. But assume the following situation:

1. The state was modified in such a way that it is not backwards compatible with the state in the store.
2. Due to 1, we delete the state in the store but keep the offset files.
3. Spark starts, tries to recover the previous state, and takes the latest state version from the latest offset file.
4. The state store recursively starts searching for that version of the state (which no longer exists), ending in a StackOverflowException.

Would it be possible to separate the two (state and offsets), so that an application could automatically start from the latest stored offsets but without the state?
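The failure mode described in steps 1 to 4 can be illustrated with a small model. This is a hedged sketch, not Spark's code: `RecursiveLoadSketch`, its in-memory `snapshots` map, and the `depth` counter are all hypothetical stand-ins chosen for illustration. It only shows why a lookup that steps back one version per call uses one stack frame per missing version.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of a recursive state lookup. This is not
// Spark's actual code; snapshot files are replaced by an in-memory map.
class RecursiveLoadSketch {
    // version -> full state snapshot (stand-in for snapshot files on HDFS)
    static final Map<Long, Map<String, String>> snapshots = new HashMap<>();
    // deepest recursion level reached since this field was last reset
    static int maxDepth = 0;

    static Map<String, String> loadMap(long version, int depth) {
        maxDepth = Math.max(maxDepth, depth);
        if (version <= 0) {
            return new HashMap<>();          // nothing older to fall back to
        }
        Map<String, String> snapshot = snapshots.get(version);
        if (snapshot != null) {
            return new HashMap<>(snapshot);  // found a snapshot, stop here
        }
        // No snapshot for this version: recurse to the previous one, which
        // costs one more stack frame per missing version. Applying the delta
        // for this version is omitted in this sketch.
        return loadMap(version - 1, depth + 1);
    }

    public static void main(String[] args) {
        loadMap(1000, 0);                    // store is empty, as in step 2
        System.out.println("recursion depth reached: " + maxDepth);
    }
}
```

In this model the depth grows linearly with the requested version when nothing is found, so a sufficiently high version number taken from the offset log would exhaust the stack.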
[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state
[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221432#comment-16221432 ]

Shixiong Zhu commented on SPARK-22305:
--------------------------------------

[~Yuval.Itzchakov] how many batches per minute does your query run? If there are a lot of batches, you can try running your application with `--conf spark.sql.streaming.stateStore.maintenanceInterval=10s` to set a small interval as a workaround. However, we should definitely fix this by rewriting this code in a non-recursive way.
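A non-recursive rewrite of the kind suggested above might look roughly like the following. This is a sketch under stated assumptions, not the code from Spark or from any pull request: `IterativeLoadSketch`, the in-memory `snapshots` and `deltas` maps, and the treatment of a delta as a plain set of key updates (deletions are not modeled) are all hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical iterative state lookup: find the nearest snapshot with a
// loop, then replay deltas forward. Stack usage is constant regardless of
// how many versions have to be walked.
class IterativeLoadSketch {
    static Map<String, String> loadMap(
            long version,
            Map<Long, Map<String, String>> snapshots,
            Map<Long, Map<String, String>> deltas) {
        // Walk backwards to the most recent version that has a snapshot.
        long base = version;
        while (base > 0 && !snapshots.containsKey(base)) {
            base--;
        }
        // Start from that snapshot, or from empty state if none exists.
        Map<String, String> state =
                base > 0 ? new HashMap<>(snapshots.get(base)) : new HashMap<>();
        // Replay every delta after the snapshot, in version order.
        for (long v = base + 1; v <= version; v++) {
            Map<String, String> delta = deltas.get(v);
            if (delta == null) {
                // Fail with a clear message instead of overflowing the stack.
                throw new IllegalStateException(
                        "Missing delta for version " + v);
            }
            state.putAll(delta);
        }
        return state;
    }
}
```

With this shape, a missing state file surfaces as an explicit error naming the version that could not be loaded, which is easier to diagnose than a `StackOverflowError`.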
[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state
[ https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212608#comment-16212608 ]

Yuval Itzchakov commented on SPARK-22305:
-----------------------------------------

[~tdas] What do you think?