[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-14 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/22867
  
Merging to master / 2.4.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22867
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98831/
Test PASSed.





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22867
  
Merged build finished. Test PASSed.





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22867
  
**[Test build #98831 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98831/testReport)** for PR 22867 at commit [`c539c94`](https://github.com/apache/spark/commit/c539c949abc5b8019e7fe79aa23fb440311d788f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22867
  
**[Test build #98831 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98831/testReport)** for PR 22867 at commit [`c539c94`](https://github.com/apache/spark/commit/c539c949abc5b8019e7fe79aa23fb440311d788f).





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-14 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/22867
  
ok to test





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-13 Thread gss2002
Github user gss2002 commented on the issue:

https://github.com/apache/spark/pull/22867
  
@vanzin can you review the latest PR? It seems to work without issue. I ran
it on my home cluster and on my larger clusters at work, where the issue was
first discovered, and it seems to solve the problem. Thanks again for the help.





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-06 Thread gss2002
Github user gss2002 commented on the issue:

https://github.com/apache/spark/pull/22867
  
@vanzin you are right! I appreciate the help with this one. I will cut a
patch in the morning after testing on a large-scale cluster job that takes
data from IBM MQ, ETLs it, and ships it off to Kafka.

But this looks to work:

    val nonExistentDirectory = new File(
      System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).toURI.toString
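
A minimal sketch of why the `toURI.toString` form can help, assuming (as in Hadoop's file system lookup) that a path string without a scheme is resolved against the default file system, while a `file:` URI is resolved against the local one. `SchemeCheckSketch` and `schemeOf` are hypothetical names used only for illustration, and the literal paths assume a Unix-like layout:

```scala
import java.net.URI

object SchemeCheckSketch {
  // Returns the URI scheme of a path string, or None for a bare path.
  // Hypothetical helper for illustration; not part of Spark or Hadoop.
  def schemeOf(path: String): Option[String] =
    Option(new URI(path).getScheme)

  def main(args: Array[String]): Unit = {
    // What File.getAbsolutePath produces: a bare path with no scheme.
    println(schemeOf("/tmp/example-dir"))       // None
    // What File.toURI.toString produces: a scheme-qualified URI.
    println(schemeOf("file:/tmp/example-dir"))  // Some(file)
  }
}
```

With no scheme present, the string carries nothing that says "local disk", which is consistent with the dummy directory being looked up in HDFS in the reports in this thread.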





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-06 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/22867
  
I think your original attempt didn't work because of how you wrote the
code.

private val tmpDir = "file:///" + System.getProperty("java.io.tmpdir")

That seems to have too many slashes. I suggested using "File.toURI" which 
results in a different URI:

```
scala> new java.io.File("/tmp").toURI()
res0: java.net.URI = file:/tmp/
```

Could you try that instead? If that really doesn't work we can go with
`hadoop.tmp.dir`, but I'd much rather use something that is local to the
application than something that lives in HDFS, which can pose problems if
you have multiple applications and can also leave garbage behind.
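
A small sketch contrasting the two constructions, under the assumption of a Unix-like JVM. `TmpDirUriSketch` and its method names are hypothetical. Passing a `"file:///"`-prefixed string to `java.io.File` makes it a relative path, so `getAbsolutePath` prepends the working directory. This matches the `<container dir>/file:/...` pathname in the error reported earlier in the thread:

```scala
import java.io.File
import java.util.UUID

object TmpDirUriSketch {
  // First attempt: a "file:///" prefix glued onto the temp dir, then
  // passed through java.io.File, which treats it as a relative path.
  def viaStringPrefix(tmp: String, name: String): String =
    new File("file:///" + tmp, name).getAbsolutePath

  // Suggested form: let File.toURI build the scheme-qualified URI.
  def viaToUri(tmp: String, name: String): String =
    new File(tmp, name).toURI.toString

  def main(args: Array[String]): Unit = {
    val tmp = System.getProperty("java.io.tmpdir")
    val name = UUID.randomUUID().toString
    // Output depends on the working directory, so none is shown here.
    println(viaStringPrefix(tmp, name))
    println(viaToUri(tmp, name))
  }
}
```

Only the second form yields a string that starts with the `file:` scheme; the first yields a local absolute path with `file:` embedded in the middle.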





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-05 Thread lresende
Github user lresende commented on the issue:

https://github.com/apache/spark/pull/22867
  
@gss2002 Sorry I missed this initially, but great that @vanzin is helping 
you with the fix.





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-11-03 Thread gss2002
Github user gss2002 commented on the issue:

https://github.com/apache/spark/pull/22867
  
@vanzin can you please review the latest patch? Thanks!





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-10-29 Thread gss2002
Github user gss2002 commented on the issue:

https://github.com/apache/spark/pull/22867
  
@vanzin this seems to work. Not sure what your thoughts are on this:

    private val tmpDir = broadcastedHadoopConf.value.get("hadoop.tmp.dir",
      System.getProperty("java.io.tmpdir"))





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-10-29 Thread gss2002
Github user gss2002 commented on the issue:

https://github.com/apache/spark/pull/22867
  
@vanzin I made the following change and it didn't work. How do you want to 
proceed?
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -98,6 +98,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
 
   override def isValid(): Boolean = true
 
+  private val tmpDir = "file:///" + System.getProperty("java.io.tmpdir")
+
   override def getPartitions: Array[Partition] = {
     assertValid()
     Array.tabulate(_blockIds.length) { i =>
@@ -136,7 +138,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         // this dummy directory should not already exist otherwise the WAL will try to recover
         // past events from the directory and throw errors.
         val nonExistentDirectory = new File(
-          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
+          tmpDir, UUID.randomUUID().toString).getAbsolutePath
         writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
           SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
         dataRead = writeAheadLog.read(partition.walRecordHandle)

It did not work. Looks like it needs an HDFS path.

18/10/30 01:06:50 ERROR Executor: Exception in task 0.2 in stage 3.0 (TID 73)
org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://hdpmgmt1.hdp.senia.org:8020/user/sparkstreaming/sparkstreaming/Spark_Streaming_MQ/receivedData/0/log-1540875768007-1540875828007,0,988)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:147)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:175)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:175)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:175)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Pathname /hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_02/file:/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_01/tmp/57b411fc-d4ed-4ac9-a32c-ecfc901dc29e from /hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_02/file:/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_01/tmp/57b411fc-d4ed-4ac9-a32c-ecfc901dc29e is not a valid DFS filename.
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:197)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:245)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:80)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:141)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForR

[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-10-29 Thread gss2002
Github user gss2002 commented on the issue:

https://github.com/apache/spark/pull/22867
  
@vanzin trying this. I'll advise shortly.

  private val tmpDir = "file:///" + System.getProperty("java.io.tmpdir")






[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-10-29 Thread gss2002
Github user gss2002 commented on the issue:

https://github.com/apache/spark/pull/22867
  
 History from JIRA and error

WriteAheadLogBackedBlockRDD in YARN cluster mode fails due to lack of access
to an HDFS path, because the path it uses has a name similar to the $PWD
folder of the YARN AM in cluster mode for Spark.
> While attempting to use Spark Streaming with write-ahead logs, I noticed the
> following errors after the driver attempted to recover the already-read data
> that was being written to HDFS in the checkpoint folder. After spending many
> hours looking into it, the cause of the error below turned out to be that the
> parent folder /hadoop exists in our HDFS FS. I am wondering if it is possible
> to add a configurable option to choose an alternate bogus directory that
> will never be used.
> hadoop fs -ls /
> drwx--   - dsadm dsadm   0 2017-06-20 13:20 /hadoop
> hadoop fs -ls /hadoop/apps
> drwx--   - dsadm dsadm  0 2017-06-20 13:20 /hadoop/apps
> streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
>   val nonExistentDirectory = new File(
>     System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
>   writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
>     SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
>   dataRead = writeAheadLog.read(partition.walRecordHandle)
> 18/10/19 00:03:03 DEBUG YarnSchedulerBackend$YarnDriverEndpoint: Launching task 72 on executor id: 1 hostname: ha20t5002dn.tech.hdp.example.com.
> 18/10/19 00:03:03 DEBUG BlockManager: Getting local block broadcast_4_piece0 as bytes
> 18/10/19 00:03:03 DEBUG BlockManager: Level for block broadcast_4_piece0 is StorageLevel(disk, memory, 1 replicas)
> 18/10/19 00:03:03 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on ha20t5002dn.tech.hdp.example.com:32768 (size: 33.7 KB, free: 912.2 MB)
> 18/10/19 00:03:03 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 71, ha20t5002dn.tech.hdp.example.com, executor 1): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://tech/user/hdpdevspark/sparkstreaming/Spark_Streaming_MQ_IDMS/receivedData/0/log-1539921695606-1539921755606,0,1017)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:145)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:173)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:173)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:173)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=hdpdevspark, access=EXECUTE, inode="/hadoop/diskc/hadoop/yarn/local/usercache/hdpdevspark/appcache/application_1539554105597_0338/container_e322_1539554105597_0338_01_02/tmp/170f36b8-9202-4556-89a4-64587c7136b6":dsadm:dsadm:drwx--
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
>   at org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer$RangerAccessControlEnforcer.checkPermission(RangerHdfsAuthorizer.java:307)
>   at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
>   at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1827)
>   at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:108)
>   at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3972)
>   at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1130)
>   at org.
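
The permission error above is consistent with the local temp path being looked up in the HDFS namespace. The sketch below illustrates that idea with plain `java.net.URI` resolution. It is an analogy rather than Hadoop's actual code, `DefaultFsQualifySketch` and `qualify` are hypothetical names, and the shortened path is an illustrative stand-in for the container temp directory:

```scala
import java.net.URI

object DefaultFsQualifySketch {
  // Analogy only: a path without a scheme is resolved against a default
  // file system URI, while a scheme-qualified path is left untouched.
  def qualify(defaultFs: URI, path: String): URI = {
    val p = new URI(path)
    if (p.getScheme != null) p else defaultFs.resolve(p)
  }

  def main(args: Array[String]): Unit = {
    val defaultFs = new URI("hdfs://tech/")
    // Bare local path: ends up under the HDFS root.
    println(qualify(defaultFs, "/hadoop/yarn/local/tmp/example-uuid"))
    // prints hdfs://tech/hadoop/yarn/local/tmp/example-uuid
    // Scheme-qualified path: stays on the local file system.
    println(qualify(defaultFs, "file:/hadoop/yarn/local/tmp/example-uuid"))
    // prints file:/hadoop/yarn/local/tmp/example-uuid
  }
}
```

If HDFS happens to contain a `/hadoop` directory that the job's user cannot traverse, a lookup of the first form would fail with a permission error like the one reported here.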

[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-10-29 Thread gss2002
Github user gss2002 commented on the issue:

https://github.com/apache/spark/pull/22867
  
@lresende is there any possibility of getting this change committed, or at
least getting it on the radar as a key change for Spark to work in YARN
cluster mode? $PWD creates issues with Spark when a similar path exists in
HDFS and users do not have access to it when using write-ahead logs.





[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...

2018-10-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22867
  
Can one of the admins verify this patch?

