Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9831#discussion_r45382443
  
    --- Diff: core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---
    @@ -21,17 +21,222 @@ import java.io.File
     
     import scala.reflect.ClassTag
     
    +import org.apache.spark.CheckpointSuite._
     import org.apache.spark.rdd._
     import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
     import org.apache.spark.util.Utils
     
    +trait RDDCheckpointTester { self: SparkFunSuite =>
    +
    +  protected val partitioner = new HashPartitioner(2)
    +
    +  private def defaultCollectFunc[T](rdd: RDD[T]): Any = rdd.collect()
    +
    +  /** Implementations of this trait must implement this method */
    +  protected def sparkContext: SparkContext
    +
    +  /**
     +   * Test checkpointing of the RDD generated by the given operation. It tests whether the
     +   * serialized size of the RDD is reduced after checkpointing or not. This function should be
     +   * called on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection,
     +   * BlockRDD, etc.).
     +   *
     +   * @param op an operation to run on the RDD
     +   * @param reliableCheckpoint if true, use reliable checkpoints, otherwise use local checkpoints
     +   * @param collectFunc a function for collecting the values in the RDD, in case there are
     +   *                    non-comparable types like arrays that we want to convert to something
     +   *                    that supports ==
    +   */
    +  protected def testRDD[U: ClassTag](
    +    op: (RDD[Int]) => RDD[U],
    +    reliableCheckpoint: Boolean,
    +    collectFunc: RDD[U] => Any = defaultCollectFunc[U] _): Unit = {
    +    // Generate the final RDD using given RDD operation
    +    val baseRDD = generateFatRDD()
    +    val operatedRDD = op(baseRDD)
    +    val parentRDD = operatedRDD.dependencies.headOption.orNull
    +    val rddType = operatedRDD.getClass.getSimpleName
    +    val numPartitions = operatedRDD.partitions.length
    +
    +    // Force initialization of all the data structures in RDDs
     +    // Without this, serializing the RDD will give a wrong estimate of the size of the RDD
    +    initializeRdd(operatedRDD)
    +
    +    val partitionsBeforeCheckpoint = operatedRDD.partitions
    +
    +    // Find serialized sizes before and after the checkpoint
     +    logInfo("RDD before checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
     +    val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
    +    checkpoint(operatedRDD, reliableCheckpoint)
    +    val result = collectFunc(operatedRDD)
     +    operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
     +    val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
     +    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
    +
    +    // Test whether the checkpoint file has been created
    +    if (reliableCheckpoint) {
     +      assert(
     +        collectFunc(sparkContext.checkpointFile[U](operatedRDD.getCheckpointFile.get)) === result)
    +    }
    +
     +    // Test whether the dependencies have been changed from the earlier parent RDD
     +    assert(operatedRDD.dependencies.head.rdd != parentRDD)
     +
     +    // Test whether the partitions have been changed from the earlier partitions
     +    assert(operatedRDD.partitions.toList != partitionsBeforeCheckpoint.toList)
     +
     +    // Test whether the partitions have been changed to the new Hadoop partitions
     +    assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList)
    +
    +    // Test whether the number of partitions is same as before
    +    assert(operatedRDD.partitions.length === numPartitions)
    +
    +    // Test whether the data in the checkpointed RDD is same as original
    +    assert(collectFunc(operatedRDD) === result)
    +
    +    // Test whether serialized size of the RDD has reduced.
    +    logInfo("Size of " + rddType +
     +      " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
    +    assert(
    +      rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
    +      "Size of " + rddType + " did not reduce after checkpointing " +
     +        " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
    +    )
    +  }
    +
    +  /**
    +   * Test whether checkpointing of the parent of the generated RDD also
     +   * truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to their parent
     +   * RDDs' partitions. So even if the parent RDD is checkpointed and its partitions changed,
     +   * the generated RDD will remember the partitions and therefore potentially the whole lineage.
     +   * This function should be called only on those RDDs whose partitions refer to the parent
     +   * RDD's partitions (i.e., do not call it on a simple RDD like MappedRDD).
     +   *
     +   * @param op an operation to run on the RDD
     +   * @param reliableCheckpoint if true, use reliable checkpoints, otherwise use local checkpoints
     +   * @param collectFunc a function for collecting the values in the RDD, in case there are
     +   *                    non-comparable types like arrays that we want to convert to something
     +   *                    that supports ==
    +   */
    +  protected def testRDDPartitions[U: ClassTag](
    +    op: (RDD[Int]) => RDD[U],
    +    reliableCheckpoint: Boolean,
    +    collectFunc: RDD[U] => Any = defaultCollectFunc[U] _): Unit = {
    +    // Generate the final RDD using given RDD operation
    +    val baseRDD = generateFatRDD()
    +    val operatedRDD = op(baseRDD)
    +    val parentRDDs = operatedRDD.dependencies.map(_.rdd)
    +    val rddType = operatedRDD.getClass.getSimpleName
    +
    +    // Force initialization of all the data structures in RDDs
     +    // Without this, serializing the RDD will give a wrong estimate of the size of the RDD
    +    initializeRdd(operatedRDD)
    +
    +    // Find serialized sizes before and after the checkpoint
     +    logInfo("RDD before checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
     +    val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
    +    // checkpoint the parent RDD, not the generated one
    +    parentRDDs.foreach { rdd =>
    +      checkpoint(rdd, reliableCheckpoint)
    +    }
    +    val result = collectFunc(operatedRDD) // force checkpointing
     +    operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
     +    val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
     +    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
    +
    +    // Test whether the data in the checkpointed RDD is same as original
    +    assert(collectFunc(operatedRDD) === result)
    +
    +    // Test whether serialized size of the partitions has reduced
    +    logInfo("Size of partitions of " + rddType +
     +      " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]")
    +    assert(
    +      partitionSizeAfterCheckpoint < partitionSizeBeforeCheckpoint,
     +      "Size of " + rddType + " partitions did not reduce after checkpointing parent RDDs" +
     +        " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]"
    +    )
    +  }
    +
    +  /**
     +   * Get serialized sizes of the RDD and its partitions, in order to test whether the size
     +   * shrinks upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint.
    +   */
    +  private def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
    +    val rddSize = Utils.serialize(rdd).size
    +    val rddCpDataSize = Utils.serialize(rdd.checkpointData).size
    +    val rddPartitionSize = Utils.serialize(rdd.partitions).size
    +    val rddDependenciesSize = Utils.serialize(rdd.dependencies).size
    +
    +    // Print detailed size, helps in debugging
    +    logInfo("Serialized sizes of " + rdd +
    +      ": RDD = " + rddSize +
    +      ", RDD checkpoint data = " + rddCpDataSize +
    +      ", RDD partitions = " + rddPartitionSize +
    +      ", RDD dependencies = " + rddDependenciesSize
    +    )
    +    // this makes sure that serializing the RDD's checkpoint data does not
    +    // serialize the whole RDD as well
    +    assert(
    +      rddSize > rddCpDataSize,
     +      "RDD's checkpoint data (" + rddCpDataSize + ") is equal to or larger than the " +
    +        "whole RDD with checkpoint data (" + rddSize + ")"
    +    )
    +    (rddSize - rddCpDataSize, rddPartitionSize)
    +  }
    +
    +  /**
     +   * Serialize and deserialize an object. This is useful to verify the object's
     +   * contents after deserialization (e.g., the contents of an RDD split after
    +   * it is sent to a slave along with a task)
    +   */
    +  protected def serializeDeserialize[T](obj: T): T = {
    +    val bytes = Utils.serialize(obj)
    +    Utils.deserialize[T](bytes)
    +  }
    +
    +  /**
     +   * Recursively force the initialization of all the members of an RDD and its parents.
    +   */
    +  private def initializeRdd(rdd: RDD[_]): Unit = {
    +    rdd.partitions // forces the initialization of the partitions
    +    rdd.dependencies.map(_.rdd).foreach(initializeRdd)
    +  }
    +
    +  /** Checkpoint the RDD either locally or reliably. */
     +  protected def checkpoint(rdd: RDD[_], reliableCheckpoint: Boolean): Unit = {
    +    if (reliableCheckpoint) {
    +      rdd.checkpoint()
    +    } else {
    +      rdd.localCheckpoint()
    +    }
    +  }
    +
     +  /** Run a test twice, once for local checkpointing and once for reliable checkpointing. */
    +  protected def runTest(name: String)(body: Boolean => Unit): Unit = {
    +    test(name + " [reliable checkpoint]")(body(true))
    +    test(name + " [local checkpoint]")(body(false))
    +  }
    +
    +  /**
     +   * Generate an RDD such that both the RDD and its partitions have large size.
    +   */
    +  protected def generateFatRDD(): RDD[Int] = {
    +    new FatRDD(sparkContext.makeRDD(1 to 100, 4)).map(x => x)
    +  }
    +
    +  /**
     +   * Generate a pair RDD (with partitioner) such that both the RDD and its partitions
    +   * have large size.
    +   */
    +  protected def generateFatPairRDD(): RDD[(Int, Int)] = {
     +    new FatPairRDD(sparkContext.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
    +  }
    +}
    --- End diff --
    
    This is a lot of code. Can you move it to a new file?
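
    For illustration only (not part of the PR), here is a rough sketch of what a concrete suite could look like once the trait lives in its own file. The class name, SparkContext setup, and test bodies below are made up; they just show the trait's `runTest` / `testRDD` / `testRDDPartitions` API in use:

        package org.apache.spark

        import org.scalatest.BeforeAndAfterAll

        import org.apache.spark.util.Utils

        // Hypothetical suite mixing in RDDCheckpointTester after the trait is moved to its own file.
        class ExampleCheckpointSuite extends SparkFunSuite with RDDCheckpointTester with BeforeAndAfterAll {

          private var sc: SparkContext = _

          // The trait only requires a SparkContext from the concrete suite
          override protected def sparkContext: SparkContext = sc

          override def beforeAll(): Unit = {
            super.beforeAll()
            sc = new SparkContext("local", "ExampleCheckpointSuite")
            // Reliable checkpoints need a checkpoint directory
            sc.setCheckpointDir(Utils.createTempDir().toString)
          }

          override def afterAll(): Unit = {
            try {
              sc.stop()
            } finally {
              super.afterAll()
            }
          }

          // runTest registers both a reliable-checkpoint and a local-checkpoint variant of each test
          runTest("map") { reliableCheckpoint: Boolean =>
            // map has a simple one-to-one dependency, so checkpointing the RDD itself is enough
            testRDD(rdd => rdd.map(_.toString), reliableCheckpoint)
          }

          runTest("cogroup") { reliableCheckpoint: Boolean =>
            // CoGroupedRDD partitions reference parent partitions, so also verify that
            // checkpointing the parent RDDs truncates the lineage
            testRDDPartitions(
              rdd => rdd.map(x => (x % 2, 1)).cogroup(generateFatPairRDD(), partitioner),
              reliableCheckpoint)
          }
        }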


