OK, there seems to be nothing strange in your code, so maybe we need to narrow down the cause. Would you please run the KafkaWordCount example in Spark to see if it is OK? If that works, then we should focus on your implementation; otherwise Kafka potentially has some problems.
Thanks
Jerry

From: James King [mailto:jakwebin...@gmail.com]
Sent: Wednesday, April 1, 2015 6:59 PM
To: Saisai Shao
Cc: bit1...@163.com; user
Subject: Re: Spark + Kafka

This is the code. And I couldn't find anything like the log you suggested.

public KafkaLogConsumer(int duration, String master) {
    JavaStreamingContext spark = createSparkContext(duration, master);

    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test", 1);

    JavaPairDStream<String, String> input =
            KafkaUtils.createStream(spark, "somesparkhost:2181", "groupid", topics);
    input.print();

    spark.start();
    spark.awaitTermination();
}

private JavaStreamingContext createSparkContext(int duration, String master) {
    SparkConf sparkConf = new SparkConf()
            .setAppName(this.getClass().getSimpleName())
            .setMaster(master);
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(duration));
    return ssc;
}

On Wed, Apr 1, 2015 at 11:37 AM, James King <jakwebin...@gmail.com> wrote:

Thanks Saisai,

Sure, will do. But just a quick note: when I set the master to "local[*]", Spark started showing Kafka messages as expected, so the problem in my view was to do with not having enough threads to process the incoming data.

Thanks.

On Wed, Apr 1, 2015 at 10:53 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

Would you please share your code snippet, so we can identify whether there is anything wrong in your code? Besides, would you please grep your driver's debug log to see if there's any debug log like "Stream xxx received block xxx"? This would mean that Spark Streaming keeps receiving data from sources like Kafka.

2015-04-01 16:18 GMT+08:00 James King <jakwebin...@gmail.com>:

Thank you bit1129,

From looking at the web UI I can see 2 cores. I am also looking at http://spark.apache.org/docs/1.2.1/configuration.html but can't see an obvious configuration option for the number of receivers. Can you help please?
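[Editor's note: the "not enough threads" diagnosis above can be illustrated outside Spark. This is a minimal analogy in plain java.util.concurrent, not Spark's actual scheduler; the class name ReceiverStarvationDemo and the timings are hypothetical. A receiver-style task that never returns pins one thread, so with a single-thread pool (the analogue of local[1]) the processing task never runs, while with two threads it does.]

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReceiverStarvationDemo {
    // Returns true if the "processing" task got to run within the timeout.
    static boolean processingRan(int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch processed = new CountDownLatch(1);
        // The "receiver" occupies its thread indefinitely, like a
        // receiver-based Kafka stream pinning one core.
        pool.submit(() -> {
            try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException ignored) { }
        });
        // The "batch processing" task can only run if a second thread is free.
        pool.submit(processed::countDown);
        boolean ran = processed.await(500, TimeUnit.MILLISECONDS);
        pool.shutdownNow();
        return ran;
    }

    public static void main(String[] args) throws InterruptedException {
        // One thread: the receiver starves processing entirely.
        System.out.println("1 thread:  processing ran = " + processingRan(1));
        // Two threads: processing runs alongside the receiver.
        System.out.println("2 threads: processing ran = " + processingRan(2));
    }
}
```

This is why "local[*]" (or anything >= local[2]) fixes the symptom: it leaves at least one thread free after the receiver takes its slot.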
On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com <bit1...@163.com> wrote:

Please make sure that you have given more cores than the number of receivers.

From: James King
Date: 2015-04-01 15:21
To: user@spark.apache.org
Subject: Spark + Kafka

I have a simple setup/runtime of Kafka and Spark. I have a command-line consumer displaying arrivals to the Kafka topic, so I know messages are being received. But when I try to read from the Kafka topic I get no messages; here are some logs below. I'm thinking there aren't enough threads. How do I check that? Thank you.

2015-04-01 08:56:50 INFO JobScheduler:59 - Starting job streaming job 1427871410000 ms.0 from job set of time 1427871410000 ms
2015-04-01 08:56:50 INFO JobScheduler:59 - Finished job streaming job 1427871410000 ms.0 from job set of time 1427871410000 ms
2015-04-01 08:56:50 INFO JobScheduler:59 - Total delay: 0.002 s for time 1427871410000 ms (execution: 0.000 s)
2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event ClearMetadata(1427871410000 ms)
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time 1427871410000 ms
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to old RDDs: []
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs:
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were older than 1427871405000 ms:
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to old RDDs: [1427871405000 ms -> 8]
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs: 8
2015-04-01 08:56:50 INFO BlockRDD:59 - Removing RDD 8 from persistence list
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled message (0.287257 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - removing RDD 8
2015-04-01 08:56:50 INFO BlockManager:59 - Removing RDD 8
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Done removing RDD 8, response is 0
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:56 - [actor] handled message (0.038047 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 INFO KafkaInputDStream:59 - Removing blocks of RDD BlockRDD[8] at createStream at KafkaLogConsumer.java:53 of time 1427871410000 ms
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Cleared 1 RDDs that were older than 1427871405000 ms: 1427871405000 ms
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Cleared old metadata for time 1427871410000 ms
2015-04-01 08:56:50 INFO ReceivedBlockTracker:59 - Deleting batches ArrayBuffer(1427871400000 ms)
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Sent response: 0 to Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:50 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.499181 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:50 WARN TaskSchedulerImpl:71 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:51 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.886121 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1)) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor updated: app-20150401065621-0007/0 is now EXITED (Command exited with code 1)
2015-04-01 08:56:52 INFO SparkDeploySchedulerBackend:59 - Executor app-20150401065621-0007/0 removed: Command exited with code 1
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message RemoveExecutor(0,Unknown executor exit code (1)) from Actor[akka://sparkDriver/temp/$p]
2015-04-01 08:56:52 ERROR SparkDeploySchedulerBackend:75 - Asked to remove non-existent executor 0
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (1.394052 ms) RemoveExecutor(0,Unknown executor exit code (1)) from Actor[akka://sparkDriver/temp/$p]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message (6.653705 ms) ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1)) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message ExecutorAdded(1,worker-20150331133159-somesparkhost-49556,somesparkhost:49556,2,512) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor added: app-20150401065621-0007/1 on worker-20150331133159-somesparkhost-49556 (somesparkhost:49556) with 2 cores
2015-04-01 08:56:52 INFO SparkDeploySchedulerBackend:59 - Granted executor ID app-20150401065621-0007/1 on hostPort somesparkhost:49556 with 2 cores, 512.0 MB RAM
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message (1.119731 ms) ExecutorAdded(1,worker-20150331133159-somesparkhost-49556,somesparkhost:49556,2,512) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message ExecutorUpdated(1,LOADING,None,None) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor updated: app-20150401065621-0007/1 is now LOADING
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message (0.516301 ms) ExecutorUpdated(1,LOADING,None,None) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:52 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.652891 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message ExecutorUpdated(1,RUNNING,None,None) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor updated: app-20150401065621-0007/1 is now RUNNING
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message (0.381614 ms) ExecutorUpdated(1,RUNNING,None,None) from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:53 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:53 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
2015-04-01 08:56:53 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (0.417759 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:54 DEBUG SparkDeploySchedulerBackend:50 - [actor] received message ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:54 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, runningTasks: 0
2015-04-01 08:56:54 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled message (1.426392 ms) ReviveOffers from Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:55 DEBUG RecurringTimer:63 - Callback for JobGenerator called at time 1427871415000
2015-04-01 08:56:55 DEBUG JobGenerator:63 - Got event GenerateJobs(1427871415000 ms)
2015-04-01 08:56:55 DEBUG DStreamGraph:63 - Generating jobs for time 1427871415000 ms
2015-04-01 08:56:55 DEBUG KafkaInputDStream:63 - Time 1427871415000 ms is valid
2015-04-01 08:56:55 DEBUG DStreamGraph:63 - Generated 1 jobs for time 1427871415000 ms
2015-04-01 08:56:55 INFO JobScheduler:59 - Added jobs for time 1427871415000 ms
2015-04-01 08:56:55 DEBUG JobGenerator:63 - Got event DoCheckpoint(1427871415000 ms)
-------------------------------------------
Time: 1427871415000 ms
-------------------------------------------
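[Editor's note: tying the thread together. The log above shows batches being generated and cleared with no received blocks, plus the telltale "Initial job has not accepted any resources" warning: the receiver-based createStream used in the code earlier pins one core, so with too few cores nothing is left to process batches. A hedged configuration sketch of the fix, reusing the same Spark 1.x API as the thread's code; "local[2]" is illustrative, the rule being master cores > number of receivers:]

```java
// Sketch only: assumes spark-core and spark-streaming-kafka 1.2.x on the classpath.
SparkConf conf = new SparkConf()
        .setAppName("KafkaLogConsumer")
        // One receiver occupies one core full-time; at least one more core
        // is needed for batch processing, so never fewer than 2 here.
        .setMaster("local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("test", 1);
// Each receiver-based stream like this adds one more pinned core.
JavaPairDStream<String, String> input =
        KafkaUtils.createStream(ssc, "somesparkhost:2181", "groupid", topics);
```

On a standalone cluster the same rule applies to total executor cores: the log's executor had 2 cores, which is the minimum for a single receiver plus processing.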