Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2464#discussion_r17811250
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
    @@ -272,43 +286,41 @@ abstract class DStream[T: ClassTag] (
       }
     
       /**
    -   * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
    -   * method that should not be called directly.
    +   * Get the RDD corresponding to the given time; either retrieve it from cache
    +   * or compute-and-cache it.
        */
       private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
    -    // If this DStream was not initialized (i.e., zeroTime not set), then do it
    -    // If RDD was already generated, then retrieve it from HashMap
    -    generatedRDDs.get(time) match {
    -
    -      // If an RDD was already generated and is being reused, then
    -      // probably all RDDs in this DStream will be reused and hence should be cached
    -      case Some(oldRDD) => Some(oldRDD)
    -
    -      // if RDD was not generated, and if the time is valid
    -      // (based on sliding time of this DStream), then generate the RDD
    -      case None => {
    -        if (isTimeValid(time)) {
    -          compute(time) match {
    -            case Some(newRDD) =>
    -              if (storageLevel != StorageLevel.NONE) {
    -                newRDD.persist(storageLevel)
    -                logInfo("Persisting RDD " + newRDD.id + " for time " +
    -                  time + " to " + storageLevel + " at time " + time)
    -              }
    -              if (checkpointDuration != null &&
    -                (time - zeroTime).isMultipleOf(checkpointDuration)) {
    -                newRDD.checkpoint()
    -                logInfo("Marking RDD " + newRDD.id + " for time " + time +
    -                  " for checkpointing at time " + time)
    -              }
    -              generatedRDDs.put(time, newRDD)
    -              Some(newRDD)
    -            case None =>
    -              None
    +    // If RDD was already generated, then retrieve it from HashMap,
    +    // or else compute the RDD
    +    generatedRDDs.get(time).orElse {
    --- End diff --
    
    These changes are just a refactoring of the code (from `case Some` and `case None` to `Option.orElse`), with no change in the logic.
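    The equivalence can be sketched with a minimal standalone example (the names below are hypothetical, not taken from `DStream.scala`):

```scala
import scala.collection.mutable

// Sketch of the refactoring discussed above: a lookup that either
// returns a cached value or computes one, written both ways.
object OrElseSketch {
  val cache: mutable.HashMap[Int, String] = mutable.HashMap(1 -> "cached")

  // Before: explicit pattern match on the cache lookup.
  def getOrComputeMatch(key: Int): Option[String] =
    cache.get(key) match {
      case Some(v) => Some(v)
      case None    => compute(key)
    }

  // After: the same logic via Option.orElse.
  def getOrComputeOrElse(key: Int): Option[String] =
    cache.get(key).orElse(compute(key))

  private def compute(key: Int): Option[String] =
    if (key > 0) Some(s"computed-$key") else None
}
```

    Note that `Option.orElse` takes its argument by name, so the compute branch only runs on a cache miss, exactly mirroring the `case None` arm of the original match.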


