Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9831#discussion_r45382804
  
    --- Diff: core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---
    @@ -21,17 +21,222 @@ import java.io.File
     
     import scala.reflect.ClassTag
     
    +import org.apache.spark.CheckpointSuite._
     import org.apache.spark.rdd._
     import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
     import org.apache.spark.util.Utils
     
    +trait RDDCheckpointTester { self: SparkFunSuite =>
    +
    +  protected val partitioner = new HashPartitioner(2)
    +
    +  private def defaultCollectFunc[T](rdd: RDD[T]): Any = rdd.collect()
    +
    +  /** Implementations of this trait must implement this method */
    +  protected def sparkContext: SparkContext
    +
    +  /**
    +   * Test checkpointing of the RDD generated by the given operation. It 
tests whether the
    +   * serialized size of the RDD is reduced after checkpointing or not. This 
function should be called
    +   * on all RDDs that have a parent RDD (i.e., do not call on 
ParallelCollection, BlockRDD, etc.).
    +   *
    +   * @param op an operation to run on the RDD
    +   * @param reliableCheckpoint if true, use reliable checkpoints, 
otherwise use local checkpoints
    +   * @param collectFunc a function for collecting the values in the RDD, 
in case there are
    +   *                    non-comparable types like arrays that we want to 
convert to something that supports ==
    +   */
    +  protected def testRDD[U: ClassTag](
    +    op: (RDD[Int]) => RDD[U],
    +    reliableCheckpoint: Boolean,
    +    collectFunc: RDD[U] => Any = defaultCollectFunc[U] _): Unit = {
    +    // Generate the final RDD using given RDD operation
    +    val baseRDD = generateFatRDD()
    +    val operatedRDD = op(baseRDD)
    +    val parentRDD = operatedRDD.dependencies.headOption.orNull
    +    val rddType = operatedRDD.getClass.getSimpleName
    +    val numPartitions = operatedRDD.partitions.length
    +
    +    // Force initialization of all the data structures in RDDs
    +    // Without this, serializing the RDD will give a wrong estimate of the 
size of the RDD
    +    initializeRdd(operatedRDD)
    +
    +    val partitionsBeforeCheckpoint = operatedRDD.partitions
    +
    +    // Find serialized sizes before and after the checkpoint
    +    logInfo("RDD before checkpoint: " + operatedRDD + "\n" + 
operatedRDD.toDebugString)
    +    val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = 
getSerializedSizes(operatedRDD)
    +    checkpoint(operatedRDD, reliableCheckpoint)
    +    val result = collectFunc(operatedRDD)
    +    operatedRDD.collect() // force re-initialization of post-checkpoint 
lazy variables
    +    val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = 
getSerializedSizes(operatedRDD)
    +    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + 
operatedRDD.toDebugString)
    +
    +    // Test whether the checkpoint file has been created
    +    if (reliableCheckpoint) {
    +      assert(
    +        
collectFunc(sparkContext.checkpointFile[U](operatedRDD.getCheckpointFile.get)) 
=== result)
    +    }
    +
    +    // Test whether dependencies have been changed from its earlier parent 
RDD
    +    assert(operatedRDD.dependencies.head.rdd != parentRDD)
    +
    +    // Test whether the partitions have been changed from its earlier 
partitions
    +    assert(operatedRDD.partitions.toList != 
partitionsBeforeCheckpoint.toList)
    +
    +    // Test whether the partitions have been changed to the new Hadoop 
partitions
    +    assert(operatedRDD.partitions.toList === 
operatedRDD.checkpointData.get.getPartitions.toList)
    +
    +    // Test whether the number of partitions is the same as before
    +    assert(operatedRDD.partitions.length === numPartitions)
    +
    +    // Test whether the data in the checkpointed RDD is the same as the original
    +    assert(collectFunc(operatedRDD) === result)
    +
    +    // Test whether serialized size of the RDD has reduced.
    +    logInfo("Size of " + rddType +
    +      " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + 
"]")
    +    assert(
    +      rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
    +      "Size of " + rddType + " did not reduce after checkpointing " +
    +        " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint 
+ "]"
    +    )
    +  }
    +
    +  /**
    +   * Test whether checkpointing of the parent of the generated RDD also
    +   * truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to 
their parent
    +   * RDDs' partitions. So even if the parent RDD is checkpointed and its 
partitions changed,
    +   * the generated RDD will remember the partitions and therefore 
potentially the whole lineage.
    +   * This function should be called only on those RDDs whose partitions refer 
to parent RDD's
    +   * partitions (i.e., do not call it on simple RDDs like MappedRDD).
    +   *
    +   * @param op an operation to run on the RDD
    +   * @param reliableCheckpoint if true, use reliable checkpoints, 
otherwise use local checkpoints
    +   * @param collectFunc a function for collecting the values in the RDD, 
in case there are
    +   *                    non-comparable types like arrays that we want to 
convert to something
    +   *                    that supports ==
    +   */
    +  protected def testRDDPartitions[U: ClassTag](
    +    op: (RDD[Int]) => RDD[U],
    +    reliableCheckpoint: Boolean,
    +    collectFunc: RDD[U] => Any = defaultCollectFunc[U] _): Unit = {
    --- End diff --
    
    2 more


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to