Thanks for the foreach idea. But once I used it I got an empty RDD, I think because "results" is an iterator.
Yes, I know "Map is lazy", but I expected there would be a way to force the action. I cannot use foreachPartition because I need to reuse the new RDD after some maps (see the sketch after the quoted thread below).

On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
> Map is lazy. You need an actual action, or nothing will happen. Use
> foreachPartition, or do an empty foreach after the map.
>
> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:
>
>> Dears,
>>
>> I need to commit a DB transaction for each partition, not for each row.
>> The code below didn't work for me.
>>
>> rdd.mapPartitions(partitionOfRecords => {
>>
>>     DBConnectionInit()
>>
>>     val results = partitionOfRecords.map(......)
>>
>>     DBConnection.commit()
>>
>> })
>>
>> Best regards,
>>
>> Ahmed Atef Nawwar
>>
>> Data Management & Big Data Consultant
>>
>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Your kafka broker died or you otherwise had a rebalance.
>>>
>>> Normally spark retries take care of that.
>>>
>>> Is there something going on with your kafka installation, that rebalance
>>> is taking especially long?
>>>
>>> Yes, increasing backoff / max number of retries will "help", but it's
>>> better to figure out what's going on with kafka.
>>>
>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> My streaming application gets killed with the below error:
>>>>
>>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100], [testtopic,193]))
>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for time 1440626120000 ms
>>>> org.apache.spark.SparkException:
>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,115]))
>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>> at
>>>>
>>>> Kafka params printed in the job logs are:
>>>>
>>>> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>> key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>> block.on.buffer.full = true
>>>> retry.backoff.ms = 100
>>>> buffer.memory = 1048576
>>>> batch.size = 16384
>>>> metrics.sample.window.ms = 30000
>>>> metadata.max.age.ms = 300000
>>>> receive.buffer.bytes = 32768
>>>> timeout.ms = 30000
>>>> max.in.flight.requests.per.connection = 5
>>>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>> metric.reporters = []
>>>> client.id =
>>>> compression.type = none
>>>> retries = 0
>>>> max.request.size = 1048576
>>>> send.buffer.bytes = 131072
>>>> acks = all
>>>> reconnect.backoff.ms = 10
>>>> linger.ms = 0
>>>> metrics.num.samples = 2
>>>> metadata.fetch.timeout.ms = 60000
>>>>
>>>> Is it that the kafka broker is going down and the job is getting killed?
>>>> What's the best way to handle it?
>>>> Increasing retries and backoff time will help, but what should those values be set to
>>>> so that the streaming application never fails - rather, it keeps retrying every few
>>>> seconds and sends an event, so that my custom code can send a notification if a kafka
>>>> broker being down is the cause.
>>>>
>>>> Thanks
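
For the per-partition commit above, here is a minimal sketch of what I mean by forcing the map before the commit while still getting back an RDD I can reuse. DBConnectionInit(), DBConnection and process() are only placeholders standing in for the helpers in the snippet above, not real APIs:

    // Sketch only: DBConnectionInit(), DBConnection and process() are
    // placeholders for your own helpers, as in the snippet above.
    val resultRdd = rdd.mapPartitions { partitionOfRecords =>
      DBConnectionInit()
      // .toList materializes the lazy iterator, so every record is
      // processed before the commit below runs.
      val results = partitionOfRecords.map(record => process(record)).toList
      DBConnection.commit()
      // return an iterator so the new RDD actually contains the results
      results.iterator
    }

    // resultRdd is a normal RDD, so it can be reused after further maps;
    // some action is still needed to trigger the job:
    resultRdd.count()

One thing to keep in mind: every action on resultRdd recomputes the partitions (and commits again) unless it is cached, so calling resultRdd.cache() before the first action is probably worth it here.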
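For the quoted Kafka question, a rough sketch of the retry/backoff knobs Cody mentions, assuming the Spark 1.x direct stream API; the values are placeholders that would need tuning, and the broker list and topic are just the ones from the logs above:

    // Sketch only: placeholder values, assuming spark-streaming-kafka's direct stream.
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setAppName("kafka-direct")
      // how many times the driver retries fetching leader offsets per batch
      .set("spark.streaming.kafka.maxRetries", "5")

    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
      // wait longer between retries while leader election finishes
      "refresh.leader.backoff.ms" -> "2000")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("testtopic"))

As Cody says, though, raising these only papers over the problem; it is better to find out why the rebalance is taking so long in the first place.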