Thanks a lot for your support. It is working now. I wrote it like below:
val newRDD = rdd.mapPartitions { partition =>
  val result = partition.map(.....)
  result
}

newRDD.foreach { _ => }

On Thu, Aug 27, 2015 at 10:34 PM, Cody Koeninger <c...@koeninger.org> wrote:

> This job contains a spark output action, and is what I originally meant:
>
> rdd.mapPartitions {
>   result
> }.foreach {
> }
>
> This job is just a transformation, and won't do anything unless you have
> another output action. Not to mention, it will exhaust the iterator, as
> you noticed:
>
> rdd.mapPartitions {
>   result.foreach
>   result
> }
>
> On Thu, Aug 27, 2015 at 2:22 PM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:
>
>> Yes, of course, I am doing that. But once I added results.foreach(row => {})
>> I got an empty RDD.
>>
>> rdd.mapPartitions(partitionOfRecords => {
>>   DBConnectionInit()
>>   val results = partitionOfRecords.map(......)
>>   DBConnection.commit()
>>   results.foreach(row => {})
>>   results
>> })
>>
>> On Thu, Aug 27, 2015 at 10:18 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> You need to return an iterator from the closure you provide to
>>> mapPartitions.
>>>
>>> On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:
>>>
>>>> Thanks for the foreach idea, but once I used it I got an empty RDD. I
>>>> think that is because "results" is an iterator.
>>>>
>>>> Yes, I know map is lazy, but I expected there to be a way to force the
>>>> action.
>>>>
>>>> I cannot use foreachPartition because I need to reuse the new RDD after
>>>> some maps.
>>>>
>>>> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>>> Map is lazy. You need an actual action, or nothing will happen. Use
>>>>> foreachPartition, or do an empty foreach after the map.
>>>>>
>>>>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:
>>>>>
>>>>>> Dears,
>>>>>>
>>>>>> I need to commit a DB transaction for each partition, not for each
>>>>>> row. The code below didn't work for me.
>>>>>>
>>>>>> rdd.mapPartitions(partitionOfRecords => {
>>>>>>   DBConnectionInit()
>>>>>>   val results = partitionOfRecords.map(......)
>>>>>>   DBConnection.commit()
>>>>>> })
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Ahmed Atef Nawwar
>>>>>> Data Management & Big Data Consultant
>>>>>>
>>>>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>
>>>>>>> Your kafka broker died or you otherwise had a rebalance.
>>>>>>>
>>>>>>> Normally spark retries take care of that.
>>>>>>>
>>>>>>> Is there something going on with your kafka installation, that
>>>>>>> rebalance is taking especially long?
>>>>>>>
>>>>>>> Yes, increasing backoff / max number of retries will "help", but
>>>>>>> it's better to figure out what's going on with kafka.
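For the mapPartitions question above, one way to combine the per-partition commit with returning a usable iterator is to materialise the partition before committing and then hand back a fresh iterator. This is only a minimal sketch; DBConnectionInit, DBConnection and processRow are hypothetical placeholders for the poster's own helpers, not part of any Spark API.

import org.apache.spark.rdd.RDD

object PerPartitionCommitSketch {

  // Hypothetical placeholders for the poster's own helpers; not a Spark API.
  def DBConnectionInit(): Unit = ()                  // open one connection per partition
  object DBConnection { def commit(): Unit = () }    // commit the partition's transaction
  def processRow(row: String): String = row          // whatever per-row work is needed

  def transform(rdd: RDD[String]): RDD[String] =
    rdd.mapPartitions { partition =>
      DBConnectionInit()
      // map is lazy, so force the whole partition before committing,
      // then return a fresh iterator instead of the exhausted one.
      val results = partition.map(processRow).toList
      DBConnection.commit()
      results.iterator
    }
}

// mapPartitions is still only a transformation; an action is needed to run it:
//   val out = PerPartitionCommitSketch.transform(rdd)
//   out.foreach { _ => }   // or keep reusing `out` in later maps before an action

Note the trade-off: toList buffers the whole partition in memory, so this only makes sense for reasonably sized partitions. If you instead return the lazy iterator, as in the thread, the commit inside the closure runs before the rows are actually mapped.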
>>>>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> My streaming application gets killed with the error below:
>>>>>>>>
>>>>>>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>>>>> [testtopic,193]))
>>>>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating
>>>>>>>> jobs for time 1440626120000 ms
>>>>>>>> org.apache.spark.SparkException:
>>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>> Set([testtopic,115]))
>>>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>>> at
>>>>>>>>
>>>>>>>> Kafka params printed in the job logs are:
>>>>>>>>
>>>>>>>> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>>>>>> key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>>>>>> block.on.buffer.full = true
>>>>>>>> retry.backoff.ms = 100
>>>>>>>> buffer.memory = 1048576
>>>>>>>> batch.size = 16384
>>>>>>>> metrics.sample.window.ms = 30000
>>>>>>>> metadata.max.age.ms = 300000
>>>>>>>> receive.buffer.bytes = 32768
>>>>>>>> timeout.ms = 30000
>>>>>>>> max.in.flight.requests.per.connection = 5
>>>>>>>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>>>>>> metric.reporters = []
>>>>>>>> client.id =
>>>>>>>> compression.type = none
>>>>>>>> retries = 0
>>>>>>>> max.request.size = 1048576
>>>>>>>> send.buffer.bytes = 131072
>>>>>>>> acks = all
>>>>>>>> reconnect.backoff.ms = 10
>>>>>>>> linger.ms = 0
>>>>>>>> metrics.num.samples = 2
>>>>>>>> metadata.fetch.timeout.ms = 60000
>>>>>>>>
>>>>>>>> Is the kafka broker going down and the job getting killed? What's the
>>>>>>>> best way to handle it?
>>>>>>>>
>>>>>>>> Will increasing retries and backoff time help, and to what values should
>>>>>>>> they be set so the streaming application never fails, but instead keeps
>>>>>>>> retrying every few seconds and sends an event so that my custom code can
>>>>>>>> send a notification if the kafka broker is down?
>>>>>>>>
>>>>>>>> Thanks
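On the retry / backoff side: for the Kafka 0.8 direct stream in Spark 1.x, the knobs that appear to govern the "Couldn't find leader offsets" retries are spark.streaming.kafka.maxRetries (a Spark setting, default 1) and refresh.leader.backoff.ms (a Kafka consumer setting the driver backs off on between attempts). A minimal sketch, with the app name, broker list, topic and values purely illustrative:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamRetrySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("direct-stream-retry-sketch")
      // Extra attempts at finding leader offsets before the batch fails (default 1).
      .set("spark.streaming.kafka.maxRetries", "5")

    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
      // Back off longer before re-checking for a new leader after an election.
      "refresh.leader.backoff.ms" -> "1000",
      "retry.backoff.ms" -> "500"
    )

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("testtopic"))

    // Some output action is still required, plus whatever per-batch work you do.
    stream.foreachRDD(rdd => println(s"got ${rdd.count()} records"))

    ssc.start()
    ssc.awaitTermination()
  }
}

Even then, Cody's point stands: if NotLeaderForPartitionException keeps appearing, the broker or rebalance problem on the Kafka side is the thing to fix; the settings above only buy the job time to ride out a leader election.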