Re: "collecting" DStream data

2016-05-15 Thread Daniel Haviv
I mistyped; the code is:
MappedVersionsToBrodcast.foreachRDD(r => arr ++= r.collect())

And it does work for ArrayBuffer, but not for HashMap.
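Once `collect()` has brought a batch back to the driver, the line above is ordinary local Scala. Here is a minimal sketch of that accumulation without Spark. It assumes each inner `Array` stands in for one batch's `r.collect()` result on an `RDD[(String, String)]`. The data and the `DriverAccumulation` name are hypothetical. It also shows a difference worth keeping in mind: an `ArrayBuffer` keeps every pair, while a `HashMap` keeps a single value per key, the last one written.

```scala
import scala.collection.mutable

object DriverAccumulation {
  // Hypothetical batches: each Array stands in for one r.collect() result.
  val batches: Seq[Array[(String, String)]] = Seq(
    Array("a" -> "1", "b" -> "1"),
    Array("a" -> "2") // key "a" shows up again in a later batch
  )

  // Mirrors `arr ++= r.collect()` for both collection types, batch by batch.
  def accumulate(): (mutable.ArrayBuffer[(String, String)], mutable.HashMap[String, String]) = {
    val buf = mutable.ArrayBuffer.empty[(String, String)]
    val map = mutable.HashMap.empty[String, String]
    for (collected <- batches) {
      buf ++= collected // appends every pair, duplicates included
      map ++= collected // one entry per key; a later value replaces an earlier one
    }
    (buf, map)
  }

  def main(args: Array[String]): Unit = {
    val (buf, map) = accumulate()
    println(s"ArrayBuffer: ${buf.size} pairs, HashMap: ${map.size} keys, a -> ${map("a")}")
    // prints: ArrayBuffer: 3 pairs, HashMap: 2 keys, a -> 2
  }
}
```

With a real DStream, the loop body would sit inside `foreachRDD`. Because `collect()` pulls the whole batch into driver memory, this approach only suits small batches.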

On Sun, May 15, 2016 at 3:04 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Hi Daniel,
>
> Given your example, “arr” is defined on the driver. The function passed to
> “foreachRDD” itself runs on the driver, but the function you pass to
> “RDD.foreach” inside it runs on the executors, so each task updates its own
> copy of “arr”. If you want to collect the results of the RDD/DStream down to
> the driver, you need to call RDD.collect. Be careful, though, that the
> driver JVM has enough memory to hold the results; otherwise you’ll get an
> OutOfMemoryError. Also, you can’t update the value of a broadcast variable,
> since it’s immutable.
>
> Thanks,
> Silvio
>
> From: Daniel Haviv 
> Date: Sunday, May 15, 2016 at 6:23 AM
> To: user 
> Subject: "collecting" DStream data
>
> Hi,
> I have a DStream I'd like to collect and broadcast its values.
> To do so I've created a mutable HashMap which I'm filling with foreachRDD
> but when I'm checking it, it remains empty. If I use ArrayBuffer it works
> as expected.
>
> This is my code:
>
> val arr = scala.collection.mutable.HashMap.empty[String, String]
> MappedVersionsToBrodcast.foreachRDD(rdd => rdd.foreach(pair => arr += pair))
>
>
> What am I missing here?
>
> Thank you,
> Daniel
>
>
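Silvio's point explains the empty HashMap in the original code. The closure passed to `r.foreach` is serialized and shipped to the executors, and each task deserializes its own copy of everything that closure captures, including `arr`. The update inside it therefore never touches the driver's map. The sketch below imitates that with plain Java serialization, so no Spark is required. `ClosureCopyDemo` and `roundTrip` are hypothetical names. Spark's real task serialization involves more than a single round trip, so treat this as an illustration of the copy semantics only.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object ClosureCopyDemo {
  // Hypothetical helper: serialize and deserialize an object, yielding an
  // independent copy, roughly like a captured value arriving inside a task.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Returns (size of the driver's map, size of the "executor" copy).
  def simulate(): (Int, Int) = {
    val arr = mutable.HashMap.empty[String, String] // created on the "driver"
    val executorCopy = roundTrip(arr)               // what a task would deserialize
    executorCopy += ("key" -> "value")              // the task's update lands on the copy
    (arr.size, executorCopy.size)                   // the driver's map is untouched
  }

  def main(args: Array[String]): Unit = {
    val (driverSize, executorSize) = simulate()
    println(s"driver map size = $driverSize, executor copy size = $executorSize")
    // prints: driver map size = 0, executor copy size = 1
  }
}
```

This copy is also why the fix is to bring the data to the driver with `collect()` rather than to mutate a driver-side collection from inside `RDD.foreach`.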

