Thanks, this direction seems to be in line with what I want. What I really want is groupBy(), and then, for the rows in each group, to get an Iterator and run each element from it through a local function (specifically SGD). The Dataset API provides this today, but it hands me a literal Iterator that I can't "reset", and SGD needs to make multiple passes over the data.
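(Editorial sketch, not Spark API: one workaround for the single-pass Iterator is to spill the group to a local temp file once, consuming the Iterator exactly once, and then re-open the file for each epoch. multiPass, runEpoch, and the one-String-per-line row encoding are all assumptions for illustration.)

import java.io.{BufferedWriter, File, FileWriter, PrintWriter}
import scala.io.Source

// Illustrative helper: spill a one-shot Iterator to local disk, then hand a
// fresh Iterator to each epoch. Names and signature are made up, not Spark API.
def multiPass(rows: Iterator[String], epochs: Int)(runEpoch: Iterator[String] => Unit): Unit = {
  val spill = File.createTempFile("group-", ".rows")
  val out = new PrintWriter(new BufferedWriter(new FileWriter(spill)))
  try rows.foreach(out.println) finally out.close() // the only pass over the original Iterator

  try {
    for (_ <- 1 to epochs) {
      val src = Source.fromFile(spill)
      try runEpoch(src.getLines()) // a fresh Iterator for each SGD epoch
      finally src.close()
    }
  } finally spill.delete()
}

This trades memory for local disk I/O, which is roughly what groupBy's spill already does, except here the spilled data stays re-readable across epochs. Real training samples would of course need proper serialization rather than one String per line.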
On Sat, Oct 22, 2016 at 1:22 PM, Koert Kuipers <ko...@tresata.com> wrote:

> groupBy always materializes the entire group (on disk or in memory), which
> is why you should avoid it for large groups.
>
> The key is to never materialize the grouped and shuffled data.
>
> To see one approach to doing this, take a look at
> https://github.com/tresata/spark-sorted
>
> It's basically a combination of smart partitioning and secondary sort.
>
> On Oct 20, 2016 1:55 PM, "Yang" <teddyyyy...@gmail.com> wrote:
>
>> In my application, I group training samples by their model_id's
>> (the input table contains training samples for 100k different models),
>> and each group ends up with about 1 million training samples.
>>
>> I then feed each group of samples to a small logistic regression solver
>> (SGD), but SGD requires the input data to be shuffled randomly (so that
>> positive and negative samples are evenly distributed), so right now I do
>> something like:
>>
>> my_input_rdd.groupBy(x => x.model_id).map { x =>
>>   val (model_id, group_of_rows) = x
>>   (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))
>> }.map(x => (x._1, train_sgd(x._2)))
>>
>> The issue is that on the third line above, I have to explicitly call
>> toSeq on group_of_rows in order to shuffle it, since it is an Iterable
>> and not a Seq. That loads the entire 1 million rows into memory, and in
>> practice I've seen my tasks OOM with GC time going crazy (about 50% of
>> total run time). I suspect this toSeq is the reason, since doing a
>> simple count() on the groupBy() result works fine.
>>
>> I am planning to shuffle my_input_rdd first, then groupBy(), and skip
>> the toSeq/shuffle. Intuitively, the input RDD is already shuffled, so
>> UNLESS groupBy() does some sorting, the rows in each group SHOULD remain
>> shuffled... but overall this remains rather flimsy.
>>
>> Any ideas on how to do this more reliably?
>>
>> Thanks!
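(Editorial sketch of the "smart partitioning + secondary sort" idea Koert describes, using repartitionAndSortWithinPartitions with a random tiebreaker in the key so each group's rows arrive pre-shuffled and can be streamed to SGD without ever materializing the group. Row and train_sgd stand in for the types in Yang's snippet; this is an assumption-laden outline, not a drop-in replacement for spark-sorted.)

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import scala.util.Random

case class Row(model_id: String, label: Double, features: Array[Double]) // stand-in type
def train_sgd(rows: Iterator[Row]): Double = ??? // stand-in for the poster's solver

def shuffledGroups(input: RDD[Row], parts: Int): RDD[(String, Double)] = {
  // Key by (model_id, random): partition on model_id only, but sort on the
  // full key, so each group's rows are consecutive yet in random order.
  val keyed = input.map(r => ((r.model_id, Random.nextLong()), r))

  val byModelId = new Partitioner {
    def numPartitions: Int = parts
    def getPartition(key: Any): Int = {
      val h = key.asInstanceOf[(String, Long)]._1.hashCode
      ((h % parts) + parts) % parts // non-negative modulo on model_id alone
    }
  }

  keyed.repartitionAndSortWithinPartitions(byModelId).mapPartitions { it =>
    val buf = it.buffered
    new Iterator[(String, Double)] {
      def hasNext: Boolean = buf.hasNext
      def next(): (String, Double) = {
        val model = buf.head._1._1
        // Streaming, one-pass view of this group's rows; nothing is materialized.
        val group = new Iterator[Row] {
          def hasNext: Boolean = buf.hasNext && buf.head._1._1 == model
          def next(): Row = buf.next()._2
        }
        val result = train_sgd(group)
        while (group.hasNext) group.next() // drain leftovers so groups stay aligned
        (model, result)
      }
    }
  }
}

Note that this gives exactly one randomly ordered pass per group per Spark job; for multi-epoch SGD it would need to be combined with something like the local-spill sketch above, or with one shuffle per epoch.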