I am not at all clear on what you are saying. "Yes, I am printing each message. It is processing all messages under each DStream block." If it is processing all messages, what is the problem you are having?
"The issue is with Directsream processing 10 message per event. " What distinction are you making between a message and an event? "I am expecting Directsream to process 1 million messages" Your first email said you were publishing 100 messages but only processing 10. Why are you now trying to process 1 million messages without understanding what is going on? Make sure you can process a limited number of messages correctly first. The first code examples you posted to the list had some pretty serious errors (ie only trying to process 1 partition, trying to process offsets that didn't exist). Make sure that is all fixed first. To be clear, I use direct kakfa rdds to process batches with like 4gb of messages per partition, you shouldn't be hitting some kind of limit with 1 million messages per batch. You may of course hit executor resource issues depending on what you're trying to do with each message, but that doesn't sound like the case here. If you want help, either clarify what you are saying, or post a minimal reproducible code example, with expected output vs actual output. On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi < [email protected]> wrote: > Cody, > Yes , I am printing each messages . It is processing all messages > under each dstream block. > > Source systems are publishing 1 Million messages /4 secs which is less > than batch interval. The issue is with Directsream processing 10 message > per event. When partitions were increased to 20 in topic, DirectStream > picksup only 200 messages ( I guess 10 for each partition ) at a time for > processing . I have 16 executors running for streaming ( both yarn > client & cluster mode). > I am expecting Directsream to process 1 million messages which > published in topic < batch interval . > > Using createStream , It could batch 150K messages and process . > createStream is better than Directsream in this case . Again why only > 150K. > > Any clarification is much appreciated on directStream processing > millions per batch . > > > > > Sent from Samsung Mobile. > > > -------- Original message -------- > From: Cody Koeninger <[email protected]> > Date:06/02/2016 01:30 (GMT+05:30) > To: Diwakar Dhanuskodi <[email protected]> > Cc: [email protected] > Subject: Re: Kafka directsream receiving rate > > Have you tried just printing each message, to see which ones are being > processed? > > On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi < > [email protected]> wrote: > >> I am able to see no of messages processed per event in >> sparkstreaming web UI . Also I am counting the messages inside >> foreachRDD . >> Removed the settings for backpressure but still the same . >> >> >> >> >> >> Sent from Samsung Mobile. >> >> >> -------- Original message -------- >> From: Cody Koeninger <[email protected]> >> Date:06/02/2016 00:33 (GMT+05:30) >> To: Diwakar Dhanuskodi <[email protected]> >> Cc: [email protected] >> Subject: Re: Kafka directsream receiving rate >> >> How are you counting the number of messages? >> >> I'd go ahead and remove the settings for backpressure and >> maxrateperpartition, just to eliminate that as a variable. >> >> On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi < >> [email protected]> wrote: >> >>> I am using one directsream. 
>>> Below is the call to createDirectStream:
>>>
>>> val topicSet = topics.split(",").toSet
>>> val kafkaParams = Map[String, String](
>>>   "bootstrap.servers" -> "datanode4.isdp.com:9092")
>>> val k = KafkaUtils.createDirectStream[String, String,
>>>   StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
>>>
>>> When I replace the createDirectStream call with createStream, all
>>> messages are read by one DStream block:
>>>
>>> val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181",
>>>   "resp", topicMap, StorageLevel.MEMORY_ONLY)
>>>
>>> I am using the below spark-submit to execute:
>>>
>>> ./spark-submit --master yarn-client \
>>>   --conf "spark.dynamicAllocation.enabled=true" \
>>>   --conf "spark.shuffle.service.enabled=true" \
>>>   --conf "spark.sql.tungsten.enabled=false" \
>>>   --conf "spark.sql.codegen=false" \
>>>   --conf "spark.sql.unsafe.enabled=false" \
>>>   --conf "spark.streaming.backpressure.enabled=true" \
>>>   --conf "spark.locality.wait=1s" \
>>>   --conf "spark.shuffle.consolidateFiles=true" \
>>>   --conf "spark.streaming.kafka.maxRatePerPartition=1000000" \
>>>   --driver-memory 2g --executor-memory 1g \
>>>   --class com.tcs.dime.spark.SparkReceiver \
>>>   --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml \
>>>   --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar \
>>>   /root/Jars/sparkreceiver.jar
>>>
>>> -------- Original message --------
>>> From: Cody Koeninger <[email protected]>
>>> Date: 05/02/2016 22:07 (GMT+05:30)
>>> To: Diwakar Dhanuskodi <[email protected]>
>>> Cc: [email protected]
>>> Subject: Re: Kafka directsream receiving rate
>>>
>>> If you're using the direct stream, you have 0 receivers. Do you
>>> mean you have 1 executor?
>>>
>>> Can you post the relevant call to createDirectStream from your
>>> code, as well as any relevant Spark configuration?
>>>
>>> On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi
>>> <[email protected]> wrote:
>>>
>>>> Adding more info:
>>>>
>>>> Batch interval is 2000 ms. I expect all 100 messages to go through
>>>> one DStream from DirectStream, but it receives only 10 messages at
>>>> a time. Am I missing some configuration here? Any help is
>>>> appreciated.
>>>>
>>>> Regards,
>>>> Diwakar
>>>>
>>>> -------- Original message --------
>>>> From: Diwakar Dhanuskodi <[email protected]>
>>>> Date: 05/02/2016 07:33 (GMT+05:30)
>>>> To: [email protected]
>>>> Subject: Kafka directsream receiving rate
>>>>
>>>> Hi,
>>>> I am using Spark 1.5.1. I have a topic with 20 partitions. When I
>>>> publish 100 messages, the Spark direct stream receives only 10
>>>> messages per DStream. I have only one receiver. When I used
>>>> createStream, the receiver received the entire 100 messages at
>>>> once.
>>>>
>>>> Appreciate any help.
>>>>
>>>> Regards,
>>>> Diwakar
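For reference, a minimal reproducible example along the lines Cody asks for might look like the sketch below. This is not code from the thread: the topic name and application name are placeholder assumptions, and the rest is pieced together from details given above (Spark 1.5.1, the Kafka 0.8 direct stream API, a 2-second batch interval, the broker address from the posted snippet). It uses HasOffsetRanges to print the offset range each partition contributes to each batch, so the expected count (everything published since the previous batch) can be compared against the actual count.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object DirectStreamRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamRepro") // placeholder name
    val ssc = new StreamingContext(conf, Seconds(2)) // 2 s batch, as in the thread

    // Broker address as posted in the thread; the topic name is a placeholder.
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "datanode4.isdp.com:9092")
    val topicSet = Set("mytopic")

    val stream = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    stream.foreachRDD { rdd =>
      // A direct-stream RDD knows exactly which Kafka offsets it covers.
      // Expected: one range per partition (20 here), together covering every
      // message published since the previous batch.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"partition=${r.partition} from=${r.fromOffset} " +
          s"until=${r.untilOffset} count=${r.untilOffset - r.fromOffset}")
      }
      println(s"actual total messages in this batch: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

With 20 partitions, the expected output per batch is 20 offset ranges whose counts sum to everything published during the interval; the behavior described in this thread would instead show each count capped at 10.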

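Likewise, to act on Cody's suggestion of eliminating rate limiting as a variable, the posted spark-submit could be rerun with the two rate-related flags removed and everything else unchanged (paths, jars, and class name exactly as posted in the thread):

./spark-submit --master yarn-client \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.shuffle.service.enabled=true" \
  --conf "spark.sql.tungsten.enabled=false" \
  --conf "spark.sql.codegen=false" \
  --conf "spark.sql.unsafe.enabled=false" \
  --conf "spark.locality.wait=1s" \
  --conf "spark.shuffle.consolidateFiles=true" \
  --driver-memory 2g --executor-memory 1g \
  --class com.tcs.dime.spark.SparkReceiver \
  --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml \
  --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar \
  /root/Jars/sparkreceiver.jar

If batch sizes grow once spark.streaming.backpressure.enabled and spark.streaming.kafka.maxRatePerPartition are gone, the rate-control settings were the limiting factor; if the 10-per-partition cap persists, the cause lies elsewhere.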