OK, nothing seems strange in your code. So maybe we need to narrow down the
cause: would you please run the KafkaWordCount example that ships with Spark?
If that works, then we should focus on your implementation; otherwise Kafka
itself potentially has some problem.
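(For reference, the bundled example can be launched roughly like this; the ZooKeeper address, group id, and topic are placeholders taken from this thread, so adjust them for your install:)

```shell
# Run the receiver-based Kafka word count example shipped with Spark 1.x.
# Arguments: <zkQuorum> <consumer group> <topics> <threads per topic>
bin/run-example streaming.KafkaWordCount somesparkhost:2181 my-group test 1
```

If this prints word counts for messages you publish to the topic, the Kafka side is healthy.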

Thanks
Jerry

From: James King [mailto:jakwebin...@gmail.com]
Sent: Wednesday, April 1, 2015 6:59 PM
To: Saisai Shao
Cc: bit1...@163.com; user
Subject: Re: Spark + Kafka

This is the code. And I couldn't find anything like the log you suggested.

public KafkaLogConsumer(int duration, String master) {
    JavaStreamingContext spark = createSparkContext(duration, master);

    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test", 1);

    JavaPairDStream<String, String> input =
        KafkaUtils.createStream(spark, "somesparkhost:2181", "groupid", topics);
    input.print();

    spark.start();
    spark.awaitTermination();
}

private JavaStreamingContext createSparkContext(int duration, String master) {
    SparkConf sparkConf = new SparkConf()
        .setAppName(this.getClass().getSimpleName())
        .setMaster(master);
    JavaStreamingContext ssc =
        new JavaStreamingContext(sparkConf, Durations.seconds(duration));
    return ssc;
}

On Wed, Apr 1, 2015 at 11:37 AM, James King <jakwebin...@gmail.com> wrote:
Thanks Saisai,

Sure will do.

But just a quick note: when I set the master to "local[*]", Spark started
showing Kafka messages as expected, so the problem in my view was that there
were not enough threads to process the incoming data.
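(The rule of thumb behind this: a receiver-based Kafka stream permanently occupies one thread, so the master needs at least two, one for the receiver and one to run the batch jobs. A minimal sketch, reusing names from the snippet above; the app name and batch interval are illustrative:)

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class LocalMasterSketch {
    public static void main(String[] args) {
        // "local[1]" would starve the job: the only thread is taken by the
        // Kafka receiver, leaving nothing to process batches.
        // "local[2]", or "local[*]" on a multi-core box, leaves at least
        // one thread free for processing.
        SparkConf conf = new SparkConf()
            .setAppName("KafkaLogConsumer")
            .setMaster("local[2]"); // >= number of receivers + 1
        JavaStreamingContext ssc =
            new JavaStreamingContext(conf, Durations.seconds(5));
        // ... create the Kafka stream and start ssc as in the code above.
    }
}
```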

Thanks.


On Wed, Apr 1, 2015 at 10:53 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
Would you please share your code snippet, so we can identify whether there is
anything wrong in your code.

Besides, would you please grep your driver's debug log to see if there's any
debug line like "Stream xxx received block xxx"? That would mean Spark
Streaming keeps receiving data from sources like Kafka.


2015-04-01 16:18 GMT+08:00 James King <jakwebin...@gmail.com>:
Thank you bit1129,

From looking at the web UI I can see 2 cores.

Also looking at http://spark.apache.org/docs/1.2.1/configuration.html

But I can't see an obvious configuration for the number of receivers; can you help please?


On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com wrote:
Please make sure that you have given more cores than the number of receivers.
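(There is no configuration key for this: with the receiver-based API, each call to KafkaUtils.createStream adds one receiver, and each receiver pins one core. A hedged sketch, reusing the host/group/topic names from this thread and assuming an existing JavaStreamingContext ssc:)

```java
// Illustrative only: with N receivers the app needs more than N cores,
// because each receiver pins one core and batch processing needs the rest.
int numReceivers = 2;
Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("test", 1);

List<JavaPairDStream<String, String>> streams =
    new ArrayList<JavaPairDStream<String, String>>();
for (int i = 0; i < numReceivers; i++) {
    // Each createStream call registers one receiver on the cluster.
    streams.add(KafkaUtils.createStream(ssc, "somesparkhost:2181",
                                        "groupid", topics));
}
// Union the receiver streams into one DStream for downstream processing.
JavaPairDStream<String, String> unified =
    ssc.union(streams.get(0), streams.subList(1, streams.size()));
```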




From: James King <jakwebin...@gmail.com>
Date: 2015-04-01 15:21
To: user <user@spark.apache.org>
Subject: Spark + Kafka
I have a simple setup/runtime of Kafka and Spark.

I have a command line consumer displaying arrivals to the Kafka topic, so I
know messages are being received.

But when I try to read from Kafka topic I get no messages, here are some logs 
below.

I'm thinking there aren't enough threads. How do I check that?

Thank you.

2015-04-01 08:56:50 INFO  JobScheduler:59 - Starting job streaming job 
1427871410000 ms.0 from job set of time 1427871410000 ms
2015-04-01 08:56:50 INFO  JobScheduler:59 - Finished job streaming job 
1427871410000 ms.0 from job set of time 1427871410000 ms
2015-04-01 08:56:50 INFO  JobScheduler:59 - Total delay: 0.002 s for time 
1427871410000 ms (execution: 0.000 s)
2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event 
ClearMetadata(1427871410000 ms)
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time 
1427871410000 ms
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to old RDDs: 
[]
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs:
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were older 
than 1427871405000 ms:
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to old 
RDDs: [1427871405000 ms -> 8]
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs: 8
2015-04-01 08:56:50 INFO  BlockRDD:59 - Removing RDD 8 from persistence list
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received message 
RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received message 
RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled message 
(0.287257 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - removing RDD 8
2015-04-01 08:56:50 INFO  BlockManager:59 - Removing RDD 8
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Done removing RDD 8, 
response is 0
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:56 - [actor] handled message 
(0.038047 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 INFO  KafkaInputDStream:59 - Removing blocks of RDD 
BlockRDD[8] at createStream at KafkaLogConsumer.java:53 of time 1427871410000 ms
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Cleared 1 RDDs that were older 
than 1427871405000 ms: 1427871405000 ms
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Cleared old metadata for time 
1427871410000 ms
2015-04-01 08:56:50 INFO  ReceivedBlockTracker:59 - Deleting batches 
ArrayBuffer(1427871400000 ms)
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Sent response: 0 to 
Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:50 - [actor] received 
message ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:50 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, 
runningTasks: 0
2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled 
message (0.499181 ms) ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:50 WARN  TaskSchedulerImpl:71 - Initial job has not accepted 
any resources; check your cluster UI to ensure that workers are registered and 
have sufficient resources
2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:50 - [actor] received 
message ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:51 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, 
runningTasks: 0
2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled 
message (0.886121 ms) ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message 
ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1)) from 
Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor updated: 
app-20150401065621-0007/0 is now EXITED (Command exited with code 1)
2015-04-01 08:56:52 INFO  SparkDeploySchedulerBackend:59 - Executor 
app-20150401065621-0007/0 removed: Command exited with code 1
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor] received 
message RemoveExecutor(0,Unknown executor exit code (1)) from 
Actor[akka://sparkDriver/temp/$p]
2015-04-01 08:56:52 ERROR SparkDeploySchedulerBackend:75 - Asked to remove 
non-existent executor 0
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled 
message (1.394052 ms) RemoveExecutor(0,Unknown executor exit code (1)) from 
Actor[akka://sparkDriver/temp/$p]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message 
(6.653705 ms) ExecutorUpdated(0,EXITED,Some(Command exited with code 
1),Some(1)) from 
Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message 
ExecutorAdded(1,worker-20150331133159-somesparkhost-49556,somesparkhost:49556,2,512)
 from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor added: 
app-20150401065621-0007/1 on worker-20150331133159-somesparkhost-49556 
(somesparkhost:49556) with 2 cores
2015-04-01 08:56:52 INFO  SparkDeploySchedulerBackend:59 - Granted executor ID 
app-20150401065621-0007/1 on hostPort somesparkhost:49556 with 2 cores, 512.0 
MB RAM
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message 
(1.119731 ms) 
ExecutorAdded(1,worker-20150331133159-somesparkhost-49556,somesparkhost:49556,2,512)
 from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message 
ExecutorUpdated(1,LOADING,None,None) from 
Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor updated: 
app-20150401065621-0007/1 is now LOADING
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message 
(0.516301 ms) ExecutorUpdated(1,LOADING,None,None) from 
Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor] received 
message ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:52 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, 
runningTasks: 0
2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled 
message (0.652891 ms) ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received message 
ExecutorUpdated(1,RUNNING,None,None) from 
Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor updated: 
app-20150401065621-0007/1 is now RUNNING
2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled message 
(0.381614 ms) ExecutorUpdated(1,RUNNING,None,None) from 
Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
2015-04-01 08:56:53 DEBUG SparkDeploySchedulerBackend:50 - [actor] received 
message ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:53 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, 
runningTasks: 0
2015-04-01 08:56:53 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled 
message (0.417759 ms) ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:54 DEBUG SparkDeploySchedulerBackend:50 - [actor] received 
message ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:54 DEBUG TaskSchedulerImpl:63 - parentName: , name: TaskSet_0, 
runningTasks: 0
2015-04-01 08:56:54 DEBUG SparkDeploySchedulerBackend:56 - [actor] handled 
message (1.426392 ms) ReviveOffers from 
Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
2015-04-01 08:56:55 DEBUG RecurringTimer:63 - Callback for JobGenerator called 
at time 1427871415000
2015-04-01 08:56:55 DEBUG JobGenerator:63 - Got event 
GenerateJobs(1427871415000 ms)
2015-04-01 08:56:55 DEBUG DStreamGraph:63 - Generating jobs for time 
1427871415000 ms
2015-04-01 08:56:55 DEBUG KafkaInputDStream:63 - Time 1427871415000 ms is valid
2015-04-01 08:56:55 DEBUG DStreamGraph:63 - Generated 1 jobs for time 
1427871415000 ms
2015-04-01 08:56:55 INFO  JobScheduler:59 - Added jobs for time 1427871415000 ms
2015-04-01 08:56:55 DEBUG JobGenerator:63 - Got event 
DoCheckpoint(1427871415000 ms)
-------------------------------------------
Time: 1427871415000 ms
-------------------------------------------




