Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
It says:

    14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

A quick guess would be that you are giving the wrong master URL (spark://192.168.88.130:7077). Open the web UI running on port 8080 and use the master URL listed at the top left corner of the page.

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, m.sar...@accenture.com wrote:

Hi,

I am integrating Kafka and Spark using spark-streaming. I have created a topic as a Kafka producer:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

I am publishing messages to Kafka and trying to read them using spark-streaming Java code and display them on screen. The daemons are all up: Spark master and worker, ZooKeeper, Kafka. The code, using KafkaUtils.createStream, is below:

    package com.spark;

    import scala.Tuple2;
    import kafka.serializer.Decoder;
    import kafka.serializer.Encoder;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.api.java.*;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.*;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import java.util.Map;
    import java.util.HashMap;

    public class SparkStream {
        public static void main(String[] args) {
            if (args.length != 3) {
                System.out.println("Usage: spark-submit --class com.spark.SparkStream target/SparkStream-with-dependencies.jar zookeeper_ip group_name topic1,topic2,...");
                System.exit(1);
            }

            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            String[] topic = args[2].split(",");
            for (String t : topic) {
                topicMap.put(t, new Integer(1));
            }

            JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
            JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

            System.out.println("Connection done");
            JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> message) {
                    System.out.println("NewMessage: " + message._2()); // for debugging
                    return message._2();
                }
            });
            data.print();

            jssc.start();
            jssc.awaitTermination();
        }
    }

I am running the job, and at another terminal I am running the Kafka producer to publish messages:

    #bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    Hi kafka
    second message
    another message

But the output logs at the spark-streaming console don't show the messages; they show zero blocks received:

    -------------------------------------------
    Time: 1417107363000 ms
    -------------------------------------------
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for time 1417107363000 ms (execution: 0.000 s)
    14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 1417107363000 ms
    14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
    14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
    14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 ms
    14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
    14/11/27 11:56:06 INFO
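A quick sanity check worth doing here, independent of Spark (this assumes the Kafka 0.8-style console tools already used above): read the topic back with the console consumer and confirm the published messages really are in it. If nothing comes back, the problem is on the Kafka side rather than in the streaming job.

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

If the messages do show up, the topic is fine and "0 blocks" points at the Spark side (receiver not scheduled, or a wrong master URL), which is what the rest of the thread digs into.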
RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
Hi,

The Spark master is working, and I have given the same URL in the code:

    [screenshot: Spark master web UI]

The warning is gone, and the new log is:

    -------------------------------------------
    Time: 141742785 ms
    -------------------------------------------
    INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0 from job set of time 141742785 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0 from job set of time 141742785 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785 ms (execution: 0.001 s)
    INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
    INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 25
    INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
    INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 24
    INFO [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time 141742785 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
    -------------------------------------------
    Time: 1417427853000 ms
    -------------------------------------------
    INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms (execution: 0.001 s)
    INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
    INFO [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 27
    INFO [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
    INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 26
    INFO [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417427853000 ms
    INFO [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should be my approach now? Need urgent help.

Regards,
Aiman
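Since the master URL is now confirmed correct but every batch still logs "Stream 0 received 0 blocks", the next thing to check is whether any workers are registered with that master: the receiver behind KafkaUtils.createStream runs inside an executor, so with no workers it is never scheduled and no blocks can arrive. The master web UI on port 8080 lists the registered workers; if the list is empty, a worker can be attached by hand, for example (only a sketch, assuming the master host from the thread and a standard Spark standalone layout):

    bin/spark-class org.apache.spark.deploy.worker.Worker spark://192.168.88.130:7077

Once a worker with at least two free cores shows up in the UI (the receiver permanently occupies one core, and batch processing needs at least one more), the stream should start reporting non-zero block counts.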
Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
I see you have no worker machines to execute the job:

    [inline image: screenshot of the Spark master web UI]

You haven't configured your Spark cluster properly. A quick fix to get it running would be to run it in local mode. For that, change this line

    JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));

to this

    JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));

Thanks
Best Regards
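A closing note on the local-mode fix, since it trips people up: with the receiver-based KafkaUtils.createStream, local mode needs at least two threads. The receiver permanently occupies one, and batch processing needs at least one more, so local or local[1] would leave the job running but producing no output. A minimal sketch of the driver setup with that in mind, built from a SparkConf (the class name here is illustrative, not from the thread):

    package com.spark;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Illustrative local-mode variant of the driver discussed above.
    public class SparkStreamLocal {
        public static void main(String[] args) {
            // local[2] is the minimum for a receiver-based stream: one thread is
            // taken by the Kafka receiver, the other processes the 3-second batches.
            SparkConf conf = new SparkConf()
                    .setAppName("SparkStream")
                    .setMaster("local[2]");

            JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(3000));

            // ... build the Kafka stream and transformations exactly as in the code above ...

            jssc.start();
            jssc.awaitTermination();
        }
    }

When moving back to the standalone cluster, dropping the setMaster call and passing --master spark://<master-host>:7077 to spark-submit keeps the master URL out of the code entirely.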