Hi there,

I ran into an issue when using zipPartitions. First I created an RDD from a Seq, then created another one and zip-partitioned the two together into rdd3. I cached rdd3, then created a new RDD and zip-partitioned it with rdd3. I repeated this operation many times and found that the serialized task size kept growing, and the serialization time grew with it. Has anyone else encountered the same problem? Please help.
Code:
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.rdd.RDD

def main(args: Array[String]) {
  var i: Int = 0
  var seq = Seq[String]()
  // Build a 720-element Seq of identical strings to parallelize from.
  while (i < 720) {
    seq = seq.+:("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
    i = i + 1
  }
  val sc = new SparkContext("spark://hdh010016146205:7077", "ScalaTest")
  var rdd0: RDD[String] = null
  var j = 0
  while (j < 200) {
    j = j + 1
    val rdd1 = sc.parallelize(seq, 72)
    if (rdd0 == null) {
      rdd0 = rdd1
      rdd0.cache()
      rdd0.count()
    } else {
      // BillZippedPartitionsRDD2 is presumably a local copy of Spark's
      // private[spark] ZippedPartitionsRDD2; the commented-out line below is
      // the equivalent call through the public zipPartitions API.
      val rdd2 = new BillZippedPartitionsRDD2[String, String, String](sc, { (thisIter, otherIter) =>
        // val rdd2 = rdd0.zipPartitions(rdd1, true)({ (thisIter, otherIter) =>
        new Iterator[String] {
          def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
            case (true, true) => true
            case (false, false) => false
            case _ => throw new SparkException("Can only zip RDDs with " +
              "same number of elements in each partition")
          }
          def next(): String = thisIter.next() + "--" + otherIter.next()
        }
      }, rdd0, rdd1, false) // parent RDDs plus, presumably, a preservesPartitioning flag
      rdd2.cache()
      rdd2.count()
      rdd0 = rdd2 // carry the zipped RDD into the next iteration
    }
  }
}
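
For comparison, here is a variant of the same loop that checkpoints the carried RDD every few iterations, in case the growing lineage is what inflates the serialized tasks (caching an RDD does not truncate its lineage, while checkpointing does). This is only a minimal sketch: the checkpoint directory, the every-10-iterations cadence, and the payload string are placeholders I made up.

def mainWithCheckpoints(args: Array[String]) {
  val sc = new SparkContext("spark://hdh010016146205:7077", "ScalaTest")
  sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // placeholder path
  val seq = Seq.fill(720)("a" * 98)                    // placeholder payload
  var rdd0: RDD[String] = sc.parallelize(seq, 72).cache()
  rdd0.count()
  var j = 0
  while (j < 200) {
    j = j + 1
    val rdd1 = sc.parallelize(seq, 72)
    // Public zipPartitions API; note this simplified zip stops at the
    // shorter iterator instead of throwing like the code above.
    val rdd2 = rdd0.zipPartitions(rdd1, preservesPartitioning = true) {
      (thisIter, otherIter) =>
        thisIter.zip(otherIter).map { case (a, b) => a + "--" + b }
    }
    rdd2.cache()
    if (j % 10 == 0) rdd2.checkpoint() // cut the lineage periodically
    rdd2.count()                       // materializes the cache/checkpoint
    rdd0 = rdd2
  }
}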
