[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/22867 Merging to master / 2.4. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22867 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98831/
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22867 Merged build finished. Test PASSed.
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22867

**[Test build #98831 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98831/testReport)** for PR 22867 at commit [`c539c94`](https://github.com/apache/spark/commit/c539c949abc5b8019e7fe79aa23fb440311d788f).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22867 **[Test build #98831 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98831/testReport)** for PR 22867 at commit [`c539c94`](https://github.com/apache/spark/commit/c539c949abc5b8019e7fe79aa23fb440311d788f).
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/22867 ok to test
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867 @vanzin can you review the latest PR. This seems like it works without issue. I ran this on my home cluster and my larger clusters at work where the issue was first discovered and it seems to solve it. Thanks again for the help
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@vanzin you are right! I appreciate the help with this one. I will cut a patch in the AM after testing on a large scale cluster job that is taking from IBM MQ and ETLing data and shipping off to Kafka. But this looks to work:

`val nonExistentDirectory = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).toURI.toString`
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/22867

I think the reason your original attempt didn't work is because of how you wrote the code.

`private val tmpDir = "file:///" + System.getProperty("java.io.tmpdir")`

That seems to have too many slashes. I suggested using "File.toURI", which results in a different URI:

```
scala> new java.io.File("/tmp").toURI()
res0: java.net.URI = file:/tmp/
```

Could you try that instead? If that really doesn't work we can go with `hadoop.tmp.dir`, but I'd really rather use something that is local to the application instead of something that lives in HDFS, which can pose problems if you have multiple applications and can also leave garbage behind.
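A minimal sketch of the difference described in the comment above, assuming a Unix-like system where `java.io.tmpdir` is an absolute path such as `/tmp`. The object and method names are hypothetical and are not part of the patch; only `java.io.File.toURI` and `System.getProperty` come from the thread.

```scala
import java.io.File

object TmpDirUri {
  // Original attempt from the thread: prepend the scheme by hand.
  // With an absolute directory this yields four slashes after "file:".
  def byConcatenation(dir: String): String = "file:///" + dir

  // Suggested alternative: let java.io.File build the URI.
  def byToUri(dir: String): String = new File(dir).toURI.toString

  def main(args: Array[String]): Unit = {
    val dir = System.getProperty("java.io.tmpdir")
    println(byConcatenation(dir))
    println(byToUri(dir))
  }
}
```

Running `TmpDirUri.main` prints both forms for the local temp directory, which makes the extra slashes in the concatenated version easy to see.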
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user lresende commented on the issue: https://github.com/apache/spark/pull/22867 @gss2002 Sorry I missed this initially, but great that @vanzin is helping you with the fix.
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867 @vanzin can you please review latest patch thanks!
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@vanzin this seems to work. Not sure what your thoughts are on this:

`private val tmpDir = broadcastedHadoopConf.value.get("hadoop.tmp.dir", System.getProperty("java.io.tmpdir"))`
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@vanzin I made the following change and it didn't work. How do you want to proceed?

--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -98,6 +98,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
   override def isValid(): Boolean = true
 
+  private val tmpDir = "file:///" + System.getProperty("java.io.tmpdir")
+
   override def getPartitions: Array[Partition] = {
     assertValid()
     Array.tabulate(_blockIds.length) { i =>
@@ -136,7 +138,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
       // this dummy directory should not already exist otherwise the WAL will try to recover
       // past events from the directory and throw errors.
       val nonExistentDirectory = new File(
-        System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
+        tmpDir, UUID.randomUUID().toString).getAbsolutePath
       writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
         SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
       dataRead = writeAheadLog.read(partition.walRecordHandle)

It did not work.. Looks like it needs an HDFS path..
18/10/30 01:06:50 ERROR Executor: Exception in task 0.2 in stage 3.0 (TID 73)
org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://hdpmgmt1.hdp.senia.org:8020/user/sparkstreaming/sparkstreaming/Spark_Streaming_MQ/receivedData/0/log-1540875768007-1540875828007,0,988)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:147)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:175)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:175)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:175)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Pathname /hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_02/file:/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_01/tmp/57b411fc-d4ed-4ac9-a32c-ecfc901dc29e from /hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_02/file:/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_01/tmp/57b411fc-d4ed-4ac9-a32c-ecfc901dc29e is not a valid DFS filename.
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:197)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
    at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:245)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:80)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:141)
    at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForR
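The pathname in the exception above has the shape `<working directory>/file:/<tmp dir>/<uuid>`. That is consistent with `java.io.File` treating a `file:///...` string as an ordinary relative path: on a Unix-like system it collapses the repeated slashes, and `getAbsolutePath` then prefixes the process working directory. The sketch below reproduces that shape under those assumptions; the object and method names are hypothetical and are not taken from the patch.

```scala
import java.io.File

object UriStringAsParent {
  // Mirrors the shape of the failed change: a "file:///..." string is passed
  // to java.io.File as the parent directory, then getAbsolutePath is called.
  // java.io.File does not parse URI schemes, so the string is a relative path.
  def absolutePathFor(parent: String, child: String): String =
    new File(parent, child).getAbsolutePath

  def main(args: Array[String]): Unit = {
    // The printed value starts with the current working directory,
    // followed by the normalized "file:/tmp/example-child".
    println(absolutePathFor("file:///tmp", "example-child"))
  }
}
```

Because the result still carries no usable scheme at the front, it ends up being checked as a DFS filename, which is where the `IllegalArgumentException` in the trace is raised.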
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@vanzin trying this.. I'll advise shortly:

`private val tmpDir = "file:///" + System.getProperty("java.io.tmpdir")`
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

History from JIRA and error:

WriteAheadLogBackedBlockRDD in YARN Cluster Mode fails due to lack of access to an HDFS path that has a similar name to the $PWD folder from YARN AM Cluster Mode for Spark

> While attempting to use Spark Streaming and WriteAheadLogs, I noticed the following errors after the driver attempted to recover the already read data that was being written to HDFS in the checkpoint folder. After spending many hours looking at the error below, I found it is caused by the fact that the parent folder /hadoop exists in our HDFS FS. I am wondering if it's possible to make an option configurable to choose an alternate bogus directory that will never be used.
> hadoop fs -ls /
> drwx-- - dsadm dsadm 0 2017-06-20 13:20 /hadoop
> hadoop fs -ls /hadoop/apps
> drwx-- - dsadm dsadm 0 2017-06-20 13:20 /hadoop/apps
> streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
> val nonExistentDirectory = new File(
> System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
> writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
> SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
> dataRead = writeAheadLog.read(partition.walRecordHandle)
> 18/10/19 00:03:03 DEBUG YarnSchedulerBackend$YarnDriverEndpoint: Launching task 72 on executor id: 1 hostname: ha20t5002dn.tech.hdp.example.com.
> 18/10/19 00:03:03 DEBUG BlockManager: Getting local block broadcast_4_piece0 as bytes
> 18/10/19 00:03:03 DEBUG BlockManager: Level for block broadcast_4_piece0 is StorageLevel(disk, memory, 1 replicas)
> 18/10/19 00:03:03 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on ha20t5002dn.tech.hdp.example.com:32768 (size: 33.7 KB, free: 912.2 MB)
> 18/10/19 00:03:03 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 71, ha20t5002dn.tech.hdp.example.com, executor 1): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://tech/user/hdpdevspark/sparkstreaming/Spark_Streaming_MQ_IDMS/receivedData/0/log-1539921695606-1539921755606,0,1017)
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:145)
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:173)
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:173)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:173)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=hdpdevspark, access=EXECUTE, inode="/hadoop/diskc/hadoop/yarn/local/usercache/hdpdevspark/appcache/application_1539554105597_0338/container_e322_1539554105597_0338_01_02/tmp/170f36b8-9202-4556-89a4-64587c7136b6":dsadm:dsadm:drwx--
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
> at org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer$RangerAccessControlEnforcer.checkPermission(RangerHdfsAuthorizer.java:307)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
> at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1827)
> at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:108)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3972)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1130)
> at org.
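The permission error in the report above comes from the NameNode, even though the path is a local YARN container directory. One reading, consistent with the trace, is that the string returned by `getAbsolutePath` carries no URI scheme, so Hadoop resolves it against the configured default file system (HDFS here). The sketch below only illustrates that idea with `java.net.URI`; `qualify` is a hypothetical helper, not Hadoop code, and `hdfs://namenode:8020` is a made-up default file system.

```scala
import java.io.File
import java.net.URI

object SchemeCheck {
  // Hypothetical stand-in for path qualification: a string without a scheme
  // is placed on the default file system, one with a scheme is left alone.
  def qualify(path: String, defaultFs: String): String = {
    val uri = new URI(path)
    if (uri.getScheme == null) defaultFs + path else path
  }

  def main(args: Array[String]): Unit = {
    val defaultFs = "hdfs://namenode:8020" // assumption, for illustration only
    val local = new File("/hadoop/yarn/local/tmp/example")
    // No scheme: lands on the default file system.
    println(qualify(local.getAbsolutePath, defaultFs))
    // Explicit "file:" scheme from File.toURI: stays local.
    println(qualify(local.toURI.toString, defaultFs))
  }
}
```

Under this reading, a `/hadoop` directory in HDFS that the job user cannot traverse is enough to produce the `AccessControlException`, even though the dummy directory is never meant to exist.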
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867 @lresende any possibilities of getting this change committed or at least getting it on the radar for key changes for spark to work with YARN Cluster mode specifically since $PWD creates issues with Spark when a similar path is used within HDFS and users do not have access to it when using writeAheadLogs.
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22867 Can one of the admins verify this patch?