Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/247#discussion_r11282497
  
    --- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 ---
    @@ -77,30 +82,79 @@ class JobGenerator(jobScheduler: JobScheduler) extends 
Logging {
         }
       }
     
    -  /** Stop generation of jobs */
    -  def stop() = synchronized {
    -    if (eventActor != null) {
    +  /**
    +   * Stop generation of jobs. processAllReceivedData = true makes this 
wait until jobs
    +   * of current ongoing time interval has been generated, processed and 
corresponding
    +   * checkpoints written.
    +   */
    +  def stop(processAllReceivedData: Boolean): Unit = synchronized {
    +    if (eventActor == null) return // generator has already been stopped
    +
    +    if (processAllReceivedData) {
    +      logInfo("Stopping JobGenerator gracefully")
    +      val timeWhenStopStarted = System.currentTimeMillis()
    +      val stopTimeout = 10 * ssc.graph.batchDuration.milliseconds
    +      val pollTime = 100
    +
    +      // To prevent graceful stop to get stuck permanently
    +      def hasTimedOut = {
    +        val timedOut = System.currentTimeMillis() - timeWhenStopStarted > 
stopTimeout
    +        if (timedOut) logWarning("Timed out while stopping the job 
generator")
    +        timedOut
    +      }
    +
    +      // Wait until all the received blocks in the network input tracker 
has
    +      // been consumed by network input DStreams, and jobs have been 
generated with them
    +      logInfo("Waiting for all received blocks to be consumed for job 
generation")
    +      while(!hasTimedOut && 
jobScheduler.networkInputTracker.hasMoreReceivedBlockIds) {
    +        Thread.sleep(pollTime)
    +      }
    +      logInfo("Waited for all received blocsk to be consumed for job 
generation")
    +
    +      // Stop generating jobs
    +      val stopTime = timer.stop(stopAfterNextCallback = true)
    +      graph.stop()
    +      logInfo("Stopped generation timer")
    +
    +      // Wait for the jobs to complete and checkpoints to be written
    +      def hasAllBatchesBeenFullyProcessed = {
    --- End diff --
    
    Suggest naming this haveAllBatchesBeenProcessed: it is slightly shorter, and 
grammatically better than allBatchesProcessed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to