groupBy always materializes the entire group (on disk or in memory), which is why you should avoid it for large groups.
The key is to never materialize the grouped and shuffled data. For one approach to this, take a look at https://github.com/tresata/spark-sorted. It is basically a combination of smart partitioning and secondary sort; a rough sketch of the same idea with the plain RDD API follows the quoted message below.

On Oct 20, 2016 1:55 PM, "Yang" <teddyyyy...@gmail.com> wrote:

> In my application I group training samples by their model_id (the input
> table contains training samples for 100k different models), and each group
> ends up with about 1 million training samples.
>
> I then feed each group of samples to a small Logistic Regression solver
> (SGD), but SGD requires the input data to be shuffled randomly (so that
> positive and negative samples are evenly distributed), so now I do
> something like
>
> my_input_rdd.groupBy(x => x.model_id).map { x =>
>   val (model_id, group_of_rows) = x
>   (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))
> }.map(x => (x._1, train_sgd(x._2)))
>
> The issue is that on the third line above I have to explicitly call toSeq
> on group_of_rows, which is an Iterable and not a Seq, in order to shuffle
> it. That loads the entire 1 million rows into memory, and in practice I've
> seen my tasks OOM and GC time go crazy (about 50% of total run time). I
> suspect this toSeq is the reason, since doing a simple count() on the
> groupBy() result works fine.
>
> I am planning to shuffle my_input_rdd first, then groupBy(), and skip the
> toSeq + shuffle. Intuitively the input RDD is already shuffled, so UNLESS
> groupBy() does some sorting, the rows in each group SHOULD remain
> shuffled? But overall this approach feels rather flimsy.
>
> Any ideas on how to do this more reliably?
>
> Thanks!
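For concreteness, here is a minimal sketch of the partition-and-secondary-sort idea using only standard RDD operations (repartitionAndSortWithinPartitions, mapPartitions, HashPartitioner), not spark-sorted itself. It keys each row by (model_id, random salt), partitions on model_id alone, sorts within partitions on the composite key so each model's rows arrive contiguously and in random order, and then streams each group straight into the solver without ever building a Seq. The names Sample, Model, ModelKey, ModelIdPartitioner and trainPerModel are made up for this sketch, and the training function is assumed to consume its iterator in a single pass.

  import scala.util.Random
  import org.apache.spark.{HashPartitioner, Partitioner}
  import org.apache.spark.rdd.RDD

  // Stand-ins for whatever row and model types the real job uses.
  case class Sample(modelId: Long, label: Double, features: Array[Double])
  case class Model(weights: Array[Double])

  // Composite key: partition on modelId only, but sort on (modelId, salt),
  // so each model's rows are contiguous within a partition and in random order.
  case class ModelKey(modelId: Long, salt: Int)

  class ModelIdPartitioner(override val numPartitions: Int) extends Partitioner {
    private val hash = new HashPartitioner(numPartitions)
    override def getPartition(key: Any): Int = key match {
      case ModelKey(modelId, _) => hash.getPartition(modelId)
    }
  }

  object PerModelSgd {
    def trainPerModel(rows: RDD[Sample], numPartitions: Int)
                     (train: Iterator[Sample] => Model): RDD[(Long, Model)] = {
      implicit val keyOrdering: Ordering[ModelKey] =
        Ordering.by((k: ModelKey) => (k.modelId, k.salt))

      rows
        .map(r => (ModelKey(r.modelId, Random.nextInt()), r))
        .repartitionAndSortWithinPartitions(new ModelIdPartitioner(numPartitions))
        .mapPartitions { it =>
          val buffered = it.buffered
          new Iterator[(Long, Model)] {
            def hasNext: Boolean = buffered.hasNext
            def next(): (Long, Model) = {
              val modelId = buffered.head._1.modelId
              // Lazily yields this model's rows; nothing is collected into a Seq.
              val group = new Iterator[Sample] {
                def hasNext: Boolean =
                  buffered.hasNext && buffered.head._1.modelId == modelId
                def next(): Sample = buffered.next()._2
              }
              val model = train(group)
              // Drain anything the solver did not consume so the next model starts cleanly.
              while (group.hasNext) group.next()
              (modelId, model)
            }
          }
        }
    }
  }

You would call it as PerModelSgd.trainPerModel(myInputRdd, numPartitions)(samples => yourSgdPass(samples)). The randomization rides on the sort key instead of an in-memory shuffle, so at no point is a whole group held in memory; the trade-off is that the solver gets exactly one pass over each group in that order, which a single SGD sweep can live with.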