Hi, my streaming application gets killed with the error below:
15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream: ArrayBuffer(kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100], [testtopic,193]))
15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for time 1440626120000 ms
org.apache.spark.SparkException: ArrayBuffer(kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,115]))
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
    at ...

The Kafka params printed in the job logs are:

value.serializer = class org.apache.kafka.common.serialization.StringSerializer
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full = true
retry.backoff.ms = 100
buffer.memory = 1048576
batch.size = 16384
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
timeout.ms = 30000
max.in.flight.requests.per.connection = 5
bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
metric.reporters = []
client.id =
compression.type = none
retries = 0
max.request.size = 1048576
send.buffer.bytes = 131072
acks = all
reconnect.backoff.ms = 10
linger.ms = 0
metrics.num.samples = 2
metadata.fetch.timeout.ms = 60000

Is it a Kafka broker going down that gets the job killed? What is the best way to handle this? Will increasing retries and the backoff time help, and what should those values be set to so that the streaming application never fails outright? Instead, it should keep retrying every few seconds and emit an event so that my custom code can send a notification that a Kafka broker is down, if that is the cause.

Thanks
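What I am thinking of trying, if I read DirectKafkaInputDStream correctly, is raising spark.streaming.kafka.maxRetries (the driver's leader-offset retry count, default 1) and refresh.leader.backoff.ms (the sleep between those retries), plus a supervision loop around the streaming context that alerts and restarts instead of dying. Below is a rough sketch of the idea only: notifyOps is a placeholder for my own notification code, the retry/backoff values are guesses, and the topic and broker names are taken from my logs above.

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ResilientKafkaStream {
  // Placeholder for my own alerting hook (email/pager); not a Spark API.
  def notifyOps(msg: String): Unit = println(s"ALERT: $msg")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-direct-stream")
      // How many times the driver retries fetching leader offsets before
      // failing the batch (default is 1). The value 10 is a guess.
      .set("spark.streaming.kafka.maxRetries", "10")
    val sc = new SparkContext(conf)

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
      // Sleep between leader-offset retries, to give a leader re-election
      // time to finish. The value 2000 is a guess.
      "refresh.leader.backoff.ms" -> "2000"
    )

    // Supervision loop: if the streaming context dies anyway (e.g. all
    // retries exhausted during a long broker outage), send an alert and
    // start a fresh context instead of letting the application be killed.
    while (true) {
      val ssc = new StreamingContext(sc, Seconds(10))
      val stream = KafkaUtils.createDirectStream[
        String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("testtopic"))
      stream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))

      try {
        ssc.start()
        ssc.awaitTermination()
      } catch {
        case e: Exception =>
          notifyOps(s"Streaming job died, restarting: ${e.getMessage}")
          ssc.stop(stopSparkContext = false)
          Thread.sleep(5000) // back off before creating a new context
      }
    }
  }
}

Does that look like a sane approach, or is there a better built-in way to get notified of broker failures?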