Hi,

I'm struggling a bit with something: I have several datasets of type RDD[((Int,Int),Double)] that I want to merge.

I've tried union+reduceByKey and cogroup+mapValues, but in both cases it seems that if I don't force the computation of the intermediate RDDs, the final task fails, probably because the dataset is too big.
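
For reference, the union+reduceByKey variant was basically this (a rough sketch from memory, not the exact code):

  // Concatenate all the datasets, then sum the values sharing the same (i,j) key
  def unionReduce(datasets: List[RDD[(Cooccurrence, Double)]]): RDD[(Cooccurrence, Double)] =
    datasets.reduce(_ union _).reduceByKey(_ + _)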

Currently I use count() and persist() to force the computation, but I suspect this adds useless overhead. Is there any other way to force the computation?
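
What I currently do after each merge step is basically this (same as in the full code below):

  // force the materialization of the intermediate result before the next cogroup
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
  rdd.count()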

Or any advice on this kind of problem? For scale: 10 datasets of 200M-400M elements each, around 2B elements once merged.

My code is below:

  import org.apache.hadoop.io.DoubleWritable
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD
  import org.apache.spark.storage.StorageLevel

  // Recursively merges the co-occurrence datasets, cogrouping at most three at a time,
  // and forces each intermediate result with persist() + count()
  def reduceCooccurrences(datasets: List[RDD[(Cooccurrence, Double)]]): RDD[(Cooccurrence, Double)] = {
    println("Reducing a list of " + datasets.length)
    val result = if (datasets.length == 1)
      datasets.head
    else if (datasets.length == 2) {
      datasets.foreach { rdd => println("- RDD of " + rdd.count() + " elements") }
      val r = datasets(0).cogroup(datasets(1)).mapValues { case (sda, sdb) =>
        sda.sum + sdb.sum
      }
      datasets.foreach(_.unpersist())
      r
    } else if (datasets.length == 3) {
      datasets.foreach { rdd => println("- RDD of " + rdd.count() + " elements") }
      val r = datasets(0).cogroup(datasets(1), datasets(2)).mapValues { case (sda, sdb, sdc) =>
        sda.sum + sdb.sum + sdc.sum
      }
      datasets.foreach(_.unpersist())
      r
    } else {
      // more than 3 datasets: split the list in two and merge each half recursively
      val (b, e) = datasets.splitAt(datasets.length / 2)
      reduceCooccurrences(b).cogroup(reduceCooccurrences(e)).mapValues { case (sda, sdb) =>
        sda.sum + sdb.sum
      }
    }
    // force the computation here, otherwise the final task fails
    result.persist(StorageLevel.MEMORY_AND_DISK_SER)
    println("Total elements " + result.count())
    result
  }
  // Loads each input matrix, merges them all, optionally symmetrizes, and writes the result
  def mergeCooccurrences(sc: SparkContext, inputs: List[String], output: String, symmetric: Boolean, dumpText: Boolean) {
    val nbSets = inputs.length
    // load each input and collapse duplicate keys within a single dataset
    val datasets = inputs.map { input =>
      SparseMatrixIO.load(sc, input).groupByKey().mapValues {
        sd => sd.map(_.get()).sum
      }.persist(StorageLevel.MEMORY_AND_DISK_SER)
    }
    val dataset = reduceCooccurrences(datasets)

    val result = if (symmetric) {
      // emit both (i,j) and (j,i) for every cell, then sum per key
      dataset.flatMap {
        case c => Seq(c, (new Cooccurrence(c._1.j, c._1.i), c._2))
      }.groupByKey().mapValues { sd => new DoubleWritable(sd.sum) }
    } else {
      dataset.mapValues(new DoubleWritable(_))
    }
    SparseMatrixIO.write(output, result, dumpText)
  }

NB:
Cooccurrence <=> (Int,Int)
and SparseMatrixIO.load / write basically call newAPIHadoopFile and saveAsNewAPIHadoopFile
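
In case it helps to read the code, here is a rough, hypothetical sketch of what those helpers look like (simplified; the real Cooccurrence has the Writable plumbing and the real write also honours dumpText):

  import org.apache.hadoop.io.DoubleWritable
  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
  import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  // simplified stand-in: the real class implements the Hadoop Writable interface
  class Cooccurrence(var i: Int, var j: Int)

  object SparseMatrixIO {
    // reads (Cooccurrence, DoubleWritable) pairs through the new Hadoop API
    def load(sc: SparkContext, path: String): RDD[(Cooccurrence, DoubleWritable)] =
      sc.newAPIHadoopFile[Cooccurrence, DoubleWritable,
        SequenceFileInputFormat[Cooccurrence, DoubleWritable]](path)

    // writes the merged pairs back out (the text dump controlled by dumpText is omitted here)
    def write(path: String, rdd: RDD[(Cooccurrence, DoubleWritable)], dumpText: Boolean): Unit =
      rdd.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[Cooccurrence, DoubleWritable]](path)
  }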

--
eXenSa
Guillaume PITEL, Président
+33(0)6 25 48 86 80

eXenSa S.A.S.
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
