spark git commit: [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing
Repository: spark Updated Branches: refs/heads/branch-1.6 1cf9d3858 -> 81db8d086 [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing The solution is to save the RDD partitioner in a separate file in the RDD checkpoint directory, that is, `<checkpoint dir>/_partitioner`. In most cases, whether or not the RDD partitioner is recovered does not affect correctness; losing it only reduces performance. So this solution makes a best-effort attempt to save and recover the partitioner. If either saving or recovering the partitioner fails, the checkpointing itself is not affected. This makes this patch safe and backward compatible. Author: Tathagata Das Closes #9983 from tdas/SPARK-12004. (cherry picked from commit 60b541ee1b97c9e5e84aa2af2ce856f316ad22b3) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81db8d08 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81db8d08 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81db8d08 Branch: refs/heads/branch-1.6 Commit: 81db8d086bbfe72caa0c45a395ebcaed80b5c237 Parents: 1cf9d38 Author: Tathagata Das Authored: Tue Dec 1 14:08:36 2015 -0800 Committer: Andrew Or Committed: Tue Dec 1 14:08:45 2015 -0800 -- .../spark/rdd/ReliableCheckpointRDD.scala | 122 ++- .../spark/rdd/ReliableRDDCheckpointData.scala | 21 +--- .../org/apache/spark/CheckpointSuite.scala | 61 +- 3 files changed, 173 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/81db8d08/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index a69be6a..fa71b8c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -20,12 +20,12 @@ package org.apache.spark.rdd import java.io.IOException import scala.reflect.ClassTag 
+import scala.util.control.NonFatal import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.{SerializableConfiguration, Utils} /** @@ -33,8 +33,9 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} */ private[spark] class ReliableCheckpointRDD[T: ClassTag]( sc: SparkContext, -val checkpointPath: String) - extends CheckpointRDD[T](sc) { +val checkpointPath: String, +_partitioner: Option[Partitioner] = None + ) extends CheckpointRDD[T](sc) { @transient private val hadoopConf = sc.hadoopConfiguration @transient private val cpath = new Path(checkpointPath) @@ -47,7 +48,13 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag]( /** * Return the path of the checkpoint directory this RDD reads data from. */ - override def getCheckpointFile: Option[String] = Some(checkpointPath) + override val getCheckpointFile: Option[String] = Some(checkpointPath) + + override val partitioner: Option[Partitioner] = { +_partitioner.orElse { + ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, checkpointPath) +} + } /** * Return partitions described by the files in the checkpoint directory. @@ -100,10 +107,52 @@ private[spark] object ReliableCheckpointRDD extends Logging { "part-%05d".format(partitionIndex) } + private def checkpointPartitionerFileName(): String = { +"_partitioner" + } + + /** + * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD. 
+ */ + def writeRDDToCheckpointDirectory[T: ClassTag]( + originalRDD: RDD[T], + checkpointDir: String, + blockSize: Int = -1): ReliableCheckpointRDD[T] = { + +val sc = originalRDD.sparkContext + +// Create the output path for the checkpoint +val checkpointDirPath = new Path(checkpointDir) +val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration) +if (!fs.mkdirs(checkpointDirPath)) { + throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath") +} + +// Save to file, and reload it as an RDD +val broadcastedConf = sc.broadcast( + new SerializableConfiguration(sc.hadoopConfiguration)) +// TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582) +sc.runJob(originalRDD, + writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) + +if (originalRDD.partitioner.nonEmpty) { + writePartitionerToCheckpointDir(sc,
spark git commit: [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing
Repository: spark Updated Branches: refs/heads/master 2cef1cdfb -> 60b541ee1 [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing The solution is to save the RDD partitioner in a separate file in the RDD checkpoint directory, that is, `<checkpoint dir>/_partitioner`. In most cases, whether or not the RDD partitioner is recovered does not affect correctness; losing it only reduces performance. So this solution makes a best-effort attempt to save and recover the partitioner. If either saving or recovering the partitioner fails, the checkpointing itself is not affected. This makes this patch safe and backward compatible. Author: Tathagata Das Closes #9983 from tdas/SPARK-12004. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60b541ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60b541ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60b541ee Branch: refs/heads/master Commit: 60b541ee1b97c9e5e84aa2af2ce856f316ad22b3 Parents: 2cef1cd Author: Tathagata Das Authored: Tue Dec 1 14:08:36 2015 -0800 Committer: Andrew Or Committed: Tue Dec 1 14:08:36 2015 -0800 -- .../spark/rdd/ReliableCheckpointRDD.scala | 122 ++- .../spark/rdd/ReliableRDDCheckpointData.scala | 21 +--- .../org/apache/spark/CheckpointSuite.scala | 61 +- 3 files changed, 173 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/60b541ee/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index a69be6a..fa71b8c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -20,12 +20,12 @@ package org.apache.spark.rdd import java.io.IOException import scala.reflect.ClassTag +import scala.util.control.NonFatal import org.apache.hadoop.fs.Path import org.apache.spark._ import 
org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.{SerializableConfiguration, Utils} /** @@ -33,8 +33,9 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} */ private[spark] class ReliableCheckpointRDD[T: ClassTag]( sc: SparkContext, -val checkpointPath: String) - extends CheckpointRDD[T](sc) { +val checkpointPath: String, +_partitioner: Option[Partitioner] = None + ) extends CheckpointRDD[T](sc) { @transient private val hadoopConf = sc.hadoopConfiguration @transient private val cpath = new Path(checkpointPath) @@ -47,7 +48,13 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag]( /** * Return the path of the checkpoint directory this RDD reads data from. */ - override def getCheckpointFile: Option[String] = Some(checkpointPath) + override val getCheckpointFile: Option[String] = Some(checkpointPath) + + override val partitioner: Option[Partitioner] = { +_partitioner.orElse { + ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, checkpointPath) +} + } /** * Return partitions described by the files in the checkpoint directory. @@ -100,10 +107,52 @@ private[spark] object ReliableCheckpointRDD extends Logging { "part-%05d".format(partitionIndex) } + private def checkpointPartitionerFileName(): String = { +"_partitioner" + } + + /** + * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD. 
+ */ + def writeRDDToCheckpointDirectory[T: ClassTag]( + originalRDD: RDD[T], + checkpointDir: String, + blockSize: Int = -1): ReliableCheckpointRDD[T] = { + +val sc = originalRDD.sparkContext + +// Create the output path for the checkpoint +val checkpointDirPath = new Path(checkpointDir) +val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration) +if (!fs.mkdirs(checkpointDirPath)) { + throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath") +} + +// Save to file, and reload it as an RDD +val broadcastedConf = sc.broadcast( + new SerializableConfiguration(sc.hadoopConfiguration)) +// TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582) +sc.runJob(originalRDD, + writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) + +if (originalRDD.partitioner.nonEmpty) { + writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath) +} + +val newRDD = new ReliableCheckpointRDD[T]( + sc,