Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/4588#discussion_r26957187
--- Diff:
core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---
@@ -17,58 +17,65 @@
package org.apache.spark.deploy.worker
-import akka.actor.{Actor, Address, AddressFromURIString}
-import akka.remote.{AssociatedEvent, AssociationErrorEvent,
AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
-
import org.apache.spark.Logging
import org.apache.spark.deploy.DeployMessages.SendHeartbeat
-import org.apache.spark.util.ActorLogReceive
+import org.apache.spark.rpc._
/**
* Actor which connects to a worker process and terminates the JVM if the
connection is severed.
* Provides fate sharing between a worker and its associated child
processes.
*/
-private[spark] class WorkerWatcher(workerUrl: String)
- extends Actor with ActorLogReceive with Logging {
-
- override def preStart() {
- context.system.eventStream.subscribe(self,
classOf[RemotingLifecycleEvent])
+private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl:
String)
+ extends RpcEndpoint with Logging {
+ override def onStart() {
logInfo(s"Connecting to worker $workerUrl")
- val worker = context.actorSelection(workerUrl)
- worker ! SendHeartbeat // need to send a message here to initiate
connection
+ if (!isTesting) {
+ rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
+ }
}
// Used to avoid shutting down JVM during tests
+ // In the normal case, exitNonZero will call `System.exit(-1)` to
shutdown the JVM. In the unit
+ // test, the user should call `setTesting(true)` so that `exitNonZero`
will set `isShutDown` to
+ // true rather than calling `System.exit`. The user can check
`isShutDown` to know if
+ // `exitNonZero` is called.
private[deploy] var isShutDown = false
private[deploy] def setTesting(testing: Boolean) = isTesting = testing
private var isTesting = false
// Lets us filter events only from the worker's actor system
- private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
- private def isWorker(address: Address) = address.hostPort ==
expectedHostPort
+ private val expectedHostPort = new java.net.URI(workerUrl)
+ private def isWorker(address: RpcAddress) = {
+ expectedHostPort.getHost == address.host && expectedHostPort.getPort
== address.port
+ }
def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
- override def receiveWithLogging = {
- case AssociatedEvent(localAddress, remoteAddress, inbound) if
isWorker(remoteAddress) =>
- logInfo(s"Successfully connected to $workerUrl")
+ override def receive = {
+ case e => logWarning(s"Received unexpected actor system event: $e")
--- End diff --
nit: "actor system" is very akka-specific.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]