Hi there,
I encountered an issue when using zipPartitions. First I created an RDD
from a Seq, then created another one and zipped their partitions into rdd3, then
cached rdd3, then created a new RDD and zipped its partitions with rdd3. I
repeated this operation many times, and I found that
the serialized task became bigger and bigger, and the serialization time grew
as well. Does anyone else encounter the same problem? Please help.
Code:
def main(args: Array[String]): Unit = {
  // Build a Seq of 720 identical long strings; used as the data for every
  // parallelized RDD below.
  var i: Int = 0
  var seq = Seq[String]()
  while (i < 720) {
    seq = seq.+:("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
    i = i + 1
  }

  val sc = new SparkContext("spark://hdh010016146205:7077", "ScalaTest")

  // rdd0 accumulates the chain: each iteration zips the previous cached result
  // with a freshly parallelized RDD, caches the zip, and makes it the new rdd0.
  // NOTE(review): this grows the RDD lineage by one level per iteration, which
  // is presumably why the serialized task size keeps increasing — confirm.
  var rdd0: RDD[String] = null
  var j = 0
  while (j < 200) {
    j = j + 1
    val rdd1 = sc.parallelize(seq, 72)
    if (rdd0 == null) {
      // First iteration: seed the chain and materialize the cache.
      rdd0 = rdd1
      rdd0.cache()
      rdd0.count()
    } else {
      // Equivalent to: rdd0.zipPartitions(rdd1, preservesPartitioning = true) { (thisIter, otherIter) => ... }
      val rdd2 = new BillZippedPartitionsRDD2[String, String, String](sc, {
        (thisIter, otherIter) =>
          new Iterator[String] {
            // Both partition iterators must be exhausted together; anything
            // else means the two RDDs had unequal element counts per partition.
            def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
              case (true, true)   => true
              case (false, false) => false
              case _ => throw new SparkException("Can only zip RDDs with " +
                "same number of elements in each partition")
            }
            def next(): String = (thisIter.next() + "--" + otherIter.next())
          }
      }, rdd0, rdd1, false)
      rdd2.cache()
      rdd2.count()
      rdd0 = rdd2
    }
  }
}