Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6070#discussion_r30104275
  
    --- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -671,6 +683,50 @@ object StreamingContext extends Logging {
       }
     
       /**
    +   * Either return the "active" StreamingContext (that is, started but not 
stopped), or create a
    +   * new StreamingContext that is
    +   * @param creatingFunc   Function to create a new StreamingContext
    +   */
    +  @Experimental
    +  def getActiveOrCreate(creatingFunc: () => StreamingContext): 
StreamingContext = {
    +    ACTIVATION_LOCK.synchronized {
    +      getActive().getOrElse {
    +        creatingFunc()
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Either get the currently active StreamingContext (that is, started 
but not stopped),
    +   * OR recreate a StreamingContext from checkpoint data in the given 
path. If checkpoint data
    +   * does not exist in the provided, then create a new StreamingContext by 
calling the provided
    +   * `creatingFunc`.
    +   *
    +   * @param checkpointPath Checkpoint directory used in an earlier 
StreamingContext program
    +   * @param creatingFunc   Function to create a new StreamingContext
    +   * @param hadoopConf     Optional Hadoop configuration if necessary for 
reading from the
    +   *                       file system
    +   * @param createOnError  Optional, whether to create a new 
StreamingContext if there is an
    +   *                       error in reading checkpoint data. By default, 
an exception will be
    +   *                       thrown on error.
    +   */
    +  @Experimental
    +  def getActiveOrCreate(
    +      checkpointPath: String,
    +      creatingFunc: () => StreamingContext,
    +      hadoopConf: Configuration = new Configuration(),
    +      createOnError: Boolean = false
    +    ): StreamingContext = {
    +    ACTIVATION_LOCK.synchronized {
    +      getActive().getOrElse {
    +        val checkpointOption = CheckpointReader.read(
    --- End diff --
    
    Why not call `getOrCreate(checkpointPath, creatingFunc, hadoopConf, 
createOnError)` directly? It would eliminate the duplicate codes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to