GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/3868#discussion_r23708402
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
---
@@ -93,27 +93,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
if (processReceivedData) {
logInfo("Stopping JobGenerator gracefully")
- val timeWhenStopStarted = System.currentTimeMillis()
- val stopTimeout = conf.getLong(
- "spark.streaming.gracefulStopTimeout",
- 10 * ssc.graph.batchDuration.milliseconds
- )
- val pollTime = 100
-
- // To prevent graceful stop to get stuck permanently
- def hasTimedOut = {
- val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
- if (timedOut) {
- logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
- }
- timedOut
- }
+
// Wait until all the received blocks in the network input tracker has
// been consumed by network input DStreams, and jobs have been generated with them
logInfo("Waiting for all received blocks to be consumed for job generation")
- while(!hasTimedOut && jobScheduler.receiverTracker.hasUnallocatedBlocks) {
- Thread.sleep(pollTime)
+ while (jobScheduler.receiverTracker.hasUnallocatedBlocks) {
+ Thread.sleep(100)
--- End diff --
Please unify both occurrences of the literal 100 in this function into a
single variable called pollInterval.
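
A rough sketch of what that could look like, not the actual patch. Only
one of the two sleeps is visible in the quoted hunk, so the second loop
and its condition are assumptions, and the object name, method name, and
both predicates are hypothetical stand-ins so the snippet runs without
Spark:

```scala
object PollIntervalSketch {
  // Hypothetical stand-in for the graceful-stop path. The two predicates
  // replace the real scheduler checks so the sketch has no Spark dependency.
  def waitForCompletion(
      hasUnallocatedBlocks: () => Boolean,
      hasPendingBatches: () => Boolean): Int = {
    val pollInterval = 100L // milliseconds; the only place the value is defined
    var polls = 0

    // First wait loop: corresponds to the loop shown in the diff.
    while (hasUnallocatedBlocks()) {
      Thread.sleep(pollInterval)
      polls += 1
    }

    // Second wait loop: assumed, since it is not visible in the quoted hunk.
    while (hasPendingBatches()) {
      Thread.sleep(pollInterval)
      polls += 1
    }

    polls // number of sleeps performed, returned only to make the sketch checkable
  }

  def main(args: Array[String]): Unit = {
    // Fake conditions that turn false after a fixed number of checks.
    var blocks = 2
    var batches = 1
    val polls = waitForCompletion(
      () => { val more = blocks > 0; if (more) blocks -= 1; more },
      () => { val more = batches > 0; if (more) batches -= 1; more })
    println(s"polled $polls times")
  }
}
```

With a single named value, changing the polling rate later touches one
line instead of two, and the name documents what the number means.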