
It's been some time since my last message on the subject of using many RDDs
in a Spark job, but I have just run into the same problem again. The
thing is that I have an RDD of time-tagged data that I want to 1) divide
into windows according to a timestamp field; 2) compute KMeans for each
time window; 3) collect the results in an RDD of pairs that contains the
centroids for each time window. For 1) I generate an RDD of pairs where the
key is an id for the time window, but for 2) the problem is that KMeans
from MLlib only accepts an RDD, so I cannot call it from aggregateByKey. I
think this is a reusability problem for any MLlib algorithm that takes an
RDD as input, whenever we want to apply the algorithm independently to
several groups of data. So the only approaches I can imagine are:

a) Generate an RDD per time window. This is easy to do but doesn't work,
because it's easy to end up with thousands of windows and hence thousands of
RDDs, which freezes the Spark scheduler, as seen in my previous messages.

b) Collect the set of ids for the time windows in the driver, and traverse
that set, generating an RDD for each window, calling KMeans, and then
storing the results with an export action. I will try that now, and I think
it could work because only one RDD per window will be present at a
time. The point here is that we avoid creating an RDD whose lineage
depends on a thousand RDDs, as in the collecting phase 3) of a). But
that implies running the KMeans computations sequentially, which is
a waste of resources: imagine I have a cluster with 200 machines, each call
to KMeans takes 10 minutes on 5 machines, and I have 1000 windows to
compute, hence 1000 calls to KMeans. By sequencing the KMeans computations
I would have 195 idle machines and a running time of 10 minutes * 1000
windows = 10,000 minutes (roughly 167 hours). Maybe this could be overcome
by having not 1 RDD but m RDDs at a time, for some number m that doesn't
freeze the Spark scheduler, but I think that's not a very clean workaround.
Also, this makes it very difficult to reuse this per-window KMeans
computation in a bigger program, because I'm not able to get an RDD with
the window id as key and the centroids as values. The only way I can
imagine doing that is by storing the pairs in a database during the export
actions and then loading all the results into a single RDD, but I would
prefer to do everything inside Spark if possible.
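A rough sketch of approach b), under some assumptions: the input is already keyed by window id as an RDD[(Long, Vector)], MLlib's KMeans.train(data, k, maxIterations) is called once per window, and a fixed-size thread pool on the driver submits up to maxConcurrentJobs windows at once. That pool is one possible way to get the "m RDDs at a time" idea, since Spark can run jobs submitted from separate driver threads concurrently. The names KMeansPerWindow, run, windowed and maxConcurrentJobs are hypothetical. Because the centroids are small, they are turned back into a pair RDD with sc.parallelize.

```scala
import java.util.concurrent.Executors

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Hypothetical helper: MLlib KMeans once per window, bounded concurrency.
object KMeansPerWindow {
  def run(
      sc: SparkContext,
      windowed: RDD[(Long, Vector)],
      k: Int,
      maxIterations: Int,
      maxConcurrentJobs: Int): RDD[(Long, Array[Vector])] = {
    // Cache the keyed input so every per-window filter reads it from memory.
    windowed.cache()
    // The set of window ids is small, so it is safe to collect it to the driver.
    val windowIds = windowed.keys.distinct().collect()

    // Each Future submits its Spark jobs from its own driver thread; the pool
    // size bounds how many windows are clustered at the same time.
    val pool = Executors.newFixedThreadPool(maxConcurrentJobs)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = windowIds.toSeq.map { id =>
        Future {
          val points = windowed.filter(_._1 == id).values.cache()
          val model = KMeans.train(points, k, maxIterations)
          points.unpersist()
          (id, model.clusterCenters)
        }
      }
      val centroids = Await.result(Future.sequence(futures), Duration.Inf)
      // Centroids are tiny compared to the input, so re-parallelizing is cheap.
      sc.parallelize(centroids)
    } finally {
      pool.shutdown()
    }
  }
}
```

Each window still triggers a filter over the whole cached input, so the total scan cost grows with the number of windows. Under the default FIFO scheduler a large early job can also hold back later ones; setting spark.scheduler.mode to FAIR is the usual way to share the cluster more evenly between concurrent jobs.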

Maybe I'm missing something here; any idea would be appreciated.

Thanks in advance for your help,


Juan Rodriguez

2015-02-18 20:23 GMT+01:00 Juan Rodríguez Hortalá <

> Hi Sean,
> Thanks a lot for your answer. That explains it, as I was creating
> thousands of RDDs, so I guess the communication overhead was the reason why
> the Spark job was freezing. After changing the code to use RDDs of pairs
> and aggregateByKey it works just fine, and quite fast.
> Again, thanks a lot for your help.
> Greetings,
> Juan
> 2015-02-18 15:35 GMT+01:00 Sean Owen <so...@cloudera.com>:
>> At some level, enough RDDs create hundreds of thousands of tiny
>> partitions of data, each of which creates a task for each stage. The
>> raw overhead of all the message passing can slow things down a lot. I
>> would not design something to use an RDD per key. You would generally
>> key by some value you want to divide and filter on, and then use a
>> *ByKey operation to do your work.
>> You don't work with individual RDDs this way, but usually that's good
>> news. You usually have a lot more flexibility operating just in pure
>> Java / Scala to do whatever you need inside your function.
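Following the suggestion of doing the work in plain Scala inside the function, one option for the per-window KMeans question is a small local k-means applied to each group. The sketch below is plain Lloyd's algorithm on an in-memory Seq, not MLlib's implementation; the object name LocalKMeans and its parameters are hypothetical, and it assumes every window fits in the memory of one executor.

```scala
// Hypothetical local (non-RDD) k-means for use inside mapValues.
object LocalKMeans {
  type Point = Array[Double]

  // Squared Euclidean distance between two points of the same dimension.
  private def sqDist(a: Point, b: Point): Double = {
    var sum = 0.0
    var i = 0
    while (i < a.length) {
      val d = a(i) - b(i)
      sum += d * d
      i += 1
    }
    sum
  }

  /** Lloyd's algorithm on an in-memory collection. Initial centers are the
    * first k distinct points (a simple deterministic choice, unlike MLlib's
    * k-means|| initialisation). Stops early once no center moves. */
  def train(points: Seq[Point], k: Int, maxIterations: Int): Array[Point] = {
    if (points.isEmpty || k <= 0) return Array.empty[Point]
    var centers: Array[Point] =
      points.map(_.toVector).distinct.take(k).map(_.toArray).toArray
    var iteration = 0
    var moved = true
    while (iteration < maxIterations && moved) {
      val current = centers
      // Assignment step: group points by the index of their nearest center.
      val clusters =
        points.groupBy(p => current.indices.minBy(i => sqDist(p, current(i))))
      // Update step: each center becomes the mean of its assigned points;
      // a center with no assigned points keeps its previous position.
      val updated = current.indices.map { i =>
        clusters.get(i) match {
          case Some(ps) => Array.tabulate(ps.head.length)(d => ps.map(_(d)).sum / ps.size)
          case None     => current(i)
        }
      }.toArray
      moved = updated.zip(current).exists { case (a, b) => sqDist(a, b) > 1e-12 }
      centers = updated
      iteration += 1
    }
    centers
  }
}
```

With a hypothetical windowed: RDD[(Long, Array[Double])] keyed by window id, something like windowed.groupByKey().mapValues(ps => LocalKMeans.train(ps.toSeq, 3, 20)) would give an RDD[(Long, Array[Array[Double]])] holding the centroids of each window, i.e. the pair RDD of step 3). This only works while each single window fits in one executor's memory; for very large windows the local approach does not apply.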
>> On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá
>> <juan.rodriguez.hort...@gmail.com> wrote:
>> > Hi Paweł,
>> >
>> > Thanks a lot for your answer. I finally got the program to work by using
>> > aggregateByKey, but I was wondering why creating thousands of RDDs
>> > doesn't work. I think that could be interesting for using methods that
>> > work on RDDs, like for example JavaDoubleRDD.stats()
>> > (http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29).
>> > If the groups are small then I can chain groupBy(), collect(),
>> > parallelize() and stats(), but that is quite inefficient because it
>> > implies moving data to and from the driver, and it also doesn't scale to
>> > big groups; on the other hand, if I use aggregateByKey or a similar
>> > function then I cannot use stats(), so I have to reimplement it. In
>> > general I was looking for a way to reuse other functions that I have
>> > that work on RDDs, applying them to groups of data in an RDD, because I
>> > don't see how to apply them directly to each of the groups in a pair RDD.
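For the specific case of stats(), the result type is org.apache.spark.util.StatCounter, and that same class can serve as the aggregateByKey accumulator, so the per-group statistics do not have to be reimplemented. A sketch assuming an RDD[(String, Double)] input and the Spark 1.3+ API (where the pair-RDD implicits need no extra import); GroupStats and statsByKey are hypothetical names.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter

object GroupStats {
  /** Per-key equivalent of stats(): count, mean, stdev, min and max for each
    * group, computed with a single shuffle and no collect() to the driver. */
  def statsByKey(data: RDD[(String, Double)]): RDD[(String, StatCounter)] =
    data.aggregateByKey(new StatCounter())(
      (acc, x) => acc.merge(x), // fold one value into a partition-local counter
      (a, b) => a.merge(b)      // combine counters coming from different partitions
    )
}
```

aggregateByKey gives every key its own copy of the zero value, and its documentation allows seqOp and combOp to modify and return their first argument, which is what StatCounter.merge does.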
>> >
>> > Again, thanks a lot for your answer,
>> >
>> > Greetings,
>> >
>> > Juan Rodriguez
>> >
>> >
>> >
>> >
>> > 2015-02-18 14:56 GMT+01:00 Paweł Szulc <paul.sz...@gmail.com>:
>> >>
>> >> Maybe you can avoid grouping with groupByKey altogether? What is your
>> >> next step after grouping elements by key? Are you trying to reduce
>> >> values? If so, then I would recommend using reducing functions such as
>> >> reduceByKey or aggregateByKey. Those first reduce the values for each
>> >> key locally on each node, before doing the actual IO over the network.
>> >> There will also be no grouping phase, so you will not run into memory
>> >> issues.
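A minimal illustration of this point, assuming a pair RDD of (String, Long) values; PerKeySums and both method names are hypothetical. The two functions return the same per-key sums, but they move very different amounts of data through the shuffle.

```scala
import org.apache.spark.rdd.RDD

object PerKeySums {
  // Ships every single value across the network and materializes each group;
  // all values of one key must fit in the memory of one executor.
  def withGroupByKey(data: RDD[(String, Long)]): RDD[(String, Long)] =
    data.groupByKey().mapValues(_.sum)

  // Sums inside each partition first (map-side combine), so only one partial
  // sum per key and partition is shuffled.
  def withReduceByKey(data: RDD[(String, Long)]): RDD[(String, Long)] =
    data.reduceByKey(_ + _)
}
```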
>> >>
>> >> Please let me know if that helped
>> >>
>> >> Pawel Szulc
>> >> @rabbitonweb
>> >> http://www.rabbitonweb.com
>> >>
>> >>
>> >> On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá
>> >> <juan.rodriguez.hort...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm writing a Spark program where I want to divide an RDD into
>> >>> different groups, but the groups are too big to use groupByKey. To cope
>> >>> with that, since I know in advance the list of keys for each group, I
>> >>> build a map from the keys to the RDDs that result from filtering the
>> >>> input RDD to get the records for the corresponding key. This works when
>> >>> I have a small number of keys, but for a large number of keys (tens of
>> >>> thousands) the execution gets stuck, without launching any new Spark
>> >>> stage. I suspect the reason is that the Spark scheduler is not able to
>> >>> handle so many RDDs. Does that make sense? I'm rewriting the program to
>> >>> use a single RDD of pairs, with cached partitions, but I wanted to be
>> >>> sure I understand the problem here.
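The two designs described in this message can be contrasted roughly as follows, assuming String keys; KeyedLayouts, byKeyFilters and partitionedPairs are hypothetical names. byKeyFilters is the map-of-filtered-RDDs approach (one RDD and one lineage per key), while partitionedPairs is the single pair RDD with a fixed partitioner and persisted partitions.

```scala
import scala.reflect.ClassTag

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object KeyedLayouts {
  // One filtered RDD per key: tens of thousands of keys mean tens of thousands
  // of RDDs, and each of them re-scans the whole input when it is evaluated.
  def byKeyFilters[V: ClassTag](input: RDD[(String, V)], keys: Seq[String]): Map[String, RDD[V]] =
    keys.map(k => k -> input.filter(_._1 == k).values).toMap

  // A single pair RDD: the HashPartitioner places all records of a key in the
  // same partition, and persisting keeps that layout, so later *ByKey
  // operations using the same partitioner can typically skip another shuffle.
  def partitionedPairs[V: ClassTag](input: RDD[(String, V)], numPartitions: Int): RDD[(String, V)] =
    input.partitionBy(new HashPartitioner(numPartitions)).persist(StorageLevel.MEMORY_AND_DISK)
}
```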
>> >>>
>> >>> Thanks a lot in advance,
>> >>>
>> >>> Greetings,
>> >>>
>> >>> Juan Rodriguez
>> >>
>> >>
>> >
