GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/7913#discussion_r36277317
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
---
@@ -92,29 +95,43 @@ private[streaming] class BlockGenerator(
@volatile private var stopped = false
/** Start block generating and pushing threads. */
- def start() {
- blockIntervalTimer.start()
- blockPushingThread.start()
- logInfo("Started BlockGenerator")
+ def start(): Unit = {
+ if (!stopped) {
+ blockIntervalTimer.start()
+ blockPushingThread.start()
+ logInfo("Started BlockGenerator")
+ } else {
+ throw new SparkException("Cannot start BlockGenerator as already stopped")
+ }
}
/** Stop all threads. */
- def stop() {
- logInfo("Stopping BlockGenerator")
- blockIntervalTimer.stop(interruptTimer = false)
- stopped = true
- logInfo("Waiting for block pushing thread")
- blockPushingThread.join()
- logInfo("Stopped BlockGenerator")
+ def stop(): Unit = {
+ if (!stopped) {
+ // First stop generating block, then mark it as non-active and wait for pushing thread to stop
+ logInfo("Stopping BlockGenerator")
+ blockIntervalTimer.stop(interruptTimer = false)
+ stopped = true
+ logInfo("Waiting for block pushing thread")
+ blockPushingThread.join()
+ logInfo("Stopped BlockGenerator")
+ } else {
+ logWarning("BlockGenerator not started")
+ }
}
/**
* Push a single data item into the buffer. All received data items
* will be periodically pushed into BlockManager.
*/
- def addData (data: Any): Unit = synchronized {
- waitToPush()
- currentBuffer += data
+ def addData(data: Any): Unit = synchronized {
+ if (!stopped) {
+ waitToPush()
--- End diff --
Good catch. However, I don't want to add a synchronized block around a lengthy
computation, and stop() can be lengthy. I think it may be a better idea to
expand the "active" boolean into an enum with the following states:
Initiated, Active, AddingDataStopped, BlockGeneratingStopped, AllStopped. This
variable will be volatile.
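A minimal sketch of such a state variable, assuming Scala's `Enumeration` is used for the enum. `GeneratorState`, `GeneratorStateHolder` and `advance` are hypothetical names, not part of the PR. Declaring the states in lifecycle order lets code ask "have we reached at least X?" with `<` or `>=`, while the volatile field lets other threads read the state without taking the lock.

```scala
// Hypothetical sketch: the five states named above, declared in lifecycle order.
object GeneratorState extends Enumeration {
  type GeneratorState = Value
  // Values are numbered in declaration order, so `<` / `>=` follow the lifecycle.
  val Initiated, Active, AddingDataStopped, BlockGeneratingStopped, AllStopped = Value
}

// Hypothetical holder illustrating "volatile reads, synchronized transitions".
class GeneratorStateHolder {
  import GeneratorState._

  // Volatile: readers such as a block pushing thread see the latest state without locking.
  @volatile private var state: GeneratorState = Initiated

  def current: GeneratorState = state

  // Transitions happen under the lock and only forward from the expected state,
  // so two concurrent callers cannot both perform the same step.
  def advance(from: GeneratorState, to: GeneratorState): Boolean = synchronized {
    if (state == from && to > from) { state = to; true } else false
  }
}
```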
- Data adding: when the state is Active, data can be added. Checking the state
and inserting into the buffer is synchronized (the rate-limiter waiting is not).
- Block pushing: when the state is Active, keep polling and pushing blocks.
When the state is BlockGeneratingStopped, drain the queue of blocks and
terminate the thread.
- When stop() is called:
  - The state is changed to AddingDataStopped immediately. This stops new data
  from being added. This step is synchronized.
  - Wait for the timer to stop, and then mark the state as
  BlockGeneratingStopped. This lets the block pushing thread start draining the
  block queue.
  - Wait for the block pushing thread to join, and then mark the state as
  AllStopped.
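The protocol above can be sketched as a simplified, self-contained generator. This is an illustration under stated assumptions, not Spark's implementation: `SketchBlockGenerator`, `pushBlock`, `rollBuffer`, `pushLoop` and the no-op `waitToPush` are hypothetical, and a `ScheduledExecutorService` stands in for Spark's `RecurringTimer`. Only the brief state check plus buffer insertion (and the first step of stop()) hold the lock; the timer shutdown and thread join do not.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Lifecycle states from the comment, declared in order so `<` follows the lifecycle.
object GeneratorState extends Enumeration {
  type GeneratorState = Value
  val Initiated, Active, AddingDataStopped, BlockGeneratingStopped, AllStopped = Value
}

// Hypothetical, simplified stand-in for BlockGenerator (not Spark's code).
// `pushBlock` replaces the real hand-off of blocks to the BlockManager.
class SketchBlockGenerator(intervalMs: Long, pushBlock: Seq[Any] => Unit) {
  import GeneratorState._

  @volatile private var state: GeneratorState = Initiated
  private val currentBuffer = new ArrayBuffer[Any]
  private val blocksQueue = new LinkedBlockingQueue[Seq[Any]]()
  private val timer = Executors.newSingleThreadScheduledExecutor()
  private val pushingThread = new Thread(new Runnable {
    def run(): Unit = pushLoop()
  }, "sketch-block-pushing-thread")

  def currentState: GeneratorState = state

  def start(): Unit = synchronized {
    if (state != Initiated) {
      throw new IllegalStateException(s"Cannot start BlockGenerator in state $state")
    }
    state = Active
    timer.scheduleAtFixedRate(new Runnable { def run(): Unit = rollBuffer() },
      intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    pushingThread.start()
  }

  def addData(data: Any): Unit = {
    waitToPush() // rate limiting: deliberately outside the lock
    synchronized {
      // Check the state and insert under one lock, so nothing lands after stop() step 1.
      if (state == Active) currentBuffer += data
      else throw new IllegalStateException(s"Cannot add data in state $state")
    }
  }

  def stop(): Unit = {
    // Step 1 (synchronized, brief): stop accepting new data.
    val stoppingNow = synchronized {
      if (state == Active) { state = AddingDataStopped; true } else false
    }
    if (stoppingNow) {
      // Step 2: stop the timer, flush the last partial buffer, then let the pusher drain.
      timer.shutdown()
      timer.awaitTermination(10, TimeUnit.SECONDS)
      rollBuffer()
      state = BlockGeneratingStopped
      // Step 3: wait for the pushing thread without holding any lock.
      pushingThread.join()
      state = AllStopped
    }
  }

  // Runs on the pushing thread.
  private def pushLoop(): Unit = {
    // Blocks may still arrive while the timer runs (Active or AddingDataStopped).
    while (state < BlockGeneratingStopped) {
      val next = blocksQueue.poll(10, TimeUnit.MILLISECONDS)
      if (next != null) pushBlock(next)
    }
    // Generation has stopped and the final flush is enqueued: drain, then exit.
    var remaining = blocksQueue.poll()
    while (remaining != null) {
      pushBlock(remaining)
      remaining = blocksQueue.poll()
    }
  }

  // Timer callback: swap out the buffer under the lock, enqueue the block outside it.
  private def rollBuffer(): Unit = {
    val block: Option[Seq[Any]] = synchronized {
      if (currentBuffer.isEmpty) None
      else {
        val b = currentBuffer.toList
        currentBuffer.clear()
        Some(b)
      }
    }
    block.foreach(b => blocksQueue.put(b))
  }

  // Hypothetical placeholder for the rate limiter's blocking wait.
  private def waitToPush(): Unit = ()
}
```

One deviation worth noting: the sketch's pushing thread keeps polling in AddingDataStopped as well as Active, because the timer can still produce blocks until it has been stopped.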