spark git commit: [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing

2015-12-01 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1cf9d3858 -> 81db8d086


[SPARK-12004] Preserve the RDD partitioner through RDD checkpointing

The solution is to save the RDD partitioner in a separate file in the RDD
checkpoint directory, i.e. `<checkpoint dir>/_partitioner`. In most cases,
whether the RDD partitioner is recovered or not does not affect correctness;
it only affects performance. So this solution makes a best-effort attempt to
save and recover the partitioner. If either step fails, checkpointing itself
is not affected. This makes the patch safe and backward compatible.
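
As a rough illustration of this best-effort pattern (not the patch's exact
code: this sketch uses plain Java serialization over the Hadoop FileSystem
API, whereas the patch goes through Spark's configured serializer, and the
helper names here are made up):

    import java.io.{ObjectInputStream, ObjectOutputStream}

    import scala.util.control.NonFatal

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    import org.apache.spark.Partitioner

    object PartitionerFileSketch {
      // Best-effort write of the partitioner to <checkpoint dir>/_partitioner.
      // A failure here only loses the partitioner, never the checkpoint data.
      def writePartitioner(p: Partitioner, dir: String, conf: Configuration): Unit = {
        try {
          val path = new Path(dir, "_partitioner")
          val out = new ObjectOutputStream(path.getFileSystem(conf).create(path, false))
          try out.writeObject(p) finally out.close()
        } catch {
          case NonFatal(_) => // log and continue; checkpointing is unaffected
        }
      }

      // Best-effort read; None if the file is missing (e.g. the checkpoint was
      // written by an older Spark version) or unreadable for a non-fatal reason.
      def readPartitioner(dir: String, conf: Configuration): Option[Partitioner] = {
        try {
          val path = new Path(dir, "_partitioner")
          val in = new ObjectInputStream(path.getFileSystem(conf).open(path))
          try Some(in.readObject().asInstanceOf[Partitioner]) finally in.close()
        } catch {
          case NonFatal(_) => None
        }
      }
    }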

Author: Tathagata Das 

Closes #9983 from tdas/SPARK-12004.
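
For context on why a lost partitioner costs performance: when both sides of a
join share a partitioner, Spark plans narrow dependencies and skips the
shuffle; an RDD recovered from a checkpoint without its partitioner (the
pre-patch behavior) would be reshuffled from scratch. A small self-contained
illustration (app name and data are made up for the example):

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object WhyPartitionerMatters {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("why-partitioner").setMaster("local[2]"))
        val part = new HashPartitioner(4)
        val a = sc.parallelize(1 to 1000).map(i => (i % 10, i)).partitionBy(part)
        val b = sc.parallelize(1 to 1000).map(i => (i % 10, -i)).partitionBy(part)

        // Both sides report the same partitioner, so this join is shuffle-free.
        // If `a` came back from a checkpoint with partitioner == None, the join
        // would have to hash-partition it all over again.
        val joined = a.join(b)
        println(joined.count())
        sc.stop()
      }
    }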

(cherry picked from commit 60b541ee1b97c9e5e84aa2af2ce856f316ad22b3)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81db8d08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81db8d08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81db8d08

Branch: refs/heads/branch-1.6
Commit: 81db8d086bbfe72caa0c45a395ebcaed80b5c237
Parents: 1cf9d38
Author: Tathagata Das 
Authored: Tue Dec 1 14:08:36 2015 -0800
Committer: Andrew Or 
Committed: Tue Dec 1 14:08:45 2015 -0800

--
 .../spark/rdd/ReliableCheckpointRDD.scala   | 122 ++-
 .../spark/rdd/ReliableRDDCheckpointData.scala   |  21 +---
 .../org/apache/spark/CheckpointSuite.scala  |  61 +-
 3 files changed, 173 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81db8d08/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index a69be6a..fa71b8c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -20,12 +20,12 @@ package org.apache.spark.rdd
 import java.io.IOException
 
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
@@ -33,8 +33,9 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
  */
 private[spark] class ReliableCheckpointRDD[T: ClassTag](
     sc: SparkContext,
-    val checkpointPath: String)
-  extends CheckpointRDD[T](sc) {
+    val checkpointPath: String,
+    _partitioner: Option[Partitioner] = None
+  ) extends CheckpointRDD[T](sc) {
 
   @transient private val hadoopConf = sc.hadoopConfiguration
   @transient private val cpath = new Path(checkpointPath)
@@ -47,7 +48,13 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
   /**
    * Return the path of the checkpoint directory this RDD reads data from.
    */
-  override def getCheckpointFile: Option[String] = Some(checkpointPath)
+  override val getCheckpointFile: Option[String] = Some(checkpointPath)
+
+  override val partitioner: Option[Partitioner] = {
+    _partitioner.orElse {
+      ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, checkpointPath)
+    }
+  }
 
   /**
    * Return partitions described by the files in the checkpoint directory.
@@ -100,10 +107,52 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     "part-%05d".format(partitionIndex)
   }
 
+  private def checkpointPartitionerFileName(): String = {
+    "_partitioner"
+  }
+
+  /**
+   * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
+   */
+  def writeRDDToCheckpointDirectory[T: ClassTag](
+      originalRDD: RDD[T],
+      checkpointDir: String,
+      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
+
+    val sc = originalRDD.sparkContext
+
+    // Create the output path for the checkpoint
+    val checkpointDirPath = new Path(checkpointDir)
+    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
+    if (!fs.mkdirs(checkpointDirPath)) {
+      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
+    }
+
+    // Save to file, and reload it as an RDD
+    val broadcastedConf = sc.broadcast(
+      new SerializableConfiguration(sc.hadoopConfiguration))
+    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
+    sc.runJob(originalRDD,
+      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
+
+    if (originalRDD.partitioner.nonEmpty) {
+      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
+    }
+
+    val newRDD = new ReliableCheckpointRDD[T](
+      sc,