Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/3868#discussion_r22447455 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala --- @@ -93,27 +93,18 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { if (processReceivedData) { logInfo("Stopping JobGenerator gracefully") - val timeWhenStopStarted = System.currentTimeMillis() val stopTimeout = conf.getLong( "spark.streaming.gracefulStopTimeout", 10 * ssc.graph.batchDuration.milliseconds ) - val pollTime = 100 - - // To prevent graceful stop to get stuck permanently - def hasTimedOut = { - val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout - if (timedOut) { - logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")") - } - timedOut - } + // Wait until all the received blocks in the network input tracker has // been consumed by network input DStreams, and jobs have been generated with them logInfo("Waiting for all received blocks to be consumed for job generation") - while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) { - Thread.sleep(pollTime) + if (!TimeoutUtils.waitUntilDone(stopTimeout, --- End diff -- Actually, instead of this utility, it might be a better idea to use the `clock` in this class, which has the `waitForTime` method. In fact, using that clock allows us to check things in unit tests using a manual clock. It might also help you implement a unit test for this.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org