On Monday 25 April 2016 11:28 PM, Weiping Qu wrote:
Dear Ted,

You are right, reduceByKey is a transformation; my fault.
Let me rephrase my question using the following code snippet.
import org.apache.spark.{SparkConf, SparkContext}

object ScalaApp {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ScalaApp").setMaster("local")
    val sc = new SparkContext(conf)

    // word count: HadoopRDD -> MapPartitionsRDD -> ShuffledRDD
    val file = sc.textFile("/home/usr/test.dat")
    val output = file.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    output.persist()   // mark the ShuffledRDD for caching
    output.count()     // first action: executes the lineage and populates the cache
    output.collect()   // second action: served from the cached blocks
  }
}

It's a simple code snippet.
I realize that the first action, count(), triggers execution of the lineage (HadoopRDD and MapPartitionsRDD), and that reduceByKey produces a ShuffledRDD over which the count is performed.

The count() triggers both the execution and the persistence of the output RDD (each partition is cached as it is iterated).
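
If it helps, one way to see that in this snippet is to inspect the lineage and the storage level around the first action (a rough sketch; the exact toDebugString output varies by Spark version):

// Marking for persistence only sets the storage level; nothing is computed yet.
output.persist()
println(output.getStorageLevel)   // defaults to MEMORY_ONLY

// The lineage shows ShuffledRDD <- MapPartitionsRDD <- HadoopRDD.
println(output.toDebugString)

// The first action runs the whole lineage and fills the cache partition by partition.
output.count()

// Afterwards toDebugString also reports the cached partitions,
// and the Storage tab of the web UI lists the RDD.
println(output.toDebugString)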

The second action, collect(), just performs the collect over the same ShuffledRDD.

It will use the persisted ShuffledRDD blocks.

I think that, in case one partition of the persisted output is missing, the re-calculation would also be carried out over the ShuffledRDD instead of re-executing the preceding HadoopRDD and MapPartitionsRDD.
Am I right?

Since it is a partition of the persisted ShuffledRDD that is missing, that partition will have to be recreated from the base HadoopRDD. To avoid this, one can checkpoint the ShuffledRDD if required.
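
A minimal sketch of the checkpointing variant against the snippet above (the checkpoint directory is just a placeholder; any path reachable by all executors works):

sc.setCheckpointDir("/tmp/spark-checkpoints")   // placeholder directory

output.persist()      // keep the cached copy for fast reads
output.checkpoint()   // mark the ShuffledRDD to be saved to reliable storage

// The first action computes the RDD, caches it, and triggers the checkpoint
// write; afterwards the lineage is truncated, so a lost partition is reloaded
// from the checkpoint files instead of being recomputed from the HadoopRDD.
output.count()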


Thanks and Regards,
Weiping

<snip>


regards
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)


