Thanks, Cody, for trying to understand the issue. Sorry if I am not being clear. The scenario is to process all messages at once in a single DStream block when the source system publishes messages. The source system publishes x messages every 10 minutes.
By events I meant the total number of messages processed per batch interval (in my case 2000 ms) by an executor; the web UI shows each block's processing as events. The direct stream is processing only 10 messages per batch, and it is the same whether 100 or 1 million messages are published. The xyz topic has 20 partitions, and I am using the Kafka producer API to publish messages. Below is the code I am using:

val topics = "xyz"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
val k = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
k.foreachRDD { rdd =>
  val dstreamToRDD = rdd.cache()
  println(dstreamToRDD.partitions.length)  // logged along with the current time
  val accTran = dstreamToRDD.filter { ... }
  accTran.map { ... }
}
ssc.start()
ssc.awaitTermination()

I tried using the direct stream with map and partitions, where I had an issue with offsetRanges; after your suggestion, the offset issue was resolved when I used the direct stream code above with the topic only. The spark-submit settings I am using are in the mail chain below. Is there any bottleneck I am hitting that prevents processing the maximum number of messages in one batch interval using the direct stream RDD? If this is not clear, I can take this offline and explain the scenario briefly.

Sent from Samsung Mobile.

-------- Original message --------
From: Cody Koeninger <c...@koeninger.org>
Date: 06/02/2016 22:32 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

I am not at all clear on what you are saying.

"Yes, I am printing each message. It is processing all messages under each dstream block."

If it is processing all messages, what is the problem you are having?

"The issue is with DirectStream processing 10 messages per event."

What distinction are you making between a message and an event?
"I am expecting DirectStream to process 1 million messages"

Your first email said you were publishing 100 messages but only processing 10. Why are you now trying to process 1 million messages without understanding what is going on? Make sure you can process a limited number of messages correctly first.

The first code examples you posted to the list had some pretty serious errors (i.e. only trying to process 1 partition, trying to process offsets that didn't exist). Make sure that is all fixed first.

To be clear, I use direct Kafka RDDs to process batches with around 4 GB of messages per partition; you shouldn't be hitting some kind of limit with 1 million messages per batch. You may of course hit executor resource issues depending on what you're trying to do with each message, but that doesn't sound like the case here.

If you want help, either clarify what you are saying, or post a minimal reproducible code example, with expected output vs. actual output.

On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:

Cody,
Yes, I am printing each message. It is processing all messages under each dstream block. Source systems are publishing 1 million messages per 4 seconds, which is less than the batch interval. The issue is that the direct stream processes 10 messages per event. When the topic's partitions were increased to 20, the direct stream picks up only 200 messages at a time for processing (I guess 10 for each partition). I have 16 executors running for streaming (in both YARN client and cluster mode). I am expecting the direct stream to process the 1 million messages published to the topic within the batch interval. Using createStream, it could batch 150K messages and process them; createStream is better than the direct stream in this case, but again, why only 150K? Any clarification on directStream processing millions per batch is much appreciated.

Sent from Samsung Mobile.
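As a rough sanity check on the numbers in this thread: with the direct stream, `spark.streaming.kafka.maxRatePerPartition` caps each batch at roughly partitions × rate × batch length in seconds. A minimal sketch of that arithmetic, using the settings quoted below (the object and method names here are made up for illustration):

```scala
object RateCap {
  // Upper bound on messages per batch under maxRatePerPartition:
  // partitions * messages-per-second-per-partition * batch length in seconds.
  def maxPerBatch(partitions: Int, maxRatePerPartition: Long, batchMs: Long): Long =
    partitions * maxRatePerPartition * batchMs / 1000

  def main(args: Array[String]): Unit = {
    // 20 partitions, maxRatePerPartition = 1000000, 2000 ms batch interval
    println(maxPerBatch(20, 1000000L, 2000L)) // prints 40000000
  }
}
```

With the configuration shown below in this thread, the cap works out to 40 million messages per batch, so that setting by itself would not explain batches of only 10 or 200 messages.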
-------- Original message --------
From: Cody Koeninger <c...@koeninger.org>
Date: 06/02/2016 01:30 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

Have you tried just printing each message, to see which ones are being processed?

On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:

I am able to see the number of messages processed per event in the Spark Streaming web UI. I am also counting the messages inside foreachRDD. I removed the settings for backpressure, but it is still the same.

Sent from Samsung Mobile.

-------- Original message --------
From: Cody Koeninger <c...@koeninger.org>
Date: 06/02/2016 00:33 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

How are you counting the number of messages? I'd go ahead and remove the settings for backpressure and maxRatePerPartition, just to eliminate those as variables.

On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:

I am using one direct stream.
Below is the call to createDirectStream:

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
val k = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

When I replace the createDirectStream call with createStream, all messages are read by one DStream block:

val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181", "resp", topicMap, StorageLevel.MEMORY_ONLY)

I am using the spark-submit command below to execute:

./spark-submit --master yarn-client \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.shuffle.service.enabled=true" \
  --conf "spark.sql.tungsten.enabled=false" \
  --conf "spark.sql.codegen=false" \
  --conf "spark.sql.unsafe.enabled=false" \
  --conf "spark.streaming.backpressure.enabled=true" \
  --conf "spark.locality.wait=1s" \
  --conf "spark.shuffle.consolidateFiles=true" \
  --conf "spark.streaming.kafka.maxRatePerPartition=1000000" \
  --driver-memory 2g --executor-memory 1g \
  --class com.tcs.dime.spark.SparkReceiver \
  --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml \
  --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar \
  /root/Jars/sparkreceiver.jar

Sent from Samsung Mobile.

-------- Original message --------
From: Cody Koeninger <c...@koeninger.org>
Date: 05/02/2016 22:07 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

If you're using the direct stream, you have 0 receivers. Do you mean you have 1 executor? Can you post the relevant call to createDirectStream from your code, as well as any relevant Spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote:

Adding more info: the batch interval is 2000 ms.
I expect all 100 messages to go through one DStream from the direct stream, but it receives them at a rate of 10 messages at a time. Am I missing some configuration here? Any help appreciated.

Regards,
Diwakar

Sent from Samsung Mobile.

-------- Original message --------
From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Date: 05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Kafka directsream receiving rate

Hi,
Using Spark 1.5.1, I have a topic with 20 partitions. When I publish 100 messages, the Spark direct stream receives 10 messages per dstream. I have only one receiver. When I used createStream, the receiver received the entire 100 messages at once. Appreciate any help.

Regards,
Diwakar

Sent from Samsung Mobile.
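One way to see exactly how many messages each batch receives, as discussed above, is to look at the per-partition offset ranges: each direct-stream batch covers one offset range per Kafka partition, and the batch size is just the sum of `untilOffset - fromOffset` across partitions. A minimal sketch of that bookkeeping, with a hypothetical stand-in for Spark's `OffsetRange` class:

```scala
// Hypothetical stand-in for Spark's OffsetRange: the slice of one Kafka
// partition that a single batch will read.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

object BatchSize {
  // Total messages in a batch = sum of the per-partition range sizes.
  def total(ranges: Seq[OffsetRange]): Long = ranges.map(_.count).sum

  def main(args: Array[String]): Unit = {
    // 20 partitions of topic "xyz", 5 messages each -> a 100-message batch.
    val ranges = (0 until 20).map(p => OffsetRange("xyz", p, 0L, 5L))
    println(total(ranges)) // prints 100
  }
}
```

In the real Spark 1.5 API, these ranges are available inside foreachRDD via `rdd.asInstanceOf[HasOffsetRanges].offsetRanges`, which makes it straightforward to log per-partition counts and confirm whether the batch actually pulled fewer messages than were published.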