[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode
[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17229719#comment-17229719 ]

Vinod KC commented on SPARK-26825:
----------------------------------

As a workaround, we can set a {{checkpointLocation}} on the stream, e.g. {{option("checkpointLocation", "path/to/checkpoint/dir")}}, so that the same HDFS path is used in both client and cluster mode.

> Spark Structure Streaming job failing when submitted in cluster mode
> ---------------------------------------------------------------------
>
>                 Key: SPARK-26825
>                 URL: https://issues.apache.org/jira/browse/SPARK-26825
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Andre Araujo
>            Priority: Major
>
> I have a structured streaming job that runs successfully when launched in "client" mode. However, when launched in "cluster" mode it fails with the following weird messages in the error log. Note that the path in the error message is actually a local filesystem path that has been mistakenly prefixed with an {{hdfs://}} scheme.
> {code}
> 19/02/01 12:53:14 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(68f9fb30-5853-49b4-b192-f1e0483e0d95) to hdfs://ns1/data/yarn/nm/usercache/root/appcache/application_1548823131831_0160/container_1548823131831_0160_02_01/tmp/temporary-3789423a-6ded-4084-aab3-3b6301c34e07/metadata
> org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
> 	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1853)
> {code}
> I dug a little bit into this and here's what I think is going on:
> # When a new streaming query is created, the {{StreamingQueryManager}} determines the checkpoint location [here|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L216]. If neither the user nor the Spark conf specifies a checkpoint location, the location is returned by a call to {{Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath}}. Here, I see two issues:
> #* The canonical path returned by {{Utils.createTempDir}} does *not* have a scheme ({{hdfs://}} or {{file://}}), so it's ambiguous what type of file system the path belongs to.
> #* Also note that the path returned by the {{Utils.createTempDir}} call is a local path, not an HDFS path, unlike the paths returned by the other two conditions. I executed {{Utils.createTempDir}} in a test job, in both cluster and client modes, and the results are these:
> {code}
> *Client mode:*
> java.io.tmpdir=/tmp
> createTempDir(namePrefix = s"temporary") => /tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25
>
> *Cluster mode:*
> java.io.tmpdir=/yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/
> createTempDir(namePrefix = s"temporary") => /yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # This temporary checkpoint location is then [passed to the constructor|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L276] of the {{MicroBatchExecution}} instance.
> # This is the point where [{{resolvedCheckpointRoot}} is calculated|https://github.com/apache/spark/blob/755f9c20761e3db900c6c2b202cd3d2c5bbfb7c0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L89]. Here is where things start to break: since the path returned by {{Utils.createTempDir}} doesn't have a scheme, and since HDFS is the default filesystem, the code resolves the path as an HDFS path rather than a local one, as shown below:
> {code}
> scala> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.fs.Path
>
> scala> // value returned by the Utils.createTempDir method
> scala> val checkpointRoot = "/yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e"
> checkpointRoot: String = /yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
>
> scala> val checkpointPath = new Path(checkpointRoot)
> checkpointPath: org.apache.hadoop.fs.Path =
> {code}
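To make the resolution step in item 3 concrete, here is a minimal standalone sketch of how Hadoop qualifies a scheme-less path against the default filesystem. The {{fs.defaultFS}} value ({{hdfs://ns1}}) is an assumption for illustration, matching the prefix seen in the error log above:

{code}
import java.net.URI
import org.apache.hadoop.fs.Path

// A scheme-less local path, like the one Utils.createTempDir returns.
val checkpointPath = new Path("/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e")
println(checkpointPath.toUri.getScheme)  // null: the filesystem type is ambiguous

// Qualifying the path fills the missing scheme from the default
// filesystem's URI, so a local path silently becomes an HDFS path.
val defaultFs = new URI("hdfs://ns1")  // assumed fs.defaultFS value
val qualified = checkpointPath.makeQualified(defaultFs, new Path("/"))
println(qualified)  // hdfs://ns1/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
{code}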
[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode
[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17043593#comment-17043593 ]

Shyam commented on SPARK-26825:
-------------------------------

[~asdaraujo] [~gsomogyi] I am facing the same issue in spark-sql 2.4.1. How can I replace/override the path returned by this {{Utils.createTempDir}} code? Any more clues?
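The path from {{Utils.createTempDir}} is only used as a fallback when no checkpoint location is configured, so the practical way to override it is to set the location explicitly, as in the workaround noted above. A minimal sketch follows; the HDFS paths and the {{rate}} source are placeholders, not prescriptions:

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("checkpoint-demo").getOrCreate()

// Option 1: session-wide default; each query checkpoints in a
// subdirectory of this path.
spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs:///user/shyam/checkpoints")

// Option 2: per-query option, which takes precedence over the conf.
val query = spark.readStream.format("rate").load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "hdfs:///user/shyam/checkpoints/my-query")
  .start()
{code}

With either setting in place, the ambiguous temp-dir path is never generated, so client and cluster mode behave identically.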
[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode
[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773727#comment-16773727 ]

Jungtaek Lim commented on SPARK-26825:
--------------------------------------

A similar issue was reported (SPARK-19909) whose root cause looks the same, so I guess the current PR for this issue would resolve SPARK-19909 as well.
[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode
[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760901#comment-16760901 ]

Andre Araujo commented on SPARK-26825:
--------------------------------------

Thanks a lot, [~gsomogyi]
[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode
[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760777#comment-16760777 ]

Gabor Somogyi commented on SPARK-26825:
---------------------------------------

There is another PR from me which modifies this part; after it is merged I intend to solve this one as well.
[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode
[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760702#comment-16760702 ]

Gabor Somogyi commented on SPARK-26825:
---------------------------------------

[~asdaraujo] Excellent analysis! One minor correction:
* Replacing the call to {{Utils.createTempDir}} with something that creates a temp dir on the default FS, rather than the local filesystem

This is correct:
{quote}Ensuring this method returns a path qualified with a scheme ({{hdfs://}} or {{file://}}), to avoid later fs resolution mistakes.{quote}
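For illustration, here is a sketch of what the scheme-qualifying fix could look like: return the temp dir's canonical path already qualified with an explicit {{file:}} scheme, so later resolution cannot reinterpret it against the default (HDFS) filesystem. The helper name is hypothetical and this is not Spark's actual code:

{code}
import java.nio.file.Files
import org.apache.hadoop.fs.Path

// Hypothetical replacement for the bare getCanonicalPath call:
// returns a path already qualified with a file: scheme.
def createLocalTempCheckpointDir(namePrefix: String): String = {
  val dir = Files.createTempDirectory(namePrefix).toFile
  dir.deleteOnExit()
  // File.toURI always carries the file: scheme; Path preserves it.
  new Path(dir.getCanonicalFile.toURI).toString
}

println(createLocalTempCheckpointDir("temporary"))
// prints e.g. file:/tmp/temporary2233734821345697171
{code}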