[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@vanzin can you review the latest PR? It seems to work without issue. I ran it on my home cluster and on my larger clusters at work, where the issue was first discovered, and it seems to solve the problem. Thanks again for the help.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@vanzin you are right! I appreciate the help with this one. I will cut a patch in the morning after testing on a large-scale cluster job that reads from IBM MQ, ETLs the data, and ships it off to Kafka. But this looks like it works:

    val nonExistentDirectory = new File(
      System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).toURI.toString
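The difference between the original `getAbsolutePath` call and the `toURI.toString` variant above is whether the resulting string carries a URI scheme. Below is a minimal sketch using only the JDK; the object and method names are illustrative and not part of Spark. It assumes, without demonstrating it here, that a scheme-less path handed to Hadoop is resolved against the default filesystem (HDFS on this cluster), while a `file:` URI is resolved against the local filesystem.

```scala
import java.io.File
import java.net.URI
import java.util.UUID

// Illustrative helper, not Spark code: builds the dummy WAL directory name both ways.
object DummyDirSketch {
  private def dummyFile(): File =
    new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString)

  // Original form: a bare absolute path with no scheme.
  def asAbsolutePath(): String = dummyFile().getAbsolutePath

  // Form from the comment above: a URI string that carries the "file" scheme.
  def asFileUri(): String = dummyFile().toURI.toString

  // Extracts the scheme of a URI string (null when the string has none).
  def schemeOf(uri: String): String = new URI(uri).getScheme

  def main(args: Array[String]): Unit = {
    println(asAbsolutePath())
    println(asFileUri())
    println(schemeOf(asFileUri()))
  }
}
```

Since the directory name is a fresh UUID on every call, the dummy directory is still expected not to exist, which is what the WAL code relies on.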
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@vanzin can you please review the latest patch? Thanks!
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@vanzin this seems to work. What are your thoughts on this?

    private val tmpDir = broadcastedHadoopConf.value.get("hadoop.tmp.dir",
      System.getProperty("java.io.tmpdir"))
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@vanzin I made the following change and it didn't work. How do you want to proceed?

    --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
    +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
    @@ -98,6 +98,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     
       override def isValid(): Boolean = true
     
    +  private val tmpDir = "file:///" + System.getProperty("java.io.tmpdir")
    +
       override def getPartitions: Array[Partition] = {
         assertValid()
         Array.tabulate(_blockIds.length) { i =>
    @@ -136,7 +138,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
             // this dummy directory should not already exist otherwise the WAL will try to recover
             // past events from the directory and throw errors.
             val nonExistentDirectory = new File(
    -          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
    +          tmpDir, UUID.randomUUID().toString).getAbsolutePath
             writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
               SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
             dataRead = writeAheadLog.read(partition.walRecordHandle)

It did not work. Looks like it needs an HDFS path.

    18/10/30 01:06:50 ERROR Executor: Exception in task 0.2 in stage 3.0 (TID 73)
    org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://hdpmgmt1.hdp.senia.org:8020/user/sparkstreaming/sparkstreaming/Spark_Streaming_MQ/receivedData/0/log-1540875768007-1540875828007,0,988)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:147)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:175)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:175)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:175)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalArgumentException: Pathname /hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_02/file:/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_01/tmp/57b411fc-d4ed-4ac9-a32c-ecfc901dc29e from /hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_02/file:/hadoop/yarn/local/usercache/sparkstreaming/appcache/application_1540875581273_0002/container_e09_1540875581273_0002_01_01/tmp/57b411fc-d4ed-4ac9-a32c-ecfc901dc29e is not a valid DFS filename.
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:197)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:245)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:80)
        at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142)
        at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:141)
        at org.apache.spark.
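The doubled path in the `IllegalArgumentException` above is consistent with how `java.io.File` treats a string that starts with `file:///`. The string is not parsed as a URI, so on a Unix-like JVM it is a relative path, the repeated slashes collapse to `file:/`, and `getAbsolutePath` prepends the working directory. A minimal sketch of that behaviour follows; the object name and the `/tmp` value are illustrative only.

```scala
import java.io.File

// Illustrative only: reproduces the path mangling outside of Spark.
object FilePrefixSketch {
  // Mirrors the attempted change: a "file:///" prefix passed through java.io.File.
  def prefixed(tmpDir: String, child: String): File =
    new File("file:///" + tmpDir, child)

  def main(args: Array[String]): Unit = {
    val f = prefixed("/tmp", "dummy")
    println(f.isAbsolute)      // false on Unix-like systems: no leading "/"
    println(f.getPath)         // the prefix survives as an ordinary path segment
    println(f.getAbsolutePath) // working directory followed by the "file:/..." path
  }
}
```

Because the result has no scheme that Hadoop can recognise, it would still be interpreted as a path on the default filesystem, which matches the "not a valid DFS filename" message.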
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@vanzin I am trying this. I'll advise shortly.

    private val tmpDir = "file:///" + System.getProperty("java.io.tmpdir")
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

History from JIRA and error:

WriteAheadLogBackedBlockRDD in YARN cluster mode fails due to lack of access to an HDFS path that has a name similar to the $PWD folder of the YARN AM in Spark cluster mode

> While attempting to use Spark Streaming with write-ahead logs, I noticed the following errors after the driver attempted to recover the already-read data that was being written to HDFS in the checkpoint folder. After spending many hours on it, I traced the error below to the fact that the parent folder /hadoop exists in our HDFS filesystem. I am wondering if it is possible to add a configurable option to choose an alternate bogus directory that will never be used.
>
> hadoop fs -ls /
> drwx-- - dsadm dsadm 0 2017-06-20 13:20 /hadoop
> hadoop fs -ls /hadoop/apps
> drwx-- - dsadm dsadm 0 2017-06-20 13:20 /hadoop/apps
>
> streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
>   val nonExistentDirectory = new File(
>     System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
>   writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
>     SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
>   dataRead = writeAheadLog.read(partition.walRecordHandle)
>
> 18/10/19 00:03:03 DEBUG YarnSchedulerBackend$YarnDriverEndpoint: Launching task 72 on executor id: 1 hostname: ha20t5002dn.tech.hdp.example.com.
> 18/10/19 00:03:03 DEBUG BlockManager: Getting local block broadcast_4_piece0 as bytes
> 18/10/19 00:03:03 DEBUG BlockManager: Level for block broadcast_4_piece0 is StorageLevel(disk, memory, 1 replicas)
> 18/10/19 00:03:03 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on ha20t5002dn.tech.hdp.example.com:32768 (size: 33.7 KB, free: 912.2 MB)
> 18/10/19 00:03:03 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 71, ha20t5002dn.tech.hdp.example.com, executor 1): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://tech/user/hdpdevspark/sparkstreaming/Spark_Streaming_MQ_IDMS/receivedData/0/log-1539921695606-1539921755606,0,1017)
>     at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:145)
>     at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:173)
>     at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:173)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:173)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=hdpdevspark, access=EXECUTE, inode="/hadoop/diskc/hadoop/yarn/local/usercache/hdpdevspark/appcache/application_1539554105597_0338/container_e322_1539554105597_0338_01_02/tmp/170f36b8-9202-4556-89a4-64587c7136b6":dsadm:dsadm:drwx--
>     at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
>     at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
>     at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
>     at org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer$RangerAccessControlEnforcer.checkPermission(RangerHdfsAuthorizer.java:307)
>     at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
>     at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1827)
>     at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:108)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInf
[GitHub] spark pull request #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN...
Github user gss2002 commented on a diff in the pull request: https://github.com/apache/spark/pull/22867#discussion_r229130982

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala ---
    @@ -136,7 +139,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
             // this dummy directory should not already exist otherwise the WAL will try to recover
             // past events from the directory and throw errors.
             val nonExistentDirectory = new File(
    -          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
    +          tmpDir, UUID.randomUUID().toString).getAbsolutePath
    --- End diff --

I'll run some tests over here on my cluster with a few changes and see what happens.
[GitHub] spark pull request #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN...
Github user gss2002 commented on a diff in the pull request: https://github.com/apache/spark/pull/22867#discussion_r229129992

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala ---
    @@ -136,7 +139,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
             // this dummy directory should not already exist otherwise the WAL will try to recover
             // past events from the directory and throw errors.
             val nonExistentDirectory = new File(
    -          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
    +          tmpDir, UUID.randomUUID().toString).getAbsolutePath
    --- End diff --

@vanzin I wondered the same thing. I figured that for now I'd generate the patch to try to get some feedback. This only affects YARN cluster mode, as it looks at $PWD, which resolves to the local disk where the job is executing. I can look at changing this to use the local filesystem if that seems reasonable. Does that work?
[GitHub] spark issue #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluste...
Github user gss2002 commented on the issue: https://github.com/apache/spark/pull/22867

@lresende is there any possibility of getting this change committed, or at least getting it on the radar as a key change for Spark in YARN cluster mode? With write-ahead logs enabled, $PWD causes problems when a similar path exists in HDFS and users do not have access to it.
[GitHub] spark pull request #22867: [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN...
GitHub user gss2002 opened a pull request: https://github.com/apache/spark/pull/22867

[SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluster Mode Fails due lack of access to tmpDir from $PWD to HDFS

WriteAheadLogBackedBlockRDD's use of java.io.tmpdir will fail if the $PWD path also resolves to a folder in HDFS and the Spark YARN cluster-mode job does not have the correct access to that folder when it looks up the dummy folder. This patch therefore provides an option, spark.streaming.receiver.blockStore.tmpdir, to override java.io.tmpdir, which YARN cluster mode derives from $PWD.

## What changes were proposed in this pull request?

This change provides an option to override java.io.tmpdir, so that when $PWD is resolved in YARN cluster mode, Spark does not attempt to use that folder and instead uses the folder provided with the following option: spark.streaming.receiver.blockStore.tmpdir

## How was this patch tested?

The patch was manually tested on a Spark Streaming job with write-ahead logs in cluster mode.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gss2002/spark SPARK-25778

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22867.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22867

commit 4970827038fb18f74c1b7975d7bfc00609dc9405
Author: gss2002
Date: 2018-10-28T04:22:31Z

    [SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluster Mode Fails due lack of access to tmpDir from $PWD to HDFS

    WriteAheadLogBackedBlockRDD usage of java.io.tmpdir will fail if $PWD resolves to a folder in HDFS and the Spark YARN Cluster job does not have the correct access to this folder in regards to the dummy folder. So this patch provides an option to set spark.streaming.receiver.blockStore.tmpdir to override java.io.tmpdir which sets $PWD from YARN Cluster mode.
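The override described in this pull request amounts to a configuration lookup with a fallback. Below is a minimal sketch that assumes a plain `Map[String, String]` as a stand-in for the Spark configuration. The `resolveTmpDir` helper and the object name are hypothetical; only the key `spark.streaming.receiver.blockStore.tmpdir` comes from the description above.

```scala
// Hypothetical sketch: a Map stands in for the Spark configuration object.
object TmpDirOverrideSketch {
  // Option name taken from the pull request description.
  val TmpDirKey = "spark.streaming.receiver.blockStore.tmpdir"

  // Returns the configured directory if present, otherwise the JVM's java.io.tmpdir.
  def resolveTmpDir(conf: Map[String, String]): String =
    conf.getOrElse(TmpDirKey, System.getProperty("java.io.tmpdir"))

  def main(args: Array[String]): Unit = {
    println(resolveTmpDir(Map.empty))
    println(resolveTmpDir(Map(TmpDirKey -> "/tmp/wal-dummy")))
  }
}
```

Against a real `SparkConf`, the same fallback could presumably be written with `SparkConf.get(key, defaultValue)`; the patch itself is not shown in this thread, so that is an assumption about its shape.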
[GitHub] spark pull request: [SPARK-2706][SQL] Enable Spark to support Hive...
Github user gss2002 commented on the pull request: https://github.com/apache/spark/pull/2241#issuecomment-56185334

We have been using this fix for a few weeks now against Hive 13. The only outstanding issue I see, and this could be something larger, is that the Spark Thrift service doesn't seem to support hive.server2.enable.doAs = true. It doesn't set the proxy user.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.