Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2464#discussion_r17934707
  
    --- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
    @@ -272,43 +276,41 @@ abstract class DStream[T: ClassTag] (
       }
     
       /**
    -   * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
    -   * method that should not be called directly.
    +   * Get the RDD corresponding to the given time; either retrieve it from cache
    +   * or compute-and-cache it.
        */
       private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
    -    // If this DStream was not initialized (i.e., zeroTime not set), then do it
    -    // If RDD was already generated, then retrieve it from HashMap
    -    generatedRDDs.get(time) match {
    -
    -      // If an RDD was already generated and is being reused, then
    -      // probably all RDDs in this DStream will be reused and hence should be cached
    -      case Some(oldRDD) => Some(oldRDD)
    -
    -      // if RDD was not generated, and if the time is valid
    -      // (based on sliding time of this DStream), then generate the RDD
    -      case None => {
    -        if (isTimeValid(time)) {
    -          compute(time) match {
    -            case Some(newRDD) =>
    -              if (storageLevel != StorageLevel.NONE) {
    -                newRDD.persist(storageLevel)
    -                logInfo("Persisting RDD " + newRDD.id + " for time " +
    -                  time + " to " + storageLevel + " at time " + time)
    -              }
    -              if (checkpointDuration != null &&
    -                (time - zeroTime).isMultipleOf(checkpointDuration)) {
    -                newRDD.checkpoint()
    -                logInfo("Marking RDD " + newRDD.id + " for time " + time +
    -                  " for checkpointing at time " + time)
    -              }
    -              generatedRDDs.put(time, newRDD)
    -              Some(newRDD)
    -            case None =>
    -              None
    +    // If RDD was already generated, then retrieve it from HashMap,
    +    // or else compute the RDD
    +    generatedRDDs.get(time).orElse {
    +      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    +      // of RDD generation, else generate nothing.
    +      if (isTimeValid(time)) {
    +        // Set the thread-local property for call sites to this DStream's creation site
    +        // such that RDDs generated by compute gets that as their creation site.
    +        // Note that this `getOrCompute` may get called from another DStream which may have
    +        // set its own call site. So we store its call site in a temporary variable,
    +        // set this DStream's creation site, generate RDDs and then restore the previous call site.
    --- End diff --
    
    How about explaining this top-down: start with what we're trying to do
    (set RDD call sites properly), then move on to how we're doing it (using a
    thread-local property). Right now it's easy to get lost if the reader isn't
    familiar with how call sites are set through SparkContext.
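
    For what it's worth, here is a minimal sketch of the save/set/restore
    pattern the comment block is describing. The names (`CallSiteSketch`,
    `withCallSite`) are illustrative only, not Spark's actual call-site API;
    the point is just that the previous thread-local value is stashed and
    restored so a caller's call site survives nested invocations:

    ```scala
    // Hypothetical sketch; names are illustrative, not Spark's real CallSite API.
    object CallSiteSketch {
      private val callSite = new ThreadLocal[String]()

      /** Run `body` with the thread-local call site set to `site`, restoring
        * whatever value was there before — it may belong to another DStream
        * further up the call chain. */
      def withCallSite[A](site: String)(body: => A): A = {
        val previous = callSite.get()   // may be null if nothing set it yet
        callSite.set(site)
        try body
        finally callSite.set(previous)  // restore the caller's call site
      }

      def current: String = callSite.get()
    }
    ```

    With this shape, a nested call temporarily sees its own site, and the
    outer value is back in place afterwards, which is exactly the invariant
    the try/finally in the diff is protecting.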

