Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/6294#discussion_r34858236
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
---
@@ -136,12 +141,19 @@ private[streaming] abstract class ReceiverSupervisor(
def stopReceiver(message: String, error: Option[Throwable]): Unit =
synchronized {
try {
logInfo("Stopping receiver with message: " + message + ": " +
error.getOrElse(""))
- receiverState = Stopped
- receiver.onStop()
- logInfo("Called receiver onStop")
- onReceiverStop(message, error)
+ receiverState match {
+ case Initialized =>
+ logWarning("Skip stopping receiver because it has not yet stared")
+ case Started =>
--- End diff --
That's the right way to do this. But by that logic we should also start the
receiver only when the state is Initialized or Stopped, and we are not doing
that. In fact, that leads to the question of locking the state correctly. I
don't want to get into fixing those issues in this PR.
I think it's better not to change stopReceiver(), and instead change stop() to
call stopReceiver() only when the state is Started.
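
A minimal sketch of that suggestion, for illustration only. It is not the actual ReceiverSupervisor code: SupervisorSketch, the ReceiverState enumeration, the stopReceiverCalls counter, and the signature of stop() are all hypothetical stand-ins, and the real class does more work in these methods. The point is only that stopReceiver() stays unconditional while the state check moves into stop().

```scala
// Hypothetical, simplified stand-in for the receiver states used in the diff.
object ReceiverState extends Enumeration {
  val Initialized, Started, Stopped = Value
}

// Hypothetical sketch class; NOT the real ReceiverSupervisor.
class SupervisorSketch {
  import ReceiverState._

  // Mirrors the receiverState field from the diff; guarded by `synchronized`.
  private var receiverState: ReceiverState.Value = Initialized

  // Counts stopReceiver() invocations so the behaviour is observable here.
  var stopReceiverCalls: Int = 0

  def state: ReceiverState.Value = synchronized { receiverState }

  def startReceiver(): Unit = synchronized {
    receiverState = Started
  }

  // Left unconditional, as it was before the diff under review.
  def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
    receiverState = Stopped
    stopReceiverCalls += 1
  }

  // The state check lives in the caller: only a started receiver is stopped.
  def stop(message: String, error: Option[Throwable]): Unit = synchronized {
    if (receiverState == Started) {
      stopReceiver(message, error)
    }
  }
}
```

With this shape, calling stop() before the receiver has started is a no-op, and calling it twice stops the receiver only once. The open questions raised above (guarding start by state, and locking the state consistently) are deliberately left out of the sketch.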