Dear all,

I need to commit the DB transaction once per partition, not once per row. The snippet below didn't work for me:

rdd.mapPartitions(partitionOfRecords => {
  DBConnectionInit()
  val results = partitionOfRecords.map(......)
  DBConnection.commit()
})
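As far as I can tell, the problem is that mapPartitions is a lazy transformation and the inner map on the iterator is never consumed, so commit() runs before any row is processed (and as written the closure returns Unit rather than an Iterator, so it should not even compile). Since I only need the side effect, what I am aiming for is roughly the sketch below, where DBConnectionInit() and insertRecord() stand in for my own DB helpers:

rdd.foreachPartition { partitionOfRecords =>
  // One connection and one transaction per partition, not per row.
  val conn = DBConnectionInit()   // my helper; assumed to return a JDBC connection
  try {
    conn.setAutoCommit(false)
    partitionOfRecords.foreach(record => insertRecord(conn, record)) // my write helper
    conn.commit()                 // single commit for the whole partition
  } catch {
    case e: Exception =>
      conn.rollback()
      throw e
  } finally {
    conn.close()
  }
}

foreachPartition is an action, so it runs immediately, and it is the usual choice when the per-record work is purely a side effect.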
Best regards,
Ahmed Atef Nawwar
Data Management & Big Data Consultant

On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Your Kafka broker died or you otherwise had a rebalance.
>
> Normally Spark retries take care of that.
>
> Is there something going on with your Kafka installation that makes the
> rebalance take especially long?
>
> Yes, increasing the backoff / max number of retries will "help", but it's
> better to figure out what's going on with Kafka.
>
> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>
>> Hi
>>
>> My streaming application gets killed with the error below:
>>
>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>> [testtopic,193]))
>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
>> time 1440626120000 ms
>> org.apache.spark.SparkException:
>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([testtopic,115]))
>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>>
>> Kafka params printed in the job logs are:
>>
>> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>> key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>> block.on.buffer.full = true
>> retry.backoff.ms = 100
>> buffer.memory = 1048576
>> batch.size = 16384
>> metrics.sample.window.ms = 30000
>> metadata.max.age.ms = 300000
>> receive.buffer.bytes = 32768
>> timeout.ms = 30000
>> max.in.flight.requests.per.connection = 5
>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>> metric.reporters = []
>> client.id =
>> compression.type = none
>> retries = 0
>> max.request.size = 1048576
>> send.buffer.bytes = 131072
>> acks = all
>> reconnect.backoff.ms = 10
>> linger.ms = 0
>> metrics.num.samples = 2
>> metadata.fetch.timeout.ms = 60000
>>
>> Is the Kafka broker going down and killing the job? What's the best way
>> to handle it?
>> Will increasing retries and backoff time help, and what values should
>> they be set to so that the streaming application never fails, but
>> instead keeps retrying every few seconds and emits an event, so my
>> custom code can send a notification if a Kafka broker is down?
>>
>> Thanks
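For reference, the retry/backoff settings the quoted thread refers to appear to be the following two knobs of the Spark 1.x direct stream; the values below are illustrative, not recommendations:

import org.apache.spark.SparkConf

// Spark side: how many times the direct stream retries fetching leader
// offsets before failing the batch (spark.streaming.kafka.maxRetries
// defaults to 1).
val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRetries", "5")

// Kafka consumer side: how long to wait before refreshing leader metadata
// after a leader election (refresh.leader.backoff.ms defaults to 200 ms).
// These entries go into the Map passed to KafkaUtils.createDirectStream.
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
  "refresh.leader.backoff.ms" -> "1000"
)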