This job contains a Spark output action, and is what I originally meant:

rdd.mapPartitions {
  result
}.foreach {

}

This job is just a transformation, and won't do anything unless you have
another output action.  Not to mention, calling foreach on result exhausts
the iterator before you return it, as you noticed:

rdd.mapPartitions {
  result.foreach
  result
}
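
One way around the exhausted iterator, as a minimal sketch in plain Scala with no Spark dependency: materialize the mapped results inside the closure, commit, then return a fresh iterator over them. The names PartitionCommitSketch, FakeConnection and processPartition are hypothetical stand-ins, not anything from the code in this thread, and the doubling map is only a placeholder for the real per-record work. The assumption is that one partition's results fit in memory.

```scala
object PartitionCommitSketch {
  // Hypothetical stand-in for a per-partition DB connection (an assumption).
  final class FakeConnection {
    var committed = false
    def commit(): Unit = committed = true
  }

  // Has the shape of a closure for rdd.mapPartitions: Iterator in, Iterator out.
  def processPartition(records: Iterator[Int], conn: FakeConnection): Iterator[Int] = {
    // toVector forces the lazy map now, so every record is processed before the commit.
    val results = records.map(_ * 2).toVector
    conn.commit()
    // Return a fresh iterator over the materialized results.
    results.iterator
  }

  def main(args: Array[String]): Unit = {
    // The failure mode: foreach consumes the iterator, leaving nothing to return.
    val lazyResults = Iterator(1, 2, 3).map(_ * 2)
    lazyResults.foreach(_ => ())
    println(lazyResults.hasNext) // false

    val conn = new FakeConnection
    println(processPartition(Iterator(1, 2, 3), conn).toList) // List(2, 4, 6)
    println(conn.committed) // true
  }
}
```

The trade-off is memory: the whole partition's results are held at once. The resulting RDD is still lazy, so an output action is still needed downstream before anything runs.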



On Thu, Aug 27, 2015 at 2:22 PM, Ahmed Nawar <ahmed.na...@gmail.com> wrote:

> Yes, of course, I am doing that. But once I added results.foreach(row =>
> {}) I got an empty RDD.
>
>
>
> rdd.mapPartitions(partitionOfRecords => {
>
> DBConnectionInit()
>
> val results = partitionOfRecords.map(......)
>
> DBConnection.commit()
>
> results.foreach(row=> {})
>
> results
>
> })
>
>
>
> On Thu, Aug 27, 2015 at 10:18 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> You need to return an iterator from the closure you provide to
>> mapPartitions
>>
>> On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ahmed.na...@gmail.com>
>> wrote:
>>
>>> Thanks for the foreach idea. But once I used it I got an empty RDD. I
>>> think that is because "results" is an iterator.
>>>
>>> Yes, I know "Map is lazy", but I expected there to be a way to force an
>>> action.
>>>
>>> I cannot use foreachPartition because I need to reuse the new RDD after
>>> some maps.
>>>
>>>
>>>
>>> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>>
>>>> Map is lazy.  You need an actual action, or nothing will happen.  Use
>>>> foreachPartition, or do an empty foreach after the map.
>>>>
>>>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.na...@gmail.com>
>>>> wrote:
>>>>
>>>>> Dears,
>>>>>
>>>>>     I need to commit a DB transaction for each partition, not for each
>>>>> row. The code below didn't work for me.
>>>>>
>>>>>
>>>>> rdd.mapPartitions(partitionOfRecords => {
>>>>>
>>>>> DBConnectionInit()
>>>>>
>>>>> val results = partitionOfRecords.map(......)
>>>>>
>>>>> DBConnection.commit()
>>>>>
>>>>>
>>>>> })
>>>>>
>>>>>
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Ahmed Atef Nawwar
>>>>>
>>>>> Data Management & Big Data Consultant
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Your Kafka broker died or you otherwise had a rebalance.
>>>>>>
>>>>>> Normally Spark retries take care of that.
>>>>>>
>>>>>> Is there something going on with your Kafka installation, such that
>>>>>> the rebalance is taking especially long?
>>>>>>
>>>>>> Yes, increasing backoff / max number of retries will "help", but it's
>>>>>> better to figure out what's going on with Kafka.
>>>>>>
>>>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> My streaming application gets killed with the error below:
>>>>>>>
>>>>>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>>>> [testtopic,193]))
>>>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating
>>>>>>> jobs for time 1440626120000 ms
>>>>>>> org.apache.spark.SparkException:
>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>> Set([testtopic,115]))
>>>>>>> at
>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>> at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Kafka params printed in the job logs are:
>>>>>>>  value.serializer = class
>>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>>         key.serializer = class
>>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>>         block.on.buffer.full = true
>>>>>>>         retry.backoff.ms = 100
>>>>>>>         buffer.memory = 1048576
>>>>>>>         batch.size = 16384
>>>>>>>         metrics.sample.window.ms = 30000
>>>>>>>         metadata.max.age.ms = 300000
>>>>>>>         receive.buffer.bytes = 32768
>>>>>>>         timeout.ms = 30000
>>>>>>>         max.in.flight.requests.per.connection = 5
>>>>>>>         bootstrap.servers = [broker1:9092, broker2:9092,
>>>>>>> broker3:9092]
>>>>>>>         metric.reporters = []
>>>>>>>         client.id =
>>>>>>>         compression.type = none
>>>>>>>         retries = 0
>>>>>>>         max.request.size = 1048576
>>>>>>>         send.buffer.bytes = 131072
>>>>>>>         acks = all
>>>>>>>         reconnect.backoff.ms = 10
>>>>>>>         linger.ms = 0
>>>>>>>         metrics.num.samples = 2
>>>>>>>         metadata.fetch.timeout.ms = 60000
>>>>>>>
>>>>>>>
>>>>>>> Is the Kafka broker going down and causing the job to be killed? What
>>>>>>> is the best way to handle it?
>>>>>>> Will increasing retries and backoff time help, and to what values
>>>>>>> should they be set so that the streaming application never fails and
>>>>>>> instead keeps retrying every few seconds and sends an event, so that
>>>>>>> my custom code can send a notification that the Kafka broker is down,
>>>>>>> if that is the cause?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
