spark ssh to slave
I have two hosts, 192.168.1.15 (Master) and 192.168.1.16 (Worker). These two hosts have exchanged public keys, so they have passwordless access to each other. But when I do /sbin/start-all.sh from 192.168.1.15 I still get

192.168.1.16: Permission denied (publickey,gssapi-keyex,gssapi-with-mic).

Any thoughts why? Or what I could check to fix this?

Regards
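One thing worth checking (a sketch, not from this thread): start-all.sh launches the worker over ssh as the same user that runs the script, so the key exchange must be between those exact accounts, and ssh silently ignores keys with overly open permissions. The key path below is a placeholder.

```shell
# Run these as the SAME user that invokes sbin/start-all.sh on the master.

# 1. Confirm the key is offered and accepted for that account:
ssh -v 192.168.1.16 'echo ok'

# 2. ssh refuses keys when permissions are too open -- check both hosts:
ls -ld ~/.ssh                   # should be 700
ls -l  ~/.ssh/authorized_keys   # should be 600 on the worker

# 3. If the key lives in a non-default file, point the launcher at it via
#    conf/spark-env.sh (SPARK_SSH_OPTS is picked up by sbin/slaves.sh):
echo 'export SPARK_SSH_OPTS="-i /path/to/custom_key"' >> conf/spark-env.sh
```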
Re: spark ssh to slave
Thanks Akhil, yes that works fine, it just lets me straight in.

On Mon, Jun 8, 2015 at 11:58 AM, Akhil Das wrote:
> Can you do *ssh -v 192.168.1.16* from the Master machine and make sure
> it's able to log in without a password?
>
> Thanks
> Best Regards
Spark + Kafka
Hi All, Which build of Spark is best when using Kafka? Regards jk
Re: Spark + Kafka
Thanks Jeff, I'm planning to use it in standalone mode; OK, will use the Hadoop 2.4 package. Ciao!

On Wed, Mar 18, 2015 at 10:56 AM, Jeffrey Jedele wrote:
> What you call "sub-category" are packages pre-built to run on certain
> Hadoop environments. It really depends on where you want to run Spark. As
> far as I know, this is mainly about the included HDFS binding - so if you
> just want to play around with Spark, any of the packages should be fine. I
> wouldn't use source though, because you'd have to compile it yourself.
>
> PS: Make sure to use "Reply to all". If you're not including the mailing
> list in the response, I'm the only one who will get your message.
>
> Regards,
> Jeff
>
> 2015-03-18 10:49 GMT+01:00 James King:
>> Any sub-category recommendations: Hadoop, MapR, CDH?
>>
>> On Wed, Mar 18, 2015 at 10:48 AM, James King wrote:
>>> Many thanks Jeff, will give it a go.
>>>
>>> On Wed, Mar 18, 2015 at 10:47 AM, Jeffrey Jedele wrote:
>>>> Probably 1.3.0 - it has some improvements in the included Kafka
>>>> receiver for streaming.
>>>>
>>>> https://spark.apache.org/releases/spark-release-1-3-0.html
>>>>
>>>> Regards,
>>>> Jeff
Re: Spark + Kafka
Thanks Khanderao.

On Wed, Mar 18, 2015 at 7:18 PM, Khanderao Kand Gmail <khanderao.k...@gmail.com> wrote:
> I have used various versions of Spark (1.0, 1.2.1) without any issues.
> Though I have not used Kafka significantly with 1.3.0, preliminary
> testing revealed no issues.
>
> - khanderao
Writing Spark Streaming Programs
Hello All,

I'm using Spark for streaming but I'm unclear on which implementation language to use: Java, Scala or Python.

I don't know anything about Python, am familiar with Scala, and have been doing Java for a long time.

I think the above shouldn't influence my decision on which language to use, because I believe the tool should fit the problem.

In terms of performance, Java and Scala are comparable. However, Java is OO and Scala is FP; no idea what Python is.

If you use Scala without applying a consistent style of programming, Scala code can become unreadable. But I do like the fact that it seems possible to do so much work with so much less code; that's a strong selling point for me. It could also be that the type of programming done in Spark is best expressed in Scala as an FP language, though I'm not sure.

The question I would like your help with is: are there any other considerations I need to think about when deciding this? Are there any recommendations you can make in this regard?

Regards
jk
Re: Writing Spark Streaming Programs
Many thanks Gerard, this is very helpful. Cheers!

On Thu, Mar 19, 2015 at 4:02 PM, Gerard Maas wrote:
> Try writing this Spark Streaming idiom in Java and you'll choose Scala
> soon enough:
>
> dstream.foreachRDD { rdd =>
>   rdd.foreachPartition( partition => ... )
> }
>
> When deciding between Java and Scala for Spark, IMHO Scala has the
> upper hand. If you're concerned with readability, have a look at the Scala
> coding style recently open-sourced by Databricks:
> https://github.com/databricks/scala-style-guide (btw, I don't agree with a
> good part of it, but recognize that it can keep the most complex Scala
> constructions out of your code)
Re: Spark + Kafka
Many thanks all for the good responses, appreciated.
NetworkWordCount + Spark standalone
I'm trying to run the Java NetworkWordCount example against a simple Spark standalone runtime of one master and one worker.

But it doesn't seem to work: the text entered on the netcat data server is not being picked up and printed to the Eclipse console output.

However, if I use conf.setMaster("local[2]") it works; the correct text gets picked up and printed to the Eclipse console.

Any ideas why? Any pointers?
Re: NetworkWordCount + Spark standalone
Thanks Akhil,

Yes indeed, this is why it works when using local[2], but I'm unclear on why it doesn't work when using the standalone daemons. Is there a way to check what cores are being seen when running against the standalone daemons?

I'm running the master and worker on the same Ubuntu host. The driver program is running from a Windows machine.

On the Ubuntu host, cat /proc/cpuinfo | grep processor | wc -l gives 2.
On the Windows machine it is: NumberOfCores=2, NumberOfLogicalProcessors=4.

On Wed, Mar 25, 2015 at 2:06 PM, Akhil Das wrote:
> Spark Streaming requires you to have a minimum of 2 cores, 1 for receiving
> your data and the other for processing. So when you say local[2] it
> basically initializes 2 threads on your local machine, 1 for receiving data
> from the network and the other for your word count processing.
>
> Thanks
> Best Regards
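For reference, a sketch of where to look (the property names below come from the standalone docs, not from this thread): the master web UI on port 8080 lists each worker's cores, and conf/spark-env.sh on the worker controls how many cores it offers.

```shell
# On the worker host: advertise how many cores this worker offers
# (defaults to all available cores). The value shown is illustrative.
echo 'export SPARK_WORKER_CORES=2' >> conf/spark-env.sh

# The application can also cap how many cores it takes from the cluster
# (set in the driver's SparkConf or in spark-defaults.conf):
echo 'spark.cores.max  2' >> conf/spark-defaults.conf

# Then check the master UI at http://<master-host>:8080 -- the "Workers"
# table shows "Cores: N (M Used)" per worker.
```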
Spark + Kafka
I have a simple setup/runtime of Kafka and Spark.

I have a command-line consumer displaying arrivals to the Kafka topic, so I know messages are being received.

But when I try to read from the Kafka topic I get no messages; here are some logs below.

I'm thinking there aren't enough threads. How do I check that?

Thank you.

2015-04-01 08:56:50 INFO JobScheduler:59 - Starting job streaming job 142787141 ms.0 from job set of time 142787141 ms
2015-04-01 08:56:50 INFO JobScheduler:59 - Finished job streaming job 142787141 ms.0 from job set of time 142787141 ms
2015-04-01 08:56:50 INFO JobScheduler:59 - Total delay: 0.002 s for time 142787141 ms (execution: 0.000 s)
2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event ClearMetadata(142787141 ms)
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time 142787141 ms
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to old RDDs: []
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs:
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were older than 1427871405000 ms:
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to old RDDs: [1427871405000 ms -> 8]
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs: 8
2015-04-01 08:56:50 INFO BlockRDD:59 - Removing RDD 8 from persistence list
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.287257 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - removing RDD 8
2015-04-01 08:56:50 INFO BlockManager:59 - Removing RDD 8
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Done removing RDD 8, response is 0
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:56 - [actor] handled message (0.038047 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 INFO KafkaInputDStream:59 - Removing blocks of RDD BlockRDD[8] at createStream at KafkaLogConsumer.java:53 of time 142787141 ms
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Cleared 1 RDDs that were older than 1427871405000 ms: 1427871405000 ms
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Cleared old metadata for time 142787141 ms
2015-04-01 08:56:50 INFO ReceivedBlockTracker:59 - Deleting batches ArrayBuffer(142787140 ms)
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Sent response: 0 to Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:50 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.499181 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:50 WARN TaskSchedulerImpl:71 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:51 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.886121 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1)) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor updated: app-20150401065621-0007/0 is now EXITED (Command exited with code 1)
2015-04-01 08:56:52 INFO SparkDeploySchedulerBackend:59 - Executor app-20150401065621-0007/0 removed: Command exited with code 1
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message RemoveExecutor(0,Unknown executor exit code (1)) from Actor[akka://sparkDriver/temp/$p]
2015-04-01 08:56:52 ERROR SparkDeploySchedulerBackend:75 - Asked to remove non-existent executor 0
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (1.394052 ms) RemoveExecutor(0,Unknown executor exit code (1)) from Actor[akka://sparkDriver/temp/$p]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message (6.653705 ms) ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1)) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 DEBUG AppClient$Cl
Re: Spark + Kafka
Thank you bit1129,

From looking at the web UI I can see 2 cores.

I was also looking at http://spark.apache.org/docs/1.2.1/configuration.html but can't see an obvious configuration for the number of receivers. Can you help please?

On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com wrote:
> Please make sure that you have given more cores than Receiver numbers.
>
> *From:* James King
> *Date:* 2015-04-01 15:21
> *To:* user
> *Subject:* Spark + Kafka
Re: Spark + Kafka
Thanks Saisai,

Sure, will do. But just a quick note: when I set the master to "local[*]" Spark started showing Kafka messages as expected, so the problem in my view was to do with not having enough threads to process the incoming data.

Thanks.

On Wed, Apr 1, 2015 at 10:53 AM, Saisai Shao wrote:
> Would you please share your code snippet, so we can identify whether
> there is anything wrong in your code.
>
> Besides, would you please grep your driver's debug log to see if there's any
> debug log about "Stream xxx received block xxx"; this means that Spark
> Streaming is receiving data from sources like Kafka.
Re: Spark + Kafka
This is the code. And I couldn't find anything like the log you suggested.

public KafkaLogConsumer(int duration, String master) {
  JavaStreamingContext spark = createSparkContext(duration, master);

  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put("test", 1);

  JavaPairDStream<String, String> input =
      KafkaUtils.createStream(spark, "somesparkhost:2181", "groupid", topics);
  input.print();

  spark.start();
  spark.awaitTermination();
}

private JavaStreamingContext createSparkContext(int duration, String master) {
  SparkConf sparkConf = new SparkConf()
      .setAppName(this.getClass().getSimpleName())
      .setMaster(master);
  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(duration));
  return ssc;
}
A stream of json objects using Java
I'm reading a stream of string lines that are in JSON format. I'm using Java with Spark. Is there a way to get this from a transformation, so that I end up with a stream of JSON objects? I would also welcome any feedback about this approach or alternative approaches. Thanks jk
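One common approach (a sketch, not something confirmed in this thread): keep the DStream<String> and apply a map that parses each line, e.g. with Jackson, which Spark already bundles on its classpath. The Event class and its fields below are hypothetical placeholders for your own payload type.

```java
import java.io.Serializable;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;

public class JsonLines {
  // Hypothetical payload type matching the incoming JSON lines.
  public static class Event implements Serializable {
    public String id;
    public long timestamp;
  }

  // Turns a stream of JSON text lines into a stream of parsed objects.
  public static JavaDStream<Event> parse(JavaDStream<String> lines) {
    return lines.map(new Function<String, Event>() {
      // transient so the closure serializes; created lazily on the executor.
      private transient ObjectMapper mapper;

      @Override
      public Event call(String line) throws Exception {
        if (mapper == null) {
          mapper = new ObjectMapper();
        }
        return mapper.readValue(line, Event.class);
      }
    });
  }
}
```

Note the ObjectMapper is created inside the function rather than in the driver, so nothing non-serializable is captured by the closure.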
Spark Cluster: RECEIVED SIGNAL 15: SIGTERM
Any idea what this means? Many thanks.

==> logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1 <==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4 cores, 6.6 GB RAM
15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home: /remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with master spark://08:7077
*15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM*
Spark Directed Acyclic Graph / Jobs
Is there a good resource that explains how Spark jobs get broken down into tasks and executions? I just need to get a better understanding of this. Regards j
Re: Spark Directed Acyclic Graph / Jobs
Thanks Jerry,

The other paper you refer to may be: http://research.microsoft.com/pubs/63785/eurosys07.pdf

Regards
j

On Fri, Apr 17, 2015 at 9:45 AM, Shao, Saisai wrote:
> I think this paper will be a good resource (
> https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf); the
> paper on Dryad is also a good one.
>
> Thanks
> Jerry
>
> *From:* James King [mailto:jakwebin...@gmail.com]
> *Sent:* Friday, April 17, 2015 3:26 PM
> *To:* user
> *Subject:* Spark Directed Acyclic Graph / Jobs
Skipped Jobs
In the web UI I can see some jobs marked as 'skipped'. What does that mean? Why are these jobs skipped? Do they ever get executed? Regards jk
Spark Unit Testing
I'm trying to write some unit tests for my Spark code. I need to pass a JavaPairDStream to my Spark class. Is there a way to create a JavaPairDStream using the Java API? Also, is there a good resource that covers an approach (or approaches) for unit testing using Java? Regards jk
Re: Spark Unit Testing
Hi Emre, thanks for the help, will have a look. Cheers!

On Tue, Apr 21, 2015 at 1:46 PM, Emre Sevinc wrote:
> Hello James,
>
> Did you check the following resources:
>
> - https://github.com/apache/spark/tree/master/streaming/src/test/java/org/apache/spark/streaming
>
> - http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs
>
> --
> Emre Sevinç
> http://www.bigindustries.be/
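One way to hand a JavaPairDStream to code under test (a sketch; queueStream is a real JavaStreamingContext method, but the fixture wiring shown here is an assumption, not something from this thread): queue up pre-built RDDs, where each queued RDD becomes one micro-batch, then mapToPair the resulting stream.

```java
// Sketch: building a JavaPairDStream from in-memory data for a unit test.
// Assumes spark-streaming on the test classpath; class/method names are hypothetical.
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class PairDStreamFixture {
  public static JavaPairDStream<String, Integer> build(JavaStreamingContext ssc) {
    // Each RDD in the queue is served as one micro-batch of the stream.
    Queue<JavaRDD<String>> batches = new LinkedList<JavaRDD<String>>();
    batches.add(ssc.sparkContext().parallelize(Arrays.asList("a", "b", "a")));

    JavaDStream<String> lines = ssc.queueStream(batches);
    return lines.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
  }
}
```

In a test you would create the JavaStreamingContext with a "local[2]" master, pass the resulting pair stream to the class under test, collect its output with foreachRDD, and then start the context.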
Auto Starting a Spark Job on Cluster Starup
What's the best way to start up a Spark job as part of starting up the Spark cluster? I have a single uber jar for my job and want to make the start-up as easy as possible. Thanks jk
Master <-chatter -> Worker
Is there a good resource that covers what kind of chatter (communication) goes on between the driver, master and worker processes? Thanks
Spark Cluster Setup
I'm trying to find out how to set up a resilient Spark cluster. Things I'm thinking about include:

- How to start multiple masters on different hosts? (there isn't a conf/masters file from what I can see)

Thank you.
Re: Spark Cluster Setup
Thanks Dean,

Sure, I have that set up locally and am testing it with ZK. But to start my multiple masters, do I need to go to each host and start them there, or is there a better way to do this?

Regards
jk

On Fri, Apr 24, 2015 at 5:23 PM, Dean Wampler wrote:
> The convention for a standalone cluster is to use ZooKeeper to manage master
> failover.
>
> http://spark.apache.org/docs/latest/spark-standalone.html
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition (O'Reilly)
> Typesafe
> @deanwampler
> http://polyglotprogramming.com
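For reference, a minimal sketch of the ZooKeeper-based HA setup from the standalone docs (the property names are real; the host names are placeholders). Each master is indeed started individually on its own host; there is no equivalent of conf/slaves for masters.

```shell
# conf/spark-env.sh on EVERY master host (zk hosts below are placeholders):
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"

# Then, on each master host:
sbin/start-master.sh

# Workers and drivers list all masters; whichever is the elected leader
# serves them, e.g.:
#   spark://master1:7077,master2:7077
```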
Querying Cluster State
If I have 5 nodes and I wish to maintain 1 Master and 2 Workers on each node, then in total I will have 5 Masters and 10 Workers.

Now, to maintain that setup I would like to query Spark regarding the number of Masters and Workers that are currently available using API calls, and then take some appropriate action based on the information I get back, like restarting a dead Master or Worker.

Is this possible? Does Spark provide such an API?
Re: Querying Cluster State
Thanks for the response.

But no, this does not answer the question. The question was: is there a way (via some API call) to query the number and type of daemons currently running in the Spark cluster?

Regards

On Sun, Apr 26, 2015 at 10:12 AM, ayan guha wrote:
> In my limited understanding, there must be a single "leader" master in
> the cluster. If there are multiple leaders, it will lead to an unstable
> cluster as each master will keep scheduling independently. You should use
> ZooKeeper for HA, so that standby masters can vote to find a new leader if
> the primary goes down.
>
> Now, you can still have multiple masters running as leaders, but
> conceptually they should be thought of as different clusters.
>
> Regarding workers, they should follow their master.
>
> Not sure if this answers your question, as I am sure you have read the
> documentation thoroughly.
>
> Best
> Ayan
Re: Querying Cluster State
Very helpful indeed. Thank you Nicholas. On Sunday, 26 April 2015, Nicholas Chammas wrote: > The Spark web UI offers a JSON interface with some of this information. > > http://stackoverflow.com/a/29659630/877069 > > It's not an official API, so be warned that it may change unexpectedly > between versions, but you might find it helpful. > > Nick > > On Sun, Apr 26, 2015 at 9:46 AM michal.klo...@gmail.com > < > michal.klo...@gmail.com > > wrote: > >> Not sure if there's a spark native way but we've been using consul for >> this. >> >> M >> >> >> >> On Apr 26, 2015, at 5:17 AM, James King > > wrote: >> >> Thanks for the response. >> >> But no this does not answer the question. >> >> The question was: Is there a way (via some API call) to query the number >> and type of daemons currently running in the Spark cluster. >> >> Regards >> >> >> On Sun, Apr 26, 2015 at 10:12 AM, ayan guha > > wrote: >> >>> In my limited understanding, there must be single "leader" master in >>> the cluster. If there are multiple leaders, it will lead to unstable >>> cluster as each masters will keep scheduling independently. You should use >>> zookeeper for HA, so that standby masters can vote to find new leader if >>> the primary goes down. >>> >>> Now, you can still have multiple masters running as leaders but >>> conceptually they should be thought as different clusters. >>> >>> Regarding workers, they should follow their master. >>> >>> Not sure if this answers your question, as I am sure you have read the >>> documentation thoroughly. >>> >>> Best >>> Ayan >>> >>> On Sun, Apr 26, 2015 at 6:31 PM, James King >> > wrote: >>> >>>> If I have 5 nodes and I wish to maintain 1 Master and 2 Workers on each >>>> node, so in total I will have 5 master and 10 Workers. 
>>>> >>>> Now to maintain that setup I would like to query spark regarding the >>>> number Masters and Workers that are currently available using API calls and >>>> then take some appropriate action based on the information I get back, like >>>> restart a dead Master or Worker. >>>> >>>> Is this possible? does Spark provide such API? >>>> >>> >>> >>> >>> -- >>> Best Regards, >>> Ayan Guha >>> >> >>
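To make Nick's pointer concrete: the standalone Master serves an unofficial JSON view of cluster state at http://<master-host>:8080/json, listing workers and their states. Below is a minimal sketch of counting live Workers from such a payload. The payload shape is an assumption based on that endpoint (it is not a stable API and may change between versions), and the parsing is a naive string scan rather than a real JSON parser:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ClusterState {
    // Count entries marked "state":"ALIVE" in the master's /json response.
    // Naive scan; assumes the sample shape shown in main().
    static int aliveWorkers(String json) {
        Matcher m = Pattern.compile("\"state\"\\s*:\\s*\"ALIVE\"").matcher(json);
        int n = 0;
        while (m.find()) n++;
        return n;
    }

    public static void main(String[] args) {
        // In a real deployment this string would be fetched from
        // http://<master-host>:8080/json
        String sample = "{\"url\":\"spark://host01:7077\",\"workers\":["
            + "{\"id\":\"w1\",\"state\":\"ALIVE\"},"
            + "{\"id\":\"w2\",\"state\":\"ALIVE\"},"
            + "{\"id\":\"w3\",\"state\":\"DEAD\"}]}";
        System.out.println(aliveWorkers(sample)); // prints 2
    }
}
```

A watchdog could poll each Master's web UI port this way and restart daemons that have dropped out, which is essentially what the consul-based approach mentioned above automates.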
spark-defaults.conf
I renamed spark-defaults.conf.template to spark-defaults.conf and invoked spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh. But I still get: failed to launch org.apache.spark.deploy.worker.Worker: --properties-file FILE Path to a custom Spark properties file. Default is conf/spark-defaults.conf. But I'm thinking it should pick up the default spark-defaults.conf from the conf dir. Am I expecting or doing something wrong? Regards jk
Re: spark-defaults.conf
Thanks. I've set SPARK_HOME and SPARK_CONF_DIR appropriately in .bash_profile But when I start worker like this spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh I still get failed to launch org.apache.spark.deploy.worker.Worker: Default is conf/spark-defaults.conf. 15/04/27 11:51:33 DEBUG Utils: Shutdown hook called On Mon, Apr 27, 2015 at 1:15 PM, Zoltán Zvara wrote: > You should distribute your configuration file to workers and set the > appropriate environment variables, like HADOOP_HOME, SPARK_HOME, > HADOOP_CONF_DIR, SPARK_CONF_DIR. > > On Mon, Apr 27, 2015 at 12:56 PM James King wrote: > >> I renamed spark-defaults.conf.template to spark-defaults.conf >> and invoked >> >> spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh >> >> But I still get >> >> failed to launch org.apache.spark.deploy.worker.Worker: >> --properties-file FILE Path to a custom Spark properties file. >> Default is conf/spark-defaults.conf. >> >> But I'm thinking it should pick up the default spark-defaults.conf from >> conf dir >> >> Am I expecting or doing something wrong? >> >> Regards >> jk >> >> >>
Re: spark-defaults.conf
So no takers regarding why spark-defaults.conf is not being picked up. Here is another one: If Zookeeper is configured in Spark, why do we need to start a slave like this: spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh 1 spark://somemaster:7077 i.e. why do we need to specify the master URL explicitly? Shouldn't Spark just consult ZK and use the active master? Or is ZK only used during failure? On Mon, Apr 27, 2015 at 1:53 PM, James King wrote: > Thanks. > > I've set SPARK_HOME and SPARK_CONF_DIR appropriately in .bash_profile > > But when I start worker like this > > spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh > > I still get > > failed to launch org.apache.spark.deploy.worker.Worker: > Default is conf/spark-defaults.conf. > 15/04/27 11:51:33 DEBUG Utils: Shutdown hook called > > > > > > On Mon, Apr 27, 2015 at 1:15 PM, Zoltán Zvara > wrote: > >> You should distribute your configuration file to workers and set the >> appropriate environment variables, like HADOOP_HOME, SPARK_HOME, >> HADOOP_CONF_DIR, SPARK_CONF_DIR. >> >> On Mon, Apr 27, 2015 at 12:56 PM James King >> wrote: >> >>> I renamed spark-defaults.conf.template to spark-defaults.conf >>> and invoked >>> >>> spark-1.3.0-bin-hadoop2.4/sbin/start-slave.sh >>> >>> But I still get >>> >>> failed to launch org.apache.spark.deploy.worker.Worker: >>> --properties-file FILE Path to a custom Spark properties file. >>> Default is conf/spark-defaults.conf. >>> >>> But I'm thinking it should pick up the default spark-defaults.conf from >>> conf dir >>> >>> Am I expecting or doing something wrong? >>> >>> Regards >>> jk >>> >>> >>> >
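On both points, briefly: the "failed to launch" output above is most likely the Worker printing its usage text because start-slave.sh was invoked without a master URL, not a problem with spark-defaults.conf. And Workers don't consult ZooKeeper themselves; ZooKeeper is used only by the Masters for leader election and failover, so the Worker start script still needs an explicit master URL. Listing all Masters lets the Worker follow a failover. A sketch in the same form as the command above (host names are placeholders):

```sh
# start a worker that knows about both masters; with ZK HA the worker
# registers with whichever master is currently the leader
./sbin/start-slave.sh 1 spark://host01:7077,host02:7077
```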
submitting to multiple masters
I have multiple masters running and I'm trying to submit an application using spark-1.3.0-bin-hadoop2.4/bin/spark-submit with this config (i.e. a comma separated list of master urls) --master spark://master01:7077,spark://master02:7077 But getting this exception Exception in thread "main" org.apache.spark.SparkException: Invalid master URL: spark://spark://master02:7077 What am I doing wrong? Many Thanks jk
Re: submitting to multiple masters
Indeed, many thanks Michal for the help. On Tue, Apr 28, 2015 at 2:20 PM, michal.klo...@gmail.com < michal.klo...@gmail.com> wrote: > According to the docs it should go like this: > spark://host1:port1,host2:port2 > > > > https://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper > > Thanks > M > > > On Apr 28, 2015, at 8:13 AM, James King wrote: > > I have multiple masters running and I'm trying to submit an application > using > > spark-1.3.0-bin-hadoop2.4/bin/spark-submit > > with this config (i.e. a comma separated list of master urls) > > --master spark://master01:7077,spark://master02:7077 > > > But getting this exception > > Exception in thread "main" org.apache.spark.SparkException: Invalid master > URL: spark://spark://master02:7077 > > > What am I doing wrong? > > Many Thanks > jk > >
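The failure mode is easy to reproduce without Spark: the HA URL uses a single spark:// scheme followed by a comma-separated host:port list, so repeating the scheme per host leaves a second element that no longer parses as host:port. A small sketch of building the URL correctly (class and host names are illustrative):

```java
public class MasterUrl {
    // An HA master URL has ONE spark:// prefix and a comma-separated
    // host:port list after it.
    static String masterUrl(String... hostPorts) {
        return "spark://" + String.join(",", hostPorts);
    }

    public static void main(String[] args) {
        System.out.println(masterUrl("master01:7077", "master02:7077"));
        // prints spark://master01:7077,master02:7077
        // The failing form "spark://master01:7077,spark://master02:7077"
        // leaves "spark://master02:7077" as the second list element,
        // which Spark then rejects as an invalid master URL.
    }
}
```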
Enabling Event Log
I'm unclear why I'm getting this exception. It seems to have realized that I want to enable event logging but is ignoring where I want it to log to, i.e. file:/opt/cb/tmp/spark-events, which does exist.

spark-defaults.conf:

# Example:
spark.master                     spark://master1:7077,master2:7077
spark.eventLog.enabled           true
spark.history.fs.logDirectory    file:/opt/cb/tmp/spark-events
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

Exception following job submission:

spark.eventLog.enabled=true
spark.history.fs.logDirectory=file:/opt/cb/tmp/spark-events
spark.jars=file:/opt/cb/scripts/spark-streamer/cb-spark-streamer-1.0-SNAPSHOT.jar
spark.master=spark://master1:7077,master2:7077
Exception in thread "main" java.lang.IllegalArgumentException: Log directory /tmp/spark-events does not exist.
	at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:99)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:399)
	at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
	at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:75)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:132)

Many Thanks jk
Re: Enabling Event Log
Oops! well spotted. Many thanks Shixiong. On Fri, May 1, 2015 at 1:25 AM, Shixiong Zhu wrote: > "spark.history.fs.logDirectory" is for the history server. For Spark > applications, they should use "spark.eventLog.dir". Since you commented out > "spark.eventLog.dir", it will be "/tmp/spark-events". And this folder does > not exits. > > Best Regards, > Shixiong Zhu > > 2015-04-29 23:22 GMT-07:00 James King : > > I'm unclear why I'm getting this exception. >> >> It seems to have realized that I want to enable Event Logging but >> ignoring where I want it to log to i.e. file:/opt/cb/tmp/spark-events which >> does exist. >> >> spark-default.conf >> >> # Example: >> spark.master spark://master1:7077,master2:7077 >> spark.eventLog.enabled true >> spark.history.fs.logDirectoryfile:/opt/cb/tmp/spark-events >> # spark.eventLog.dir hdfs://namenode:8021/directory >> # spark.serializer >> org.apache.spark.serializer.KryoSerializer >> # spark.driver.memory 5g >> # spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value >> -Dnumbers="one two three" >> >> Exception following job submission: >> >> spark.eventLog.enabled=true >> spark.history.fs.logDirectory=file:/opt/cb/tmp/spark-events >> >> spark.jars=file:/opt/cb/scripts/spark-streamer/cb-spark-streamer-1.0-SNAPSHOT.jar >> spark.master=spark://master1:7077,master2:7077 >> Exception in thread "main" java.lang.IllegalArgumentException: Log >> directory /tmp/spark-events does not exist. >> at >> org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:99) >> at org.apache.spark.SparkContext.(SparkContext.scala:399) >> at >> org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642) >> at >> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:75) >> at >> org.apache.spark.streaming.api.java.JavaStreamingContext.(JavaStreamingContext.scala:132) >> >> >> Many Thanks >> jk >> > >
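To restate Shixiong's fix as configuration: the application writes event logs to spark.eventLog.dir (which was commented out, so the default /tmp/spark-events applied), while spark.history.fs.logDirectory only tells the history server where to read them. A corrected spark-defaults.conf fragment, keeping the paths from the original post:

```
spark.master                   spark://master1:7077,master2:7077
spark.eventLog.enabled         true
spark.eventLog.dir             file:/opt/cb/tmp/spark-events
spark.history.fs.logDirectory  file:/opt/cb/tmp/spark-events
```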
Receiver Fault Tolerance
In the O'Reilly book Learning Spark, Chapter 10, section "24/7 Operation", it talks about 'Receiver Fault Tolerance'. I'm unsure of what a Receiver is here; from reading, it sounds like when you submit an application to the cluster in cluster mode, i.e. *--deploy-mode cluster*, the driver program will run on a Worker, and in this case this Worker is seen as a Receiver because it is consuming messages from the source. Is the above understanding correct? Or is there more to it?
Re: Receiver Fault Tolerance
Many thanks all, your responses have been very helpful. Cheers On Wed, May 6, 2015 at 2:14 PM, ayan guha wrote: > > https://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics > > > On Wed, May 6, 2015 at 10:09 PM, James King wrote: > >> In the O'reilly book Learning Spark Chapter 10 section 24/7 Operation >> >> It talks about 'Receiver Fault Tolerance' >> >> I'm unsure of what a Receiver is here, from reading it sounds like when >> you submit an application to the cluster in cluster mode i.e. *--deploy-mode >> cluster *the driver program will run on a Worker and this case this >> Worker is seen as a Receiver because it is consuming messages from the >> source. >> >> >> Is the above understanding correct? or is there more to it? >> >> >> > > > -- > Best Regards, > Ayan Guha >
Stop Cluster Mode Running App
I submitted a Spark application in cluster mode, and now every time I stop the cluster and restart it, the job resumes execution. I even killed a daemon called DriverWrapper; it stops the app, but it resumes again. How can I stop this application from running?
Re: Stop Cluster Mode Running App
Many Thanks Silvio, Someone also suggested using something similar : ./bin/spark-class org.apache.spark.deploy.Client kill Regards jk On Fri, May 8, 2015 at 2:12 AM, Silvio Fiorito < silvio.fior...@granturing.com> wrote: > Hi James, > > If you’re on Spark 1.3 you can use the kill command in spark-submit to > shut it down. You’ll need the driver id from the Spark UI or from when you > submitted the app. > > spark-submit --master spark://master:7077 --kill > > Thanks, > Silvio > > From: James King > Date: Wednesday, May 6, 2015 at 12:02 PM > To: user > Subject: Stop Cluster Mode Running App > > I submitted a Spark Application in cluster mode and now every time I > stop the cluster and restart it the job resumes execution. > > I even killed a daemon called DriverWrapper it stops the app but it > resumes again. > > How can stop this application from running? >
Submit Spark application in cluster mode and supervised
I have two hosts, host01 and host02 (let's call them). I run one Master and two Workers on host01. I also run one Master and two Workers on host02. Now I have 1 LIVE Master on host01 and a STANDBY Master on host02. The LIVE Master is aware of all Workers in the cluster. Now I submit a Spark application using bin/spark-submit --class SomeApp --deploy-mode cluster --supervise --master spark://host01:7077 Some.jar This is to make the driver resilient to failure. Now the interesting part: If I stop the cluster (all daemons on all hosts) and restart the Master and Workers *only* on host01, the job resumes! As expected. But if I stop the cluster (all daemons on all hosts) and restart the Master and Workers *only* on host02, the job *does not* resume execution! Why? I can see the driver listed on the host02 WebUI, but no job execution. Please let me know why. Am I wrong to expect it to resume execution in this case?
Re: Submit Spark application in cluster mode and supervised
BTW I'm using Spark 1.3.0. Thanks On Fri, May 8, 2015 at 5:22 PM, James King wrote: > I have two hosts host01 and host02 (lets call them) > > I run one Master and two Workers on host01 > I also run one Master and two Workers on host02 > > Now I have 1 LIVE Master on host01 and a STANDBY Master on host02 > The LIVE Master is aware of all Workers in the cluster > > Now I submit a Spark application using > > bin/spark-submit --class SomeApp --deploy-mode cluster --supervise > --master spark://host01:7077 Some.jar > > This to make the driver resilient to failure. > > Now the interesting part: > > If I stop the cluster (all daemons on all hosts) and restart > the Master and Workers *only* on host01 the job resumes! as expected. > > But if I stop the cluster (all daemons on all hosts) and restart the > Master and Workers *only* on host02 the job *does not* > resume execution! why? > > I can see the driver on host02 WebUI listed but no job execution. Please > let me know why. > > Am I wrong to expect it to resume execution in this case? > > > > > >
Cluster mode and supervised app with multiple Masters
Why does this not work? ./spark-1.3.0-bin-hadoop2.4/bin/spark-submit --class SomeApp --deploy-mode cluster --supervise --master spark://host01:7077,host02:7077 Some.jar With exception: Caused by: java.lang.NumberFormatException: For input string: "7077,host02:7077" It seems to accept only one master. Can this be done with multiple Masters? Thanks
Re: Submit Spark application in cluster mode and supervised
Many Thanks Silvio. What I found out later is that if there is a catastrophic failure and all the daemons fail at the same time, before any fail-over takes place, then when you bring the cluster back up the job resumes only on the Master it was last running on before the failure. Otherwise, during partial failure, normal fail-over takes place and the driver is handed over to another Master. Which answers my initial question. Regards jk On Fri, May 8, 2015 at 7:34 PM, Silvio Fiorito < silvio.fior...@granturing.com> wrote: > If you’re using multiple masters with ZooKeeper then you should set > your master URL to be > > spark://host01:7077,host02:7077 > > And the property spark.deploy.recoveryMode=ZOOKEEPER > > See here for more info: > http://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper > > From: James King > Date: Friday, May 8, 2015 at 11:22 AM > To: user > Subject: Submit Spark application in cluster mode and supervised > > I have two hosts host01 and host02 (lets call them) > > I run one Master and two Workers on host01 > I also run one Master and two Workers on host02 > > Now I have 1 LIVE Master on host01 and a STANDBY Master on host02 > The LIVE Master is aware of all Workers in the cluster > > Now I submit a Spark application using > > bin/spark-submit --class SomeApp --deploy-mode cluster --supervise > --master spark://host01:7077 Some.jar > > This to make the driver resilient to failure. > > Now the interesting part: > > If I stop the cluster (all daemons on all hosts) and restart > the Master and Workers *only* on host01 the job resumes! as expected. > > But if I stop the cluster (all daemons on all hosts) and restart the > Master and Workers *only* on host02 the job *does not* > resume execution! why? > > I can see the driver on host02 WebUI listed but no job execution. Please > let me know why. > > Am I wrong to expect it to resume execution in this case? > > > > > >
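Combining Silvio's two points, a sketch of the full setup (ZooKeeper addresses and host names are placeholders): the Masters share recovery state through ZooKeeper, and the submit command lists every Master in one URL so --supervise can follow a failover:

```sh
# conf/spark-env.sh on both hosts: enable ZooKeeper recovery
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk01:2181,zk02:2181,zk03:2181"

# submit once, naming both masters in a single spark:// URL
./bin/spark-submit --class SomeApp --deploy-mode cluster --supervise \
  --master spark://host01:7077,host02:7077 Some.jar
```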
Master HA
I know that it is possible to use Zookeeper and File System (not for production use) to achieve HA. Are there any other options now or in the near future?
Re: Master HA
Thanks Akhil, I'm using Spark in standalone mode so I guess Mesos is not an option here. On Tue, May 12, 2015 at 1:27 PM, Akhil Das wrote: > Mesos has a HA option (of course it includes zookeeper) > > Thanks > Best Regards > > On Tue, May 12, 2015 at 4:53 PM, James King wrote: > >> I know that it is possible to use Zookeeper and File System (not for >> production use) to achieve HA. >> >> Are there any other options now or in the near future? >> > >
Reading Real Time Data only from Kafka
What I want is: if the driver dies for some reason and is restarted, I want to read only messages that arrived in Kafka following the restart of the driver program and the re-connection to Kafka. Has anyone done this? Any links or resources that can help explain this? Regards jk
Re: Reading Real Time Data only from Kafka
Very nice! will try and let you know, thanks. On Tue, May 12, 2015 at 2:25 PM, Akhil Das wrote: > Yep, you can try this lowlevel Kafka receiver > https://github.com/dibbhatt/kafka-spark-consumer. Its much more > flexible/reliable than the one comes with Spark. > > Thanks > Best Regards > > On Tue, May 12, 2015 at 5:15 PM, James King wrote: > >> What I want is if the driver dies for some reason and it is restarted I >> want to read only messages that arrived into Kafka following the restart of >> the driver program and re-connection to Kafka. >> >> Has anyone done this? any links or resources that can help explain this? >> >> Regards >> jk >> >> >> >
Re: Reading Real Time Data only from Kafka
Thanks Cody. Here are the events: - Spark app connects to Kafka first time and starts consuming - Messages 1 - 10 arrive at Kafka, then Spark app gets them - Now driver dies - Messages 11 - 15 arrive at Kafka - Spark driver program reconnects - Then messages 16 - 20 arrive at Kafka What I want is that Spark ignores 11 - 15 but should process 16 - 20, since they arrived after the driver reconnected to Kafka. Is this what happens by default in your suggestion? On Tue, May 12, 2015 at 3:52 PM, Cody Koeninger wrote: > I don't think it's accurate for Akhil to claim that the linked library is > "much more flexible/reliable" than what's available in Spark at this point. > > James, what you're describing is the default behavior for the > createDirectStream api available as part of spark since 1.3. The kafka > parameter auto.offset.reset defaults to largest, ie start at the most > recent available message. > > This is described at > http://spark.apache.org/docs/latest/streaming-kafka-integration.html The > createDirectStream api implementation is described in detail at > https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md > > If for some reason you're stuck using an earlier version of spark, you can > accomplish what you want simply by starting the job using a new consumer > group (there will be no prior state in zookeeper, so it will start > consuming according to auto.offset.reset) > > On Tue, May 12, 2015 at 7:26 AM, James King wrote: > >> Very nice! will try and let you know, thanks. >> >> On Tue, May 12, 2015 at 2:25 PM, Akhil Das >> wrote: >> >>> Yep, you can try this lowlevel Kafka receiver >>> https://github.com/dibbhatt/kafka-spark-consumer. Its much more >>> flexible/reliable than the one comes with Spark. 
>>> >>> Thanks >>> Best Regards >>> >>> On Tue, May 12, 2015 at 5:15 PM, James King >>> wrote: >>> >>>> What I want is if the driver dies for some reason and it is restarted I >>>> want to read only messages that arrived into Kafka following the restart of >>>> the driver program and re-connection to Kafka. >>>> >>>> Has anyone done this? any links or resources that can help explain this? >>>> >>>> Regards >>>> jk >>>> >>>> >>>> >>> >> >
Re: Reading Real Time Data only from Kafka
Many thanks both, appreciate the help. On Tue, May 12, 2015 at 4:18 PM, Cody Koeninger wrote: > Yes, that's what happens by default. > > If you want to be super accurate about it, you can also specify the exact > starting offsets for every topic/partition. > > On Tue, May 12, 2015 at 9:01 AM, James King wrote: > >> Thanks Cody. >> >> Here are the events: >> >> - Spark app connects to Kafka first time and starts consuming >> - Messages 1 - 10 arrive at Kafka then Spark app gets them >> - Now driver dies >> - Messages 11 - 15 arrive at Kafka >> - Spark driver program reconnects >> - Then Messages 16 - 20 arrive Kafka >> >> What I want is that Spark ignores 11 - 15 >> but should process 16 - 20 since they arrived after the driver >> reconnected to Kafka >> >> Is this what happens by default in your suggestion? >> >> >> >> >> >> On Tue, May 12, 2015 at 3:52 PM, Cody Koeninger >> wrote: >> >>> I don't think it's accurate for Akhil to claim that the linked library >>> is "much more flexible/reliable" than what's available in Spark at this >>> point. >>> >>> James, what you're describing is the default behavior for the >>> createDirectStream api available as part of spark since 1.3. The kafka >>> parameter auto.offset.reset defaults to largest, ie start at the most >>> recent available message. >>> >>> This is described at >>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html >>> The createDirectStream api implementation is described in detail at >>> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md >>> >>> If for some reason you're stuck using an earlier version of spark, you >>> can accomplish what you want simply by starting the job using a new >>> consumer group (there will be no prior state in zookeeper, so it will start >>> consuming according to auto.offset.reset) >>> >>> On Tue, May 12, 2015 at 7:26 AM, James King >>> wrote: >>> >>>> Very nice! will try and let you know, thanks. 
>>>> >>>> On Tue, May 12, 2015 at 2:25 PM, Akhil Das >>>> wrote: >>>> >>>>> Yep, you can try this lowlevel Kafka receiver >>>>> https://github.com/dibbhatt/kafka-spark-consumer. Its much more >>>>> flexible/reliable than the one comes with Spark. >>>>> >>>>> Thanks >>>>> Best Regards >>>>> >>>>> On Tue, May 12, 2015 at 5:15 PM, James King >>>>> wrote: >>>>> >>>>>> What I want is if the driver dies for some reason and it is restarted >>>>>> I want to read only messages that arrived into Kafka following the >>>>>> restart >>>>>> of the driver program and re-connection to Kafka. >>>>>> >>>>>> Has anyone done this? any links or resources that can help explain >>>>>> this? >>>>>> >>>>>> Regards >>>>>> jk >>>>>> >>>>>> >>>>>> >>>>> >>>> >>> >> >
Kafka Direct Approach + Zookeeper
From: http://spark.apache.org/docs/latest/streaming-kafka-integration.html I'm trying to use the direct approach to read messages from Kafka. Kafka is running as a cluster and configured with Zookeeper. On the above page it mentions: "In the Kafka parameters, you must specify either *metadata.broker.list* or *bootstrap.servers*. ..." Can someone please explain the difference between the two config parameters? And which one is more relevant in my case? Regards jk
Kafka + Direct + Zookeeper
I'm trying the Kafka direct approach (for consuming), but when I use only this config: kafkaParams.put("group.id", groupdid); kafkaParams.put("zookeeper.connect", zookeeperHostAndPort + "/cb_kafka"); I get this: Exception in thread "main" org.apache.spark.SparkException: Must specify metadata.broker.list or bootstrap.servers Shouldn't Zookeeper have enough information to provide the connection details? Or am I missing something?
Re: Kafka Direct Approach + Zookeeper
Many thanks Cody and contributors for the help. On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger wrote: > Either one will work, there is no semantic difference. > > The reason I designed the direct api to accept both of those keys is > because they were used to define lists of brokers in pre-existing Kafka > project apis. I don't know why the Kafka project chose to use 2 different > configuration keys. > > On Wed, May 13, 2015 at 5:00 AM, James King wrote: > >> From: >> http://spark.apache.org/docs/latest/streaming-kafka-integration.html >> >> I'm trying to use the direct approach to read messages form Kafka. >> >> Kafka is running as a cluster and configured with Zookeeper. >> >> On the above page it mentions: >> >> "In the Kafka parameters, you must specify either *metadata.broker.list* >> or *bootstrap.servers*. ..." >> >> Can someone please explain the difference of between the two config >> parameters? >> >> And which one is more relevant in my case? >> >> Regards >> jk >> > >
Re: Kafka Direct Approach + Zookeeper
Looking at Consumer Configs in http://kafka.apache.org/documentation.html#consumerconfigs The properties *metadata.broker.list* or *bootstrap.servers *are not mentioned. Should I need these for consume side? On Wed, May 13, 2015 at 3:52 PM, James King wrote: > Many thanks Cody and contributors for the help. > > > On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger > wrote: > >> Either one will work, there is no semantic difference. >> >> The reason I designed the direct api to accept both of those keys is >> because they were used to define lists of brokers in pre-existing Kafka >> project apis. I don't know why the Kafka project chose to use 2 different >> configuration keys. >> >> On Wed, May 13, 2015 at 5:00 AM, James King >> wrote: >> >>> From: >>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html >>> >>> I'm trying to use the direct approach to read messages form Kafka. >>> >>> Kafka is running as a cluster and configured with Zookeeper. >>> >>> On the above page it mentions: >>> >>> "In the Kafka parameters, you must specify either *metadata.broker.list* >>> or *bootstrap.servers*. ..." >>> >>> Can someone please explain the difference of between the two config >>> parameters? >>> >>> And which one is more relevant in my case? >>> >>> Regards >>> jk >>> >> >> >
Re: Kafka Direct Approach + Zookeeper
Many thanks Cody! On Wed, May 13, 2015 at 4:22 PM, Cody Koeninger wrote: > In my mind, this isn't really a producer vs consumer distinction, this is > a broker vs zookeeper distinction. > > The producer apis talk to brokers. The low level consumer api (what direct > stream uses) also talks to brokers. The high level consumer api talks to > zookeeper, at least initially. > > TLDR; don't worry about it, just specify either of metadata.broker.list or > bootstrap.servers, using the exact same "host:port,host:port" format, and > you're good to go. > > > On Wed, May 13, 2015 at 9:03 AM, James King wrote: > >> Looking at Consumer Configs in >> http://kafka.apache.org/documentation.html#consumerconfigs >> >> The properties *metadata.broker.list* or *bootstrap.servers *are not >> mentioned. >> >> Should I need these for consume side? >> >> On Wed, May 13, 2015 at 3:52 PM, James King >> wrote: >> >>> Many thanks Cody and contributors for the help. >>> >>> >>> On Wed, May 13, 2015 at 3:44 PM, Cody Koeninger >>> wrote: >>> >>>> Either one will work, there is no semantic difference. >>>> >>>> The reason I designed the direct api to accept both of those keys is >>>> because they were used to define lists of brokers in pre-existing Kafka >>>> project apis. I don't know why the Kafka project chose to use 2 different >>>> configuration keys. >>>> >>>> On Wed, May 13, 2015 at 5:00 AM, James King >>>> wrote: >>>> >>>>> From: >>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html >>>>> >>>>> I'm trying to use the direct approach to read messages form Kafka. >>>>> >>>>> Kafka is running as a cluster and configured with Zookeeper. >>>>> >>>>> On the above page it mentions: >>>>> >>>>> "In the Kafka parameters, you must specify either >>>>> *metadata.broker.list* or *bootstrap.servers*. ..." >>>>> >>>>> Can someone please explain the difference of between the two config >>>>> parameters? >>>>> >>>>> And which one is more relevant in my case? 
>>>>> >>>>> Regards >>>>> jk >>>>> >>>> >>>> >>> >> >
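As a concrete version of Cody's TLDR, here is a minimal parameter map for the direct stream (broker host names are placeholders; either key shown would work):

```java
import java.util.HashMap;
import java.util.Map;

public class DirectStreamParams {
    // Parameters for the direct Kafka stream: a broker list
    // (metadata.broker.list and bootstrap.servers are interchangeable here)
    // and the offset-reset policy. "largest" (the default) starts from the
    // newest available message, which is what the earlier thread wanted.
    static Map<String, String> kafkaParams(String brokers) {
        Map<String, String> p = new HashMap<>();
        p.put("metadata.broker.list", brokers);  // same host:port,host:port format
        p.put("auto.offset.reset", "largest");
        return p;
    }

    public static void main(String[] args) {
        Map<String, String> p = kafkaParams("broker01:9092,broker02:9092");
        System.out.println(p.get("metadata.broker.list"));
        // prints broker01:9092,broker02:9092
    }
}
```

This map would then be passed to KafkaUtils.createDirectStream along with the topic set; note that zookeeper.connect is not needed at all for the direct approach.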
Worker Spark Port
I understand that this port value is randomly selected. Is there a way to enforce which Spark port a Worker should use?
Re: Worker Spark Port
Indeed, many thanks. On Wednesday, 13 May 2015, Cody Koeninger wrote: > I believe most ports are configurable at this point, look at > > http://spark.apache.org/docs/latest/configuration.html > > search for ".port" > > On Wed, May 13, 2015 at 9:38 AM, James King > wrote: > >> I understated that this port value is randomly selected. >> >> Is there a way to enforce which spark port a Worker should use? >> > >
Re: Worker Spark Port
So I'm using code like this to use specific ports:

val conf = new SparkConf()
    .setMaster(master)
    .setAppName("namexxx")
    .set("spark.driver.port", "51810")
    .set("spark.fileserver.port", "51811")
    .set("spark.broadcast.port", "51812")
    .set("spark.replClassServer.port", "51813")
    .set("spark.blockManager.port", "51814")
    .set("spark.executor.port", "51815")

My question now is: will the master forward the spark.executor.port value (to use) to the worker when it hands it a task to do?

Also, the property spark.executor.port is different from the Worker spark port; how can I make the Worker run on a specific port?

Regards
jk

On Wed, May 13, 2015 at 7:51 PM, James King wrote: > Indeed, many thanks. > > > On Wednesday, 13 May 2015, Cody Koeninger wrote: > >> I believe most ports are configurable at this point, look at >> >> http://spark.apache.org/docs/latest/configuration.html >> >> search for ".port" >> >> On Wed, May 13, 2015 at 9:38 AM, James King >> wrote: >> >>> I understated that this port value is randomly selected. >>> >>> Is there a way to enforce which spark port a Worker should use? >>> >> >>
Re: Worker Spark Port
I think this answers my question "executors, on the other hand, are bound with an application, ie spark context. Thus you modify executor properties through a context." Many Thanks. jk On Fri, May 15, 2015 at 3:23 PM, ayan guha wrote: > Hi > > I think you are mixing things a bit. > > Worker is part of the cluster. So it is governed by cluster manager. If > you are running standalone cluster, then you can modify spark-env and > configure SPARK_WORKER_PORT. > > executors, on the other hand, are bound with an application, ie spark > context. Thus you modify executor properties through a context. > > So, master != driver and executor != worker. > > Best > Ayan > > On Fri, May 15, 2015 at 7:52 PM, James King wrote: > >> So I'm using code like this to use specific ports: >> >> val conf = new SparkConf() >> .setMaster(master) >> .setAppName("namexxx") >> .set("spark.driver.port", "51810") >> .set("spark.fileserver.port", "51811") >> .set("spark.broadcast.port", "51812") >> .set("spark.replClassServer.port", "51813") >> .set("spark.blockManager.port", "51814") >> .set("spark.executor.port", "51815") >> >> My question now is : Will the master forward the spark.executor.port value >> (to use) to the worker when it hands it a task to do? >> >> Also the property spark.executor.port is different from the Worker spark >> port, how can I make the Worker run on a specific port? >> >> Regards >> >> jk >> >> >> On Wed, May 13, 2015 at 7:51 PM, James King >> wrote: >> >>> Indeed, many thanks. >>> >>> >>> On Wednesday, 13 May 2015, Cody Koeninger wrote: >>> >>>> I believe most ports are configurable at this point, look at >>>> >>>> http://spark.apache.org/docs/latest/configuration.html >>>> >>>> search for ".port" >>>> >>>> On Wed, May 13, 2015 at 9:38 AM, James King >>>> wrote: >>>> >>>>> I understated that this port value is randomly selected. >>>>> >>>>> Is there a way to enforce which spark port a Worker should use? >>>>> >>>> >>>> >> > > > -- > Best Regards, > Ayan Guha >
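And for the remaining half of the question: the Worker's own port is a cluster-manager setting, configured per host in spark-env.sh rather than through SparkConf (the port numbers here are arbitrary examples):

```sh
# conf/spark-env.sh on each worker host
export SPARK_WORKER_PORT=51800        # fixed port for the worker daemon
export SPARK_WORKER_WEBUI_PORT=51801  # worker web UI port
```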