Thanks Supreeth and Shahbaz. I will try adding spark.streaming.kafka.maxRatePerPartition.
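For reference, this is roughly how I plan to wire it in (a minimal sketch; the
rate cap and the app name are placeholders I still need to tune):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("kafka-metrics-aggregator") // placeholder name
      .set("spark.streaming.backpressure.enabled", "true")
      // Max records the direct stream pulls from each Kafka partition per
      // second, so one batch can't swallow the whole backlog on startup.
      .set("spark.streaming.kafka.maxRatePerPartition", "10000") // placeholder; tune for ~9.5 KB messages
    val ssc = new StreamingContext(conf, Seconds(5)) // matches my 5-second batch interval

If I understand the direct stream correctly, each batch would then be capped at
roughly maxRatePerPartition x (number of Kafka partitions) x (batch interval in
seconds) records, instead of reading everything up to the latest offset in the
first batch.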
Hi Shahbaz,

Please see my comments inline:

- Which version of Spark are you using? ==> 1.5.2
- How big is the Kafka cluster? ==> 2 brokers
- What is the message size and type? ==> String, around 9,550 bytes
- How big is the Spark cluster (how many executors, how many cores per
  executor)? ==> 2 nodes, 16 executors, 1 core per executor
- What does your Spark job look like? ==>

    // (inside a function that builds and returns the StreamingContext)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val inputStream = messages.map(_._2)
    val parsedStream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
      })
    // BDV = breeze.linalg.DenseVector; sums each key's new arrays into its running state
    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })
    state.checkpoint(Duration(25000))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) // saving to HBase
    ssc
  }

- spark.streaming.backpressure.enabled set it to true and try? ==> yes, I had
  already enabled it.

Regards,
~Vinti

On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz <shahzadh...@gmail.com> wrote:

> Hello,
>
> - Which version of Spark are you using?
> - How big is the Kafka cluster?
> - What is the message size and type?
> - How big is the Spark cluster (how many executors, how many cores per
>   executor)?
> - What does your Spark job look like?
>
> spark.streaming.backpressure.enabled set it to true and try?
>
> Regards,
> Shahbaz
> +91-9986850670
>
> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth <supreeth....@gmail.com> wrote:
>
>> Try setting spark.streaming.kafka.maxRatePerPartition; this can help
>> control the number of messages read from Kafka per partition by the Spark
>> Streaming consumer.
>>
>> -S
>>
>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>> Hello,
>>
>> I am trying to figure out why my Kafka + Spark job is running slow. I
>> found that Spark is consuming all the messages out of Kafka into a single
>> batch and not sending any messages to the other batches.
>>
>> Batch Time           Input Size      Scheduling Delay  Processing Time  Status
>> 2016/03/05 21:57:05  0 events        -                 -                queued
>> 2016/03/05 21:57:00  0 events        -                 -                queued
>> 2016/03/05 21:56:55  0 events        -                 -                queued
>> 2016/03/05 21:56:50  0 events        -                 -                queued
>> 2016/03/05 21:56:45  0 events        -                 -                queued
>> 2016/03/05 21:56:40  4039573 events  6 ms              -                processing
>>
>> Does anyone know how this behavior can be changed so that the messages
>> are load-balanced across all the batches?
>>
>> Thanks,
>> Vinti