Hi,

You can call less expensive operations like first() or take() to trigger the computation.
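Something along these lines, for example (untested sketch; the materialize helper name is just for illustration, not from your code):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Illustrative helper: persist an RDD and kick off evaluation with a cheap action.
def materialize[T](rdd: RDD[T]): RDD[T] = {
  val persisted = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
  // take(1) / first() only compute as many partitions as they need to return
  // one element, so they are cheaper than count(), which scans every element.
  persisted.take(1)
  persisted
}

Since your recursion already persists each intermediate result, swapping result.count() for a take(1) along these lines should avoid the full scan at every merge level.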
On Tue, Jan 21, 2014 at 2:32 PM, Guillaume Pitel <[email protected]> wrote:

> Hi,
>
> I'm struggling a bit with something: I have several datasets
> RDD[((Int,Int),Double)] that I want to merge.
>
> I've tried with union+reduceByKey and cogroup+mapValues, but in all cases
> it seems that if I don't force the computation of the RDD, the final task
> fails, probably because the dataset is too big.
>
> Currently I use count() and persist() to force the computation, but I
> suspect there is a useless overhead when doing so. Is there any other way
> to force the computation?
>
> Or any advice on that kind of matter? Numbers: 10 datasets of 200M-400M
> elements, when merged, 2B elements.
>
> My code below:
>
> def reduceCooccurrences(datasets: List[RDD[(Cooccurrence, Double)]]): RDD[(Cooccurrence, Double)] = {
>   println("Reducing a list of " + datasets.length)
>   val result = if (datasets.length == 1)
>     datasets(0)
>   else if (datasets.length == 2) {
>     datasets.map{ rdd => println("- RDD of " + rdd.count() + " elements") }
>     val r = datasets(0).cogroup(datasets(1)).mapValues { case (sda, sdb) =>
>       sda.sum + sdb.sum
>     }
>     datasets.map(_.unpersist())
>     r
>   } else if (datasets.length == 3) {
>     datasets.map{ rdd => println("- RDD of " + rdd.count() + " elements") }
>     val r = datasets(0).cogroup(datasets(1), datasets(2)).mapValues { case (sda, sdb, sdc) =>
>       sda.sum + sdb.sum + sdc.sum
>     }
>     datasets.map(_.unpersist())
>     r
>   } else {
>     val (b, e) = datasets.splitAt(datasets.length / 2)
>     reduceCooccurrences(b).cogroup(reduceCooccurrences(e)).mapValues { case (sda, sdb) =>
>       sda.sum + sdb.sum
>     }
>   }
>   result.persist(StorageLevel.MEMORY_AND_DISK_SER)
>   println("Total elements " + result.count())
>   result
> }
>
> def mergeCooccurrences(sc: SparkContext, inputs: List[String], output: String,
>                        symmetric: Boolean, dumpText: Boolean) {
>   val nbSets = inputs.length
>   val datasets = inputs.map { input =>
>     SparseMatrixIO.load(sc, input).groupByKey().mapValues {
>       sd => sd.map(_.get()).sum
>     }.persist(StorageLevel.MEMORY_AND_DISK_SER)
>   }
>   val dataset = reduceCooccurrences(datasets)
>   val result = if (symmetric) {
>     dataset.flatMap {
>       case c => Seq(c, (new Cooccurrence(c._1.j, c._1.i), c._2))
>     }.groupByKey().mapValues { sd => new DoubleWritable(sd.map(_.get()).sum) }
>   } else {
>     dataset.mapValues(new DoubleWritable(_))
>   }
>   SparseMatrixIO.write(output, result, dumpText)
> }
>
> NB:
> Cooccurrence <=> (Int,Int)
> and SparseMatrixIO.load or write basically call newAPIHadoopFile and
> saveAsNewAPIHadoopFile
>
> --
> *Guillaume PITEL, Président*
> +33(0)6 25 48 86 80
>
> eXenSa S.A.S. <http://www.exensa.com/>
> 41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05

--
https://github.com/zinnia-phatak-dev/Nectar
