This job contains a spark output action, and is what I originally meant:

    rdd.mapPartitions { ... ; results }.foreach { _ => }

This job is just a transformation, and won't do anything unless you have
another output action. Not to mention, it will exhaust the iterator, as
you noticed:

    rdd.mapPartitions { ... ; results.foreach(row => {}); results }
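To make the difference concrete, here is a minimal runnable sketch (the
local master and the "_ * 2" body are just stand-ins for a real job):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(
      new SparkConf().setAppName("example").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 100, 4)

    // Transformation only: nothing has run yet, `mapped` is just lineage.
    val mapped = rdd.mapPartitions { records =>
      records.map(_ * 2) // stand-in for your per-record logic
    }

    // The empty foreach is an output action: it forces every partition
    // to actually be computed.
    mapped.foreach { _ => () }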
On Thu, Aug 27, 2015 at 2:22 PM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:

> Yes, of course, I am doing that. But once I added results.foreach(row =>
> {}) I got an empty RDD.
>
> rdd.mapPartitions(partitionOfRecords => {
>   DBConnectionInit()
>   val results = partitionOfRecords.map(......)
>   DBConnection.commit()
>   results.foreach(row => {})
>   results
> })
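That comes back empty because the foreach consumes the iterator before it
is returned, and the commit() fires before any row has been processed,
since the map on the iterator is itself lazy. One way around both, at the
cost of holding a whole partition in memory, is to materialize the
partition first. A minimal sketch, where DBConnectionInit / DBConnection
stand in for your own connection handling and process for your per-row
logic:

    val processed = rdd.mapPartitions { partitionOfRecords =>
      DBConnectionInit()
      // toList forces the lazy map, so every row is processed before commit()
      val results = partitionOfRecords.map(process).toList
      DBConnection.commit()
      results.iterator // a fresh iterator, so the resulting RDD is not empty
    }

Note that mapPartitions itself is still lazy: the commits only happen once
an action runs on processed, or on an RDD derived from it.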
> On Thu, Aug 27, 2015 at 10:18 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> You need to return an iterator from the closure you provide to
>> mapPartitions.
>>
>> On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ahmed.na...@gmail.com>
>> wrote:
>>
>>> Thanks for the foreach idea. But once I used it I got an empty RDD. I
>>> think that is because "results" is an iterator.
>>>
>>> Yes, I know "map is lazy", but I expected there to be a way to force
>>> the action.
>>>
>>> I can not use foreachPartition because I need to reuse the new RDD
>>> after some maps.
>>>
>>> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Map is lazy. You need an actual action, or nothing will happen. Use
>>>> foreachPartition, or do an empty foreach after the map.
>>>>
>>>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.na...@gmail.com>
>>>> wrote:
>>>>
>>>>> Dears,
>>>>>
>>>>> I need to commit the DB transaction for each partition, not for
>>>>> each row. The code below didn't work for me.
>>>>>
>>>>> rdd.mapPartitions(partitionOfRecords => {
>>>>>   DBConnectionInit()
>>>>>   val results = partitionOfRecords.map(......)
>>>>>   DBConnection.commit()
>>>>> })
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Ahmed Atef Nawwar
>>>>> Data Management & Big Data Consultant
>>>>>
>>>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Your kafka broker died or you otherwise had a rebalance.
>>>>>>
>>>>>> Normally spark retries take care of that.
>>>>>>
>>>>>> Is there something going on with your kafka installation that makes
>>>>>> a rebalance take especially long?
>>>>>>
>>>>>> Yes, increasing the backoff / max number of retries will "help",
>>>>>> but it's better to figure out what's going on with kafka.
>>>>>>
>>>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora
>>>>>> <shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> My streaming application gets killed with the below error:
>>>>>>>
>>>>>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64],
>>>>>>> [testtopic,100], [testtopic,193]))
>>>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating
>>>>>>> jobs for time 1440626120000 ms
>>>>>>> org.apache.spark.SparkException:
>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>> Set([testtopic,115]))
>>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>> at
>>>>>>>
>>>>>>> The kafka params printed in the job logs are:
>>>>>>>
>>>>>>> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>>>>> key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>>>>> block.on.buffer.full = true
>>>>>>> retry.backoff.ms = 100
>>>>>>> buffer.memory = 1048576
>>>>>>> batch.size = 16384
>>>>>>> metrics.sample.window.ms = 30000
>>>>>>> metadata.max.age.ms = 300000
>>>>>>> receive.buffer.bytes = 32768
>>>>>>> timeout.ms = 30000
>>>>>>> max.in.flight.requests.per.connection = 5
>>>>>>> bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>>>>> metric.reporters = []
>>>>>>> client.id =
>>>>>>> compression.type = none
>>>>>>> retries = 0
>>>>>>> max.request.size = 1048576
>>>>>>> send.buffer.bytes = 131072
>>>>>>> acks = all
>>>>>>> reconnect.backoff.ms = 10
>>>>>>> linger.ms = 0
>>>>>>> metrics.num.samples = 2
>>>>>>> metadata.fetch.timeout.ms = 60000
>>>>>>>
>>>>>>> Is it the kafka broker going down that gets the job killed? What
>>>>>>> is the best way to handle it?
>>>>>>> Will increasing retries and backoff time help, and to what values
>>>>>>> should they be set so the streaming application never fails, but
>>>>>>> instead keeps retrying every few seconds and emits an event, so
>>>>>>> that my custom code can send a notification if a kafka broker
>>>>>>> being down is the cause?
>>>>>>>
>>>>>>> Thanks
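If you do want to tune the retry behavior while you investigate the
brokers: the direct stream retries the leader-offset lookup
spark.streaming.kafka.maxRetries times (default 1) and sleeps
refresh.leader.backoff.ms between attempts. A minimal sketch, with purely
illustrative broker addresses and values:

    import org.apache.spark.SparkConf

    // Retry the leader-offset lookup a few times instead of once.
    val conf = new SparkConf()
      .setAppName("my-streaming-app") // hypothetical app name
      .set("spark.streaming.kafka.maxRetries", "5")

    // Passed to KafkaUtils.createDirectStream: wait longer between leader
    // refreshes so a slow leader election has time to finish.
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "broker1:9092,broker2:9092,broker3:9092",
      "refresh.leader.backoff.ms" -> "2000"
    )

Even then, a burst of NotLeaderForPartitionException usually means the
brokers are rebalancing; these settings only buy the cluster time to
recover.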