Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/5268#discussion_r27490525
--- Diff:
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
---
@@ -21,39 +21,43 @@ import java.net.URL
import java.nio.ByteBuffer
import scala.collection.mutable
-import scala.concurrent.Await
+import scala.util.{Failure, Success}
-import akka.actor.{Actor, ActorSelection, Props}
-import akka.pattern.Patterns
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.rpc._
+import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger,
Utils}
+import org.apache.spark.util.{SignalLogger, Utils}
private[spark] class CoarseGrainedExecutorBackend(
+ override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
hostPort: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
- extends Actor with ActorLogReceive with ExecutorBackend with Logging {
+ extends RpcEndpoint with ExecutorBackend with Logging {
Utils.checkHostPort(hostPort, "Expected hostport")
var executor: Executor = null
- var driver: ActorSelection = null
+ @volatile var driver: Option[RpcEndpointRef] = None
- override def preStart() {
+ override def onStart() {
+ import scala.concurrent.ExecutionContext.Implicits.global
logInfo("Connecting to driver: " + driverUrl)
- driver = context.actorSelection(driverUrl)
- driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
- context.system.eventStream.subscribe(self,
classOf[RemotingLifecycleEvent])
+ rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
+ driver = Some(ref)
+ ref.sendWithReply[RegisteredExecutor.type](
+ RegisterExecutor(executorId, self, hostPort, cores,
extractLogUrls))
+ } onComplete {
+ case Success(msg) => self.send(msg)
+ case Failure(e) => logError(s"Cannot register to driver:
$driverUrl", e)
--- End diff --
Fixed.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]