Subject: Forcing RDD computation with something else than count()
Hi, I'm struggling a bit with something: I have several datasets of type RDD[((Int,Int),Double)] that I want to merge. I've tried union+reduceByKey and cogroup+mapValues, but in both cases it seems that if I don't force the computation of the intermediate RDDs, the final task fails, probably because the dataset is too big. Currently I use count() and persist() to force the computation, but I suspect there is useless overhead in doing so. Is there any other way to force the computation? Or any advice on this kind of problem?

Numbers: 10 datasets of 200M-400M elements each; when merged, 2B elements.
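For reference, the union+reduceByKey variant I tried was essentially the following (a simplified sketch, reusing the same sc and datasets as in the code below):

import org.apache.spark.SparkContext._  // implicits that add reduceByKey to pair RDDs

// Concatenate all the input RDDs, then sum the values per (i,j) key.
val merged: RDD[(Cooccurrence, Double)] =
  sc.union(datasets).reduceByKey(_ + _)

My current cogroup-based code is below: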
println("Reducing a list of " + datasets.length)
val result = if (datasets.length == 1) datasets(0) else if (datasets.length == 2) {
datasets.map{ rdd => println("- RDD of " + rdd.count() + " elements")}
val r = datasets(0).cogroup(datasets(1)).mapValues { case (sda,sdb) =>
sda.sum + sdb.sum } datasets.map(_.unpersist()) r } else if (datasets.length == 3) {
datasets.map{ rdd => println("- RDD of " + rdd.count() + " elements")}
val r = datasets(0).cogroup(datasets(1), datasets(2)).mapValues { case (sda,sdb,sdc) =>
sda.sum + sdb.sum + sdc.sum } datasets.map(_.unpersist()) r } else {
val (b,e) = datasets.splitAt(datasets.length / 2) reduceCooccurrences(b).cogroup(reduceCooccurrences(e)).mapValues { case (sda,sdb) =>
sda.sum + sdb.sum } } result.persist(StorageLevel.MEMORY_AND_DISK_SER) println("Total elements " + result.count())
result }
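To make the question concrete, the persist-then-count forcing pattern boils down to this helper (materialize is just an illustrative name, not something in my actual code):

// Persist, then run a throwaway action so the RDD is computed right away,
// before its parent RDDs get unpersisted. Any action would do; count()
// is simply the one I use, and it feels like wasted work.
def materialize[T](rdd: RDD[T]): RDD[T] = {
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
  rdd.count()
  rdd
}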
The driver that loads the inputs and writes the merged result:

def mergeCooccurrences(sc: SparkContext, inputs: List[String], output: String,
                       symmetric: Boolean, dumpText: Boolean) {
  val nbSets = inputs.length
  // Load each input and collapse duplicate keys to a single summed value.
  val datasets = inputs.map { input =>
    SparseMatrixIO.load(sc, input).groupByKey().mapValues { sd =>
      sd.map(_.get()).sum
    }.persist(StorageLevel.MEMORY_AND_DISK_SER)
  }
  val dataset = reduceCooccurrences(datasets)
  val result = if (symmetric) {
    // Emit both (i,j) and (j,i) entries, then re-sum per key.
    dataset.flatMap { c =>
      Seq(c, (new Cooccurrence(c._1.j, c._1.i), c._2))
    }.groupByKey().mapValues { sd => new DoubleWritable(sd.sum) }
  } else {
    dataset.mapValues(new DoubleWritable(_))
  }
  SparseMatrixIO.write(output, result, dumpText)
}

NB: Cooccurrence <=> (Int,Int), and SparseMatrixIO.load/write basically call newAPIHadoopFile and saveAsNewAPIHadoopFile.
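In case it helps, SparseMatrixIO is roughly the following (a simplified sketch: the sequence-file formats and the ".txt" dump path are illustrative, and it assumes Cooccurrence implements Writable):

import org.apache.hadoop.io.DoubleWritable
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object SparseMatrixIO {
  // Read (Cooccurrence, DoubleWritable) pairs back from a sequence file.
  def load(sc: SparkContext, path: String): RDD[(Cooccurrence, DoubleWritable)] =
    sc.newAPIHadoopFile[Cooccurrence, DoubleWritable,
      SequenceFileInputFormat[Cooccurrence, DoubleWritable]](path)

  // Write the pairs as a sequence file, optionally also dumping a text copy.
  def write(path: String, rdd: RDD[(Cooccurrence, DoubleWritable)], dumpText: Boolean) {
    rdd.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[Cooccurrence, DoubleWritable]](path)
    if (dumpText)
      rdd.map { case (c, v) => c + "\t" + v.get() }.saveAsTextFile(path + ".txt")
  }
}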