[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode

2020-11-10 Thread Vinod KC (Jira)


[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17229719#comment-17229719 ]

Vinod KC commented on SPARK-26825:
--

As a workaround, we can set an explicit {{checkpointLocation}} on the stream, 
e.g. {{option("checkpointLocation", "path/to/checkpoint/dir")}}.

This way, both client and cluster mode refer to the same HDFS path.
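
A minimal sketch of this workaround (the rate source, console sink, and 
checkpoint path are illustrative assumptions, not part of the original report):

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-workaround").getOrCreate()

// Pin the checkpoint to an explicit, scheme-qualified path so that client
// and cluster mode both resolve the same HDFS location.
val query = spark.readStream
  .format("rate")   // illustrative source; any streaming source applies
  .load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "hdfs:///user/spark/checkpoints/my-query")
  .start()

query.awaitTermination()
{code}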

> Spark Structure Streaming job failing when submitted in cluster mode
> 
>
> Key: SPARK-26825
> URL: https://issues.apache.org/jira/browse/SPARK-26825
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Andre Araujo
>Priority: Major
>
> I have a structured streaming job that runs successfully when launched in 
> "client" mode. However, when launched in "cluster" mode it fails with the 
> following weird messages in the error log. Note that the path in the error 
> message is actually a local filesystem path that has been mistakenly prefixed 
> with a {{hdfs://}} scheme.
> {code}
> 19/02/01 12:53:14 ERROR streaming.StreamMetadata: Error writing stream 
> metadata StreamMetadata(68f9fb30-5853-49b4-b192-f1e0483e0d95) to 
> hdfs://ns1/data/yarn/nm/usercache/root/appcache/application_1548823131831_0160/container_1548823131831_0160_02_01/tmp/temporary-3789423a-6ded-4084-aab3-3b6301c34e07/metadata
> org.apache.hadoop.security.AccessControlException:
>  Permission denied: user=root, access=WRITE, 
> inode="/":hdfs:supergroup:drwxr-xr-x
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:400)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1853)
> {code}
> I dug a little bit into this and here's what I think is going on:
> # When a new streaming query is created, the {{StreamingQueryManager}} 
> determines the checkpoint location 
> [here|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L216].
>  If neither the user nor the Spark conf specifies a checkpoint location, the 
> location is returned by a call to {{Utils.createTempDir(namePrefix = 
> s"temporary").getCanonicalPath}}. 
> Here, I see two issues:
> #* The canonical path returned by {{Utils.createTempDir}} does *not* have a 
> scheme ({{hdfs://}} or {{file://}}), so it's ambiguous which type of 
> file system the path belongs to.
> #* Also note that the path returned by the {{Utils.createTempDir}} call is a 
> local path, not an HDFS path, unlike the paths returned by the other two 
> conditions. I executed {{Utils.createTempDir}} in a test job, both in cluster 
> and client modes, and the results are these:
> {code}
> *Client mode:*
> java.io.tmpdir=/tmp
> createTempDir(namePrefix = s"temporary") => 
> /tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25
> *Cluster mode:*
> java.io.tmpdir=/yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/
> createTempDir(namePrefix = s"temporary") => 
> /yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
> # This temporary checkpoint location is then [passed to the 
> constructor|https://github.com/apache/spark/blob/d811369ce23186cbb3208ad665e15408e13fea87/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L276]
>  of the {{MicroBatchExecution}} instance.
> # This is the point where [{{resolvedCheckpointRoot}} is 
> calculated|https://github.com/apache/spark/blob/755f9c20761e3db900c6c2b202cd3d2c5bbfb7c0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L89].
>  Here is where things start to break: since the path returned by 
> {{Utils.createTempDir}} doesn't have a scheme, and since HDFS is the default 
> filesystem, the code resolves the path as an HDFS path, rather than a 
> local one, as shown below:
> {code}
> scala> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.fs.Path
> scala> // value returned by the Utils.createTempDir method
> scala> val checkpointRoot = 
> "/yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e"
> checkpointRoot: String = 
> /yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> scala> val checkpointPath = new Path(checkpointRoot)
> checkpointPath: org.apache.hadoop.fs.Path = 
> /yarn/nm/usercache/root/appcache/application_154906473_0029/container_154906473_0029_01_01/tmp/temporary-47c13b28-14bd-4d1b-8acc-3e445948415e
> {code}
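
The qualification in step 3 can be reproduced with nothing but 
{{org.apache.hadoop.fs.Path}}. Below is a minimal sketch; the {{fs.defaultFS}} 
URI is assumed from the error log above, and this illustrates the resolution 
behavior, not the exact Spark code path:

{code}
import java.net.URI
import org.apache.hadoop.fs.Path

// fs.defaultFS on the cluster (assumed from the "hdfs://ns1/..." error above)
val defaultUri = new URI("hdfs://ns1")
val workingDir = new Path("/")

// A bare local path, as returned by Utils.createTempDir(...).getCanonicalPath
val checkpointRoot = new Path("/tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25")

// Qualification stamps the default filesystem's scheme onto the scheme-less
// path: it now points at HDFS, although the directory only exists locally.
println(checkpointRoot.makeQualified(defaultUri, workingDir))
// prints: hdfs://ns1/tmp/temporary-c51f1466-fd50-40c7-b136-1f2f06672e25
{code}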

[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode

2020-02-24 Thread Shyam (Jira)


[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17043593#comment-17043593 ]

Shyam commented on SPARK-26825:
---

[~asdaraujo] [~gsomogyi] I am facing the same issue in spark-sql 2.4.1. How can 
I replace/override what is returned by this Utils.createTempDir code?

Any more clues?


[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode

2019-02-20 Thread Jungtaek Lim (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773727#comment-16773727 ]

Jungtaek Lim commented on SPARK-26825:
--

A similar issue was reported (SPARK-19909) whose root cause looks the same, so I 
guess the current PR for this issue would resolve SPARK-19909 as well.


[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode

2019-02-05 Thread Andre Araujo (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760901#comment-16760901 ]

Andre Araujo commented on SPARK-26825:
--

Thanks a lot, [~gsomogyi] 


[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode

2019-02-05 Thread Gabor Somogyi (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760777#comment-16760777 ]

Gabor Somogyi commented on SPARK-26825:
---

There is another PR from me which modifies this part; after it's merged, I 
intend to solve this one as well.


[jira] [Commented] (SPARK-26825) Spark Structure Streaming job failing when submitted in cluster mode

2019-02-05 Thread Gabor Somogyi (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760702#comment-16760702 ]

Gabor Somogyi commented on SPARK-26825:
---

[~asdaraujo] excellent analysis! One minor correction: this proposed fix is not 
the way to go:
 * Replacing the call to Utils.createTempDir with something that creates a temp 
dir on the default FS, rather than the local filesystem

This one is correct:
{quote}Ensuring this method returns a path qualified with a scheme (hdfs://), to 
avoid later fs resolution mistakes.{quote}
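
A minimal sketch of that direction (an illustration only, not the actual patch): 
qualify the temp dir with its file: scheme up front, so later resolution cannot 
reinterpret it against the default HDFS filesystem.

{code}
import java.io.File
import java.nio.file.Files

// Create the local temp dir as before...
val tempDir: File = Files.createTempDirectory("temporary").toFile

// ...but return a scheme-qualified form instead of the bare canonical path.
val qualified: String = tempDir.getCanonicalFile.toURI.toString
println(qualified)  // e.g. file:/tmp/temporary-1234.../ -- unambiguously local
{code}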

