I thought I'd ask first since there's a good chance this isn't actually a bug, but I'm seeing an issue where the first batch that Spark Streaming processes fails (due to an application problem), and then stop() blocks for a very long time.
This bit of JobGenerator.stop() executes, since its message appears in the logs:

    def haveAllBatchesBeenProcessed = {
      lastProcessedBatch != null && lastProcessedBatch.milliseconds == stopTime
    }

    logInfo("Waiting for jobs to be processed and checkpoints to be written")
    while (!hasTimedOut && !haveAllBatchesBeenProcessed) {
      Thread.sleep(pollTime)
    }
    // ... a wait of ~10x the batch duration happens here, before the next line is logged:
    logInfo("Waited for jobs to be processed and checkpoints to be written")

I think lastProcessedBatch is always null, since no batch ever succeeds. Of course, for all this code knows, the next batch might succeed, so it sits there waiting for it. But shouldn't it proceed once one more batch completes, even if that batch failed? JobGenerator.onBatchCompleted is only called for a successful batch. Could it be called for a failed batch too? I think that would fix it.

Should the condition also be lastProcessedBatch.milliseconds <= stopTime instead of == ?

Thanks for any pointers.
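To make the failure mode concrete, here is a minimal, self-contained sketch of that wait loop, not the actual Spark code. The names (waitForBatches, lastProcessedBatchMs) are hypothetical; it just models a poll loop whose exit condition can never become true when every batch fails, so it only returns via timeout. It also uses the <= comparison suggested above instead of ==.

    // Hypothetical model of JobGenerator.stop()'s wait loop (not Spark's code).
    object StopWaitSketch {
      // Returns true if the loop exited because a batch at or before stopTime
      // was processed, false if it exited only because the timeout elapsed.
      def waitForBatches(lastProcessedBatchMs: () => Option[Long],
                         stopTimeMs: Long,
                         timeoutMs: Long,
                         pollMs: Long = 10L): Boolean = {
        val deadline = System.currentTimeMillis() + timeoutMs
        // <= rather than ==, so any completed batch up to stopTime counts.
        def done = lastProcessedBatchMs().exists(_ <= stopTimeMs)
        while (System.currentTimeMillis() < deadline && !done) Thread.sleep(pollMs)
        done
      }

      def main(args: Array[String]): Unit = {
        // Case 1: every batch fails, so "last processed batch" is never set;
        // the loop spins for the full timeout and gives up.
        assert(!waitForBatches(() => None, stopTimeMs = 1000L, timeoutMs = 100L))

        // Case 2: if completion were reported even for a failed batch,
        // the loop would exit promptly instead of burning the whole timeout.
        @volatile var last: Option[Long] = None
        new Thread(() => { Thread.sleep(30L); last = Some(1000L) }).start()
        assert(waitForBatches(() => last, stopTimeMs = 1000L, timeoutMs = 5000L))
        println("ok")
      }
    }

Case 1 is the behavior described above: with lastProcessedBatch permanently unset, nothing short of the timeout can terminate the loop.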