You need to return an iterator from the closure you provide to mapPartitions.
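
A minimal sketch of what that could look like, assuming a hypothetical StubConnection class in place of your real DB connection (it is not a Spark or JDBC API) and an upper-casing step in place of your real per-row work. The function takes the partition's iterator and returns an iterator, which is the shape mapPartitions expects. It forces the lazy map with toList before calling commit(), so the writes actually happen inside the transaction:

```scala
// Hypothetical stand-in for a real database connection (illustration only).
final class StubConnection {
  var written: Vector[String] = Vector.empty
  var committed: Boolean = false
  def write(row: String): Unit = { written = written :+ row }
  def commit(): Unit = { committed = true }
  def close(): Unit = ()
}

object PartitionCommit {
  // Body of the closure passed to mapPartitions: Iterator in, Iterator out.
  // `open` is a hypothetical factory that creates one connection per partition.
  def processPartition(open: () => StubConnection)(
      records: Iterator[String]): Iterator[String] = {
    val conn = open()
    try {
      // Iterator.map is lazy; toList runs every write before commit() is called.
      val results = records.map { r =>
        val out = r.toUpperCase // placeholder for the real per-row work
        conn.write(out)
        out
      }.toList
      conn.commit() // one commit per partition, not per row
      results.iterator // hand the processed rows back to Spark
    } finally {
      conn.close()
    }
  }
}
```

You would pass this function to rdd.mapPartitions and still run an action on the resulting RDD, since mapPartitions itself is lazy. If the new RDD is reused by several downstream jobs, caching it first should help avoid repeating the DB writes on recomputation. Note that toList holds a whole partition in memory, so this assumes partitions of modest size.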

On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar wrote:

> Thanks for the foreach idea. But once I used it, I got an empty RDD. I think
> that is because "results" is an iterator.
> Yes, I know "Map is lazy", but I expected there to be a way to force an action.
> I cannot use foreachPartition because I need to reuse the new RDD after some
> maps.
> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger wrote:
>> Map is lazy.  You need an actual action, or nothing will happen.  Use
>> foreachPartition, or do an empty foreach after the map.
>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar wrote:
>>> Dear all,
>>>     I need to commit a DB transaction for each partition, not for each
>>> row. The code below didn't work for me.
>>> rdd.mapPartitions(partitionOfRecords => {
>>> DBConnectionInit()
>>> val results =
>>> DBConnection.commit()
>>> })
>>> Best regards,
>>> Ahmed Atef Nawwar
>>> Data Management & Big Data Consultant
>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger wrote:
>>>> Your kafka broker died or you otherwise had a rebalance.
>>>> Normally spark retries take care of that.
>>>> Is there something going on with your kafka installation, that
>>>> rebalance is taking especially long?
>>>> Yes, increasing backoff / max number of retries will "help", but it's
>>>> better to figure out what's going on with kafka.
>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora wrote:
>>>>> Hi
>>>>> My streaming application gets killed with the error below:
>>>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> kafka.common.NotLeaderForPartitionException,
>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>> [testtopic,193]))
>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>>>> for time 1440626120000 ms
>>>>> org.apache.spark.SparkException:
>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>> Set([testtopic,115]))
>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>> at
>>>>> The Kafka params printed in the job logs are:
>>>>>         value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>>>         key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>>>>         block.on.buffer.full = true
>>>>> = 100
>>>>>         buffer.memory = 1048576
>>>>>         batch.size = 16384
>>>>> = 30000
>>>>> = 300000
>>>>>         receive.buffer.bytes = 32768
>>>>> = 30000
>>>>> = 5
>>>>>         bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>>>         metric.reporters = []
>>>>> =
>>>>>         compression.type = none
>>>>>         retries = 0
>>>>>         max.request.size = 1048576
>>>>>         send.buffer.bytes = 131072
>>>>>         acks = all
>>>>> = 10
>>>>> = 0
>>>>>         metrics.num.samples = 2
>>>>> = 60000
>>>>> Is the Kafka broker going down, and is that what kills the job? What is the
>>>>> best way to handle it?
>>>>> Will increasing retries and backoff time help, and to what values should those
>>>>> be set so that the streaming application never fails, but instead keeps
>>>>> retrying every few seconds and sends an event so that my custom code can
>>>>> send a notification that a Kafka broker is down, if that is the cause?
>>>>> Thanks
