Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r26957187
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---
    @@ -17,58 +17,65 @@
     
     package org.apache.spark.deploy.worker
     
    -import akka.actor.{Actor, Address, AddressFromURIString}
    -import akka.remote.{AssociatedEvent, AssociationErrorEvent, 
AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
    -
     import org.apache.spark.Logging
     import org.apache.spark.deploy.DeployMessages.SendHeartbeat
    -import org.apache.spark.util.ActorLogReceive
    +import org.apache.spark.rpc._
     
     /**
      * Actor which connects to a worker process and terminates the JVM if the 
connection is severed.
      * Provides fate sharing between a worker and its associated child 
processes.
      */
    -private[spark] class WorkerWatcher(workerUrl: String)
    -  extends Actor with ActorLogReceive with Logging {
    -
    -  override def preStart() {
    -    context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
    +private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: 
String)
    +  extends RpcEndpoint with Logging {
     
    +  override def onStart() {
         logInfo(s"Connecting to worker $workerUrl")
    -    val worker = context.actorSelection(workerUrl)
    -    worker ! SendHeartbeat // need to send a message here to initiate 
connection
    +    if (!isTesting) {
    +      rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
    +    }
       }
     
       // Used to avoid shutting down JVM during tests
    +  // In the normal case, exitNonZero will call `System.exit(-1)` to 
shutdown the JVM. In the unit
    +  // test, the user should call `setTesting(true)` so that `exitNonZero` 
will set `isShutDown` to
    +  // true rather than calling `System.exit`. The user can check 
`isShutDown` to know if
    +  // `exitNonZero` is called.
       private[deploy] var isShutDown = false
       private[deploy] def setTesting(testing: Boolean) = isTesting = testing
       private var isTesting = false
     
       // Lets us filter events only from the worker's actor system
    -  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
    -  private def isWorker(address: Address) = address.hostPort == 
expectedHostPort
    +  private val expectedHostPort = new java.net.URI(workerUrl)
    +  private def isWorker(address: RpcAddress) = {
    +    expectedHostPort.getHost == address.host && expectedHostPort.getPort 
== address.port
    +  }
     
       def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
     
    -  override def receiveWithLogging = {
    -    case AssociatedEvent(localAddress, remoteAddress, inbound) if 
isWorker(remoteAddress) =>
    -      logInfo(s"Successfully connected to $workerUrl")
    +  override def receive = {
    +    case e => logWarning(s"Received unexpected actor system event: $e")
    --- End diff --
    
    nit: "actor system" is very akka-specific; now that this class is an `RpcEndpoint`, the log message should use RPC-neutral wording.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to