Hi,

It's been some time since my last message on the subject of using many RDDs in a Spark job, but I have just encountered the same problem again. The thing is that I have an RDD of time-tagged data that I want to 1) divide into windows according to a timestamp field; 2) compute KMeans for each time window; 3) collect the results in an RDD of pairs that contains the centroids for each time window. For 1) I generate an RDD of pairs where the key is an id for the time window, but for 2) I run into the problem that KMeans from MLlib only accepts an RDD, so I cannot call it from aggregateByKey. I think this is a reusability problem for any MLlib algorithm that is based on passing an RDD, if we want to apply the algorithm independently to several groups of data. So the only approaches I can imagine are:
a) Generate an RDD per time window. This is easy to do, but it doesn't work because it's easy to end up with thousands of windows and hence thousands of RDDs, which freezes the Spark scheduler, as seen in my previous messages.

b) Collect the set of ids for the time windows in the driver, traverse that set generating an RDD per window, call KMeans on it, and then store the results with an export action. I will try that now, and I think it could work because only one RDD per window will be alive at a time. The point here is that we avoid creating an RDD whose lineage depends on thousands of RDDs, like in the collecting phase 3) of a). But that implies a sequential execution of the KMeans computations, which is a waste of resources: imagine I have a cluster with 200 machines, each call to KMeans runs on 5 machines in 10 minutes, and I have 1000 windows to compute, hence 1000 calls to KMeans; by sequencing the KMeans computations I would have 195 idle machines and a running time of 10 minutes * 1000 windows. Maybe this could be overcome by having not 1 but m RDDs alive at a time, for some number m that doesn't freeze the Spark scheduler, but I think that's not a very clean workaround. Also, this makes it very difficult to reuse this computation of KMeans by window in a bigger program, because I'm not able to get an RDD with a key per window id and the centroids in the values. The only way I imagine I could do that is by storing the pairs in a database during the export actions, and then loading all the results into a single RDD, but I would prefer to do everything inside Spark if possible.

Maybe I'm missing something here; any idea would be appreciated.

Thanks in advance for your help,

Greetings,

Juan Rodriguez

2015-02-18 20:23 GMT+01:00 Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>:

> Hi Sean,
>
> Thanks a lot for your answer.
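For concreteness, here is a sketch of approach b) extended with the "m at a time" idea. It is plain Python with the Spark/MLlib calls left as comments, since only the driver-side logic is shown; window_size, m, the record layout and export_centroids are all hypothetical names, not anything from MLlib:

```python
# Step 1): key each time-tagged record by a window id derived from its
# timestamp (hypothetical fixed-size windows of `window_size` time units).
def window_id(timestamp, window_size=600):
    return timestamp // window_size

# Approach b) with the workaround: split the window ids collected in the
# driver into batches of size m, so that at most m filtered RDDs (and m
# KMeans jobs) are alive at the same time, instead of one or thousands.
def batches(window_ids, m):
    ids = sorted(window_ids)
    return [ids[i:i + m] for i in range(0, len(ids), m)]

# Driver loop, with the Spark calls sketched as comments:
# for batch in batches(all_window_ids, m=10):
#     for wid in batch:  # these m jobs could be submitted concurrently
#         window_rdd = pairs.filter(lambda kv: kv[0] == wid).values()
#         model = KMeans.train(window_rdd, k)          # one MLlib call per window
#         export_centroids(wid, model.clusterCenters)  # hypothetical export action
```

With m = 1 this degenerates into the fully sequential version of b); the batching only limits how many RDDs the scheduler sees at once, it does not remove the fundamental problem that the per-window results never come back as a single pair RDD.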
> That explains it, as I was creating thousands of RDDs, so I guess the
> communication overhead was the reason why the Spark job was freezing.
> After changing the code to use RDDs of pairs and aggregateByKey it works
> just fine, and quite fast.
>
> Again, thanks a lot for your help.
>
> Greetings,
>
> Juan
>
> 2015-02-18 15:35 GMT+01:00 Sean Owen <so...@cloudera.com>:
>
>> At some level, enough RDDs creates hundreds of thousands of tiny
>> partitions of data, each of which creates a task for each stage. The
>> raw overhead of all the message passing can slow things down a lot. I
>> would not design something to use an RDD per key. You would generally
>> key by some value you want to divide and filter on, and then use a
>> *ByKey to do your work.
>>
>> You don't work with individual RDDs this way, but usually that's good
>> news. You usually have a lot more flexibility operating just in pure
>> Java / Scala to do whatever you need inside your function.
>>
>> On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá
>> <juan.rodriguez.hort...@gmail.com> wrote:
>>> Hi Paweł,
>>>
>>> Thanks a lot for your answer. I finally got the program to work by
>>> using aggregateByKey, but I was wondering why creating thousands of
>>> RDDs doesn't work. I think that could be interesting for using methods
>>> that work on RDDs, like for example JavaDoubleRDD.stats() (
>>> http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29
>>> ). If the groups are small then I can chain groupBy(), collect(),
>>> parallelize() and stats(), but that is quite inefficient because it
>>> implies moving data to and from the driver, and it also doesn't scale
>>> to big groups; on the other hand, if I use aggregateByKey or a similar
>>> function then I cannot use stats(), so I have to reimplement it.
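For reference, the reimplementation of stats() with aggregateByKey mentioned above amounts to folding each value into a small accumulator. A plain-Python sketch of the seqOp/combOp pair follows; the (count, sum, sum_of_squares) accumulator layout is an assumption for illustration, not MLlib's StatCounter:

```python
# Sketch of the two functions you would pass to aggregateByKey to get
# per-key count/mean/variance without grouping. Each partition folds its
# values into a local accumulator (seq_op); accumulators from different
# partitions are then merged (comb_op), so only tiny tuples are shuffled.
ZERO = (0, 0.0, 0.0)  # (count, sum, sum_of_squares)

def seq_op(acc, x):
    # Fold one value into a partition-local accumulator.
    n, s, sq = acc
    return (n + 1, s + x, sq + x * x)

def comb_op(a, b):
    # Merge the accumulators of two partitions.
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])

def finalize(acc):
    # Derive the same kind of summary stats() would return.
    n, s, sq = acc
    mean = s / n
    return {"count": n, "mean": mean, "variance": sq / n - mean * mean}

# In Spark this would be roughly:
#   pairs.aggregateByKey(ZERO, seq_op, comb_op).mapValues(finalize)
```

The variance here is the population variance; extending the accumulator with min/max would cover the rest of what stats() reports.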
>>> In general I was looking for a way to reuse other functions that I
>>> have that work on RDDs, for using them on groups of data in an RDD,
>>> because I don't see how to directly apply them to each of the groups
>>> in a pair RDD.
>>>
>>> Again, thanks a lot for your answer,
>>>
>>> Greetings,
>>>
>>> Juan Rodriguez
>>>
>>> 2015-02-18 14:56 GMT+01:00 Paweł Szulc <paul.sz...@gmail.com>:
>>>
>>>> Maybe you can omit grouping with groupByKey altogether? What is your
>>>> next step after grouping elements by key? Are you trying to reduce
>>>> the values? If so then I would recommend using some reducing
>>>> functions, like for example reduceByKey or aggregateByKey. Those will
>>>> first reduce the values for each key locally on each node before
>>>> doing actual IO over the network. There will also be no grouping
>>>> phase, so you will not run into memory issues.
>>>>
>>>> Please let me know if that helped
>>>>
>>>> Pawel Szulc
>>>> @rabbitonweb
>>>> http://www.rabbitonweb.com
>>>>
>>>> On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá
>>>> <juan.rodriguez.hort...@gmail.com> wrote:
>>>>> Hi,
>>>>>
>>>>> I'm writing a Spark program where I want to divide an RDD into
>>>>> different groups, but the groups are too big to use groupByKey. To
>>>>> cope with that, since I know in advance the list of keys for each
>>>>> group, I build a map from the keys to the RDDs that result from
>>>>> filtering the input RDD to get the records for the corresponding
>>>>> key. This works when I have a small number of keys, but for a big
>>>>> number of keys (tens of thousands) the execution gets stuck, without
>>>>> issuing any new Spark stage. I suspect the reason is that the Spark
>>>>> scheduler is not able to handle so many RDDs. Does it make sense?
>>>>> I'm rewriting the program to use a single RDD of pairs, with cached
>>>>> partitions, but I wanted to be sure I understand the problem here.
>>>>>
>>>>> Thanks a lot in advance,
>>>>>
>>>>> Greetings,
>>>>>
>>>>> Juan Rodriguez
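To close the loop on Paweł's point about reducing values locally before any network IO, here is a plain-Python simulation (not Spark code) of the map-side combine that makes reduceByKey/aggregateByKey cheaper than groupByKey; partition contents and the merge function are made-up illustration data:

```python
# Each partition combines its own values per key before the shuffle
# (the "reduce locally on each node" step), so each partition ships at
# most one record per key across the network; groupByKey would instead
# ship and buffer every individual value.
def local_combine(partition, f):
    out = {}
    for k, v in partition:
        out[k] = f(out[k], v) if k in out else v
    return out

def merge_partitions(combined, f):
    out = {}
    for part in combined:
        for k, v in part.items():
            out[k] = f(out[k], v) if k in out else v
    return out

partitions = [[("a", 1), ("b", 2), ("a", 3)], [("a", 4), ("b", 5)]]
add = lambda x, y: x + y
shuffled = [local_combine(p, add) for p in partitions]  # one record per key per partition
result = merge_partitions(shuffled, add)  # {"a": 8, "b": 7}
```

This is why the thread's final design (a single pair RDD plus a *ByKey operation) scales where both the thousands-of-RDDs map and groupByKey do not.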