GitHub user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/7913#discussion_r36275319
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
---
@@ -92,29 +95,43 @@ private[streaming] class BlockGenerator(
@volatile private var stopped = false
/** Start block generating and pushing threads. */
- def start() {
- blockIntervalTimer.start()
- blockPushingThread.start()
- logInfo("Started BlockGenerator")
+ def start(): Unit = {
+ if (!stopped) {
+ blockIntervalTimer.start()
+ blockPushingThread.start()
+ logInfo("Started BlockGenerator")
+ } else {
+ throw new SparkException("Cannot start BlockGenerator as already stopped")
+ }
}
/** Stop all threads. */
- def stop() {
- logInfo("Stopping BlockGenerator")
- blockIntervalTimer.stop(interruptTimer = false)
- stopped = true
- logInfo("Waiting for block pushing thread")
- blockPushingThread.join()
- logInfo("Stopped BlockGenerator")
+ def stop(): Unit = {
+ if (!stopped) {
+ // First stop generating block, then mark it as non-active and wait for pushing thread to stop
+ logInfo("Stopping BlockGenerator")
+ blockIntervalTimer.stop(interruptTimer = false)
+ stopped = true
+ logInfo("Waiting for block pushing thread")
+ blockPushingThread.join()
+ logInfo("Stopped BlockGenerator")
+ } else {
+ logWarning("BlockGenerator not started")
+ }
}
/**
* Push a single data item into the buffer. All received data items
* will be periodically pushed into BlockManager.
*/
- def addData (data: Any): Unit = synchronized {
- waitToPush()
- currentBuffer += data
+ def addData(data: Any): Unit = synchronized {
+ if (!stopped) {
+ waitToPush()
--- End diff --
Do you want to avoid adding any data after the BlockGenerator stops? If so,
that can still happen, because `def stop()` isn't `synchronized`: `stopped`
can be set to true after `addData` has already passed its `!stopped` check.
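
One possible way to close that window, sketched under assumptions: the class below is a hypothetical stand-in (`GuardedBuffer` is not part of Spark) that keeps only the flag and the buffer from the diff. It flips `stopped` under the same monitor that `addData` holds, so a check-then-append can no longer interleave with the flag change. Whether the real `stop()` can take that lock depends on what the timer and pushing thread do, which this sketch does not model.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for BlockGenerator; not the Spark class.
class GuardedBuffer {
  // Guarded by `this`: read and written only inside synchronized blocks.
  private var stopped = false
  private val buffer = new ArrayBuffer[Any]

  /** Appends the item and returns true, or returns false once stopped. */
  def addData(data: Any): Boolean = synchronized {
    if (!stopped) {
      buffer += data
      true
    } else {
      false
    }
  }

  /** Flips the flag under the same lock that addData uses. */
  def stop(): Unit = synchronized {
    stopped = true
  }

  /** Number of items accepted so far. */
  def size: Int = synchronized { buffer.size }
}
```

Only the flag flip is placed inside the lock here. Waiting for another thread (such as the `blockPushingThread.join()` call in the diff) while holding the monitor could block indefinitely if that thread ever needs the same monitor, so that wait would presumably stay outside the `synchronized` block.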