[jira] [Commented] (SPARK-22403) StructuredKafkaWordCount example fails in YARN cluster mode
[ https://issues.apache.org/jira/browse/SPARK-22403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245191#comment-16245191 ]

Apache Spark commented on SPARK-22403:
--------------------------------------

User 'wypoon' has created a pull request for this issue:
https://github.com/apache/spark/pull/19703

> StructuredKafkaWordCount example fails in YARN cluster mode
> ------------------------------------------------------------
>
>                 Key: SPARK-22403
>                 URL: https://issues.apache.org/jira/browse/SPARK-22403
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Wing Yew Poon
>
> When I run the StructuredKafkaWordCount example in YARN client mode, it runs
> fine. However, when I run it in YARN cluster mode, the application errors
> during initialization, and dies after the default number of YARN application
> attempts. In the AM log, I see
> {noformat}
> 17/10/30 11:34:52 INFO execution.SparkSqlParser: Parsing command: CAST(value AS STRING)
> 17/10/30 11:34:53 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(b71ca714-a7a1-467f-96aa-023375964429) to /data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/container_1508800814252_0047_01_01/tmp/temporary-b5ced4ae-32e0-4725-b905-aad679aec9b5/metadata
> org.apache.hadoop.security.AccessControlException: Permission denied: user=systest, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:397)
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1842)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1826)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1785)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:315)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2313)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2257)
> 	...
> 	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:280)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1235)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1214)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1152)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:458)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:455)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:396)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:972)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:960)
> 	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:116)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:114)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:114)
> 	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
> 	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
> 	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
> 	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:79)
> 	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala)
> {noformat}
> Looking at StreamingQueryManager#createQuery, we have
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L198
> {code}
> val checkpointLocation = userSpecifiedCheckpointLocation.map { ...
> ...
> }.orElse {
> ...
> }.getOrElse
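A hedged reading of the trace above: Utils.createTempDir places the temporary checkpoint under java.io.tmpdir, which in a YARN container is a node-local appcache path; because the returned path carries no scheme, Structured Streaming resolves it against the cluster's default filesystem (HDFS), where user systest cannot create the missing ancestor directories under "/". A minimal Scala sketch of that resolution behavior; the fs.defaultFS value and paths are placeholders, not values from the report (requires hadoop-client on the classpath):

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object SchemelessPathResolution {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // In a YARN cluster, fs.defaultFS typically points at HDFS (hypothetical address):
    conf.set("fs.defaultFS", "hdfs://namenode.example.com:8020")

    // A scheme-less, node-local path like the one createTempDir returns in a container:
    val tempDir = new Path("/data/yarn/nm/usercache/systest/appcache/app_0047/tmp/temporary-xyz")

    // Path.getFileSystem resolves scheme-less paths against fs.defaultFS, so a
    // subsequent create() lands on HDFS rather than the local disk that actually
    // holds the directory, hence the AccessControlException at inode "/".
    val fs = tempDir.getFileSystem(conf)
    println(fs.getUri) // hdfs://namenode.example.com:8020, not file:///
  }
}
{code}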
[jira] [Commented] (SPARK-22403) StructuredKafkaWordCount example fails in YARN cluster mode
[ https://issues.apache.org/jira/browse/SPARK-22403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16227321#comment-16227321 ]

Shixiong Zhu commented on SPARK-22403:
--------------------------------------

Yeah, feel free to submit a PR to improve the example.
[jira] [Commented] (SPARK-22403) StructuredKafkaWordCount example fails in YARN cluster mode
[ https://issues.apache.org/jira/browse/SPARK-22403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16227298#comment-16227298 ]

Wing Yew Poon commented on SPARK-22403:
---------------------------------------

I realize that in a production application, one would set checkpointLocation and avoid this issue. However, there is evidently a problem in the code that handles the case when checkpointLocation is not set and a temporary checkpoint location is created. Also, the StructuredKafkaWordCount example does not accept a parameter for setting the checkpointLocation.
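A hedged sketch of how the example could accept an optional fourth argument for the checkpoint location; the argument handling and object name below are illustrative, not the shipped example's code:

{code}
import org.apache.spark.sql.SparkSession

object StructuredKafkaWordCountSketch {
  def main(args: Array[String]): Unit = {
    // Same three arguments as the shipped example, plus an optional checkpoint dir.
    val Array(bootstrapServers, subscribeType, topics, rest @ _*) = args
    val checkpointDir = rest.headOption // e.g. hdfs:///user/systest/checkpoints/wc

    val spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()
    import spark.implicits._

    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    var writer = wordCounts.writeStream.outputMode("complete").format("console")
    // Only an explicit checkpointLocation avoids the failing temp-dir code path.
    checkpointDir.foreach(dir => writer = writer.option("checkpointLocation", dir))

    writer.start().awaitTermination()
  }
}
{code}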
[jira] [Commented] (SPARK-22403) StructuredKafkaWordCount example fails in YARN cluster mode
[ https://issues.apache.org/jira/browse/SPARK-22403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16227266#comment-16227266 ]

Shixiong Zhu commented on SPARK-22403:
--------------------------------------

Yeah, Spark creates a temp directory for you. You can set "checkpointLocation" by yourself to avoid this issue. I don't know if there is an API to create a temp directory for all types of file systems.
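For reference, a minimal sketch of the two standard places a user can pin the checkpoint location, either of which bypasses the temp-directory code path; the paths and app name are placeholders:

{code}
import org.apache.spark.sql.SparkSession

object ExplicitCheckpointLocation {
  def main(args: Array[String]): Unit = {
    // Option 1: session-wide root; queries checkpoint under this directory.
    val spark = SparkSession.builder
      .appName("ExplicitCheckpointLocation")
      .config("spark.sql.streaming.checkpointLocation", "hdfs:///user/systest/checkpoints")
      .getOrCreate()

    val rates = spark.readStream.format("rate").load()

    // Option 2: per-query location on the DataStreamWriter; for this query it
    // takes precedence over the session-wide setting.
    rates.writeStream
      .format("console")
      .option("checkpointLocation", "hdfs:///user/systest/checkpoints/rate-demo")
      .start()
      .awaitTermination()
  }
}
{code}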
[jira] [Commented] (SPARK-22403) StructuredKafkaWordCount example fails in YARN cluster mode
[ https://issues.apache.org/jira/browse/SPARK-22403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225913#comment-16225913 ]

Wing Yew Poon commented on SPARK-22403:
---------------------------------------

The simplest change that solves the problem in this particular scenario is to change the Utils.createTempDir(namePrefix = s"temporary") call in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L208 to Utils.createTempDir(root = "/tmp", namePrefix = "temporary"). In my view, using "/tmp" is not worse, and is in fact better, than using System.getProperty("java.io.tmpdir"). However, others may know of better solutions.
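The proposed change, rendered as a before/after fragment; Utils here is Spark's internal org.apache.spark.util.Utils, and the surrounding getCanonicalPath call is assumed from context rather than quoted in the comment:

{code}
// Before (StreamingQueryManager#createQuery): the default root is
// System.getProperty("java.io.tmpdir"), a node-local path inside a YARN container.
Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath

// After (the change suggested above): anchor the temporary checkpoint at /tmp,
// which exists and is typically user-writable on the default filesystem as well.
Utils.createTempDir(root = "/tmp", namePrefix = "temporary").getCanonicalPath
{code}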