[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87515130 [Test build #29379 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29379/consoleFull) for PR 4588 at commit [`f6f3287`](https://github.com/apache/spark/commit/f6f3287092097f40d8f159321a55b5164cb10968). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27363583

    --- Diff: core/src/main/scala/org/apache/spark/util/RpcUtils.scala ---
    @@ -0,0 +1,35 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import org.apache.spark.{SparkEnv, SparkConf}
    +import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
    +
    +object RpcUtils {
    +
    +  /**
    +   * Retrieve a [[RpcEndpointRef]] which is located in the driver via its name.
    +   */
    +  def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
    +    val driverActorSystemName = SparkEnv.driverActorSystemName
    +    val driverHost: String = conf.get("spark.driver.host", "localhost")
    +    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
    +    Utils.checkHost(driverHost, "Expected hostname")
    --- End diff --

I copied it from `AkkaUtils.makeDriverRef`. It looks like legacy code, because the current `checkHost` does not really verify the host:

```Scala
def checkHost(host: String, message: String = "") {
  assert(host.indexOf(':') == -1, message)
}
```

Checking for `:` is not enough if we really want to validate the host.
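To illustrate why checking only for `:` is weak, here is a minimal sketch of a stricter check. `HostCheck` and `isPlausibleHost` are hypothetical names used for illustration only; they are not part of Spark or of this PR. The sketch assumes that a "host" means an RFC 1123 style hostname or dotted IPv4 literal, and it deliberately rejects IPv6 literals and `host:port` pairs.

```scala
object HostCheck {
  // One DNS label: letters, digits, and hyphens, no leading or trailing hyphen, at most 63 chars.
  private val Label = "[A-Za-z0-9]([A-Za-z0-9-]{0,61}[A-Za-z0-9])?"

  // One or more labels separated by dots (this also accepts dotted IPv4 literals).
  private val HostnameRegex = s"$Label(\\.$Label)*"

  // Hypothetical helper (an assumption, not Spark's API): true only for a bare hostname.
  def isPlausibleHost(host: String): Boolean =
    host != null && host.length <= 253 && host.matches(HostnameRegex)
}
```

Unlike the `indexOf(':')` test, this also rejects empty strings and strings containing whitespace, although it still does not prove that the host resolves.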
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87549328 @rxin @CodingCat @vanzin @aarondav Thanks for reviewing this PR.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87513980 [Test build #29378 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29378/consoleFull) for PR 4588 at commit [`8bd1097`](https://github.com/apache/spark/commit/8bd10973798f80a30d7bf37ff3fdf40a953e5da7).
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363465 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
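The quoted `setupEndpointRefByUrl` builds a blocking call on top of an asynchronous one by waiting on the `Future` with a timeout. A minimal sketch of that pattern follows. `BlockingLookup`, `Ref`, `asyncLookup`, and the 5 second default timeout are hypothetical stand-ins chosen for illustration; the PR's own `RpcEndpointRef` and `defaultLookupTimeout` are not modelled here.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BlockingLookup {
  // Stand-in for an endpoint reference (assumption: only the URI matters for this sketch).
  final case class Ref(uri: String)

  // Hypothetical asynchronous lookup; here it completes immediately.
  def asyncLookup(uri: String): Future[Ref] = Future.successful(Ref(uri))

  // Blocking variant: waits for the asynchronous result and throws
  // java.util.concurrent.TimeoutException if it does not arrive in time.
  def lookup(uri: String, timeout: FiniteDuration = 5.seconds): Ref =
    Await.result(asyncLookup(uri), timeout)
}
```

Keeping the asynchronous method as the primitive means each implementation only has to provide one lookup path, and callers choose whether to block.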
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27363678

    --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rpc
    +
    +import java.net.URI
    +
    +import scala.concurrent.{Await, Future}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf}
    +import org.apache.spark.util.{AkkaUtils, Utils}
    +
    +/**
    + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
    + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
    + * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
    + *
    + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
    + */
    +private[spark] abstract class RpcEnv(conf: SparkConf) {
    +
    +  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
    +
    +  /**
    +   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
    +   * [[RpcEndpoint.self]].
    +   */
    +  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
    +
    +  /**
    +   * Return the address that [[RpcEnv]] is listening to.
    +   */
    +  def address: RpcAddress
    +
    +  /**
    +   * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not
    +   * guarantee thread-safety.
    +   */
    +  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
    +
    +  /**
    +   * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should
    +   * make sure thread-safely sending messages to [[RpcEndpoint]].
    +   *
    +   * Thread-safety means processing of one message happens before processing of the next message by
    +   * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]]
    +   * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be
    +   * volatile or equivalent.
    +   *
    +   * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]]
    +   * for different messages.
    +   */
    +  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
    +
    +  /**
    +   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
    +   */
    +  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
    --- End diff --

Changed the URLs to URIs.
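The thread-safety contract documented on `setupThreadSafeEndpoint` in the quoted diff (one message is fully processed before the next, with no promise that the same thread runs every message) can be sketched with a serialized mailbox on a shared thread pool. `SerialMailbox` is a hypothetical class written for illustration; it is not how the PR's Akka-backed implementation works, which relies on Akka actors.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical mailbox: messages may run on different pool threads, but never concurrently.
final class SerialMailbox[A <: AnyRef](handler: A => Unit, pool: ExecutorService) {
  private val queue = new ConcurrentLinkedQueue[A]()
  // True while a drain task is scheduled or running. Writing and reading this flag
  // also makes the handler's field updates visible to the next drain.
  private val scheduled = new AtomicBoolean(false)

  def send(msg: A): Unit = {
    queue.add(msg)
    trySchedule()
  }

  private def trySchedule(): Unit =
    if (scheduled.compareAndSet(false, true)) {
      pool.execute(new Runnable { def run(): Unit = drain() })
    }

  private def drain(): Unit = {
    try {
      var msg = queue.poll()
      while (msg != null) {
        handler(msg)
        msg = queue.poll()
      }
    } finally {
      scheduled.set(false)
      // A sender may have enqueued after the last poll but before the flag was cleared.
      if (!queue.isEmpty) trySchedule()
    }
  }
}
```

Because at most one drain runs at a time and the atomic flag orders successive drains, the handler can keep plain (non-volatile) fields, which is the property the doc comment describes.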
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27363681

    --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---
    @@ -17,58 +17,65 @@

     package org.apache.spark.deploy.worker

    -import akka.actor.{Actor, Address, AddressFromURIString}
    -import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
    -
     import org.apache.spark.Logging
     import org.apache.spark.deploy.DeployMessages.SendHeartbeat
    -import org.apache.spark.util.ActorLogReceive
    +import org.apache.spark.rpc._

     /**
      * Actor which connects to a worker process and terminates the JVM if the connection is severed.
      * Provides fate sharing between a worker and its associated child processes.
      */
    -private[spark] class WorkerWatcher(workerUrl: String)
    -  extends Actor with ActorLogReceive with Logging {
    -
    -  override def preStart() {
    -    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    +private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: String)
    +  extends RpcEndpoint with Logging {
    +  override def onStart() {
         logInfo(s"Connecting to worker $workerUrl")
    -    val worker = context.actorSelection(workerUrl)
    -    worker ! SendHeartbeat // need to send a message here to initiate connection
    +    if (!isTesting) {
    +      rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
    +    }
       }

       // Used to avoid shutting down JVM during tests
    +  // In the normal case, exitNonZero will call `System.exit(-1)` to shutdown the JVM. In the unit
    +  // test, the user should call `setTesting(true)` so that `exitNonZero` will set `isShutDown` to
    +  // true rather than calling `System.exit`. The user can check `isShutDown` to know if
    +  // `exitNonZero` is called.
       private[deploy] var isShutDown = false
       private[deploy] def setTesting(testing: Boolean) = isTesting = testing
       private var isTesting = false

       // Lets us filter events only from the worker's actor system
    -  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
    -  private def isWorker(address: Address) = address.hostPort == expectedHostPort
    +  private val expectedHostPort = new java.net.URI(workerUrl)
    --- End diff --

Updated to use `RpcAddress.fromUriString`.
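The change mentioned above replaces a raw `java.net.URI` with a parsed address, so events can be filtered by host and port. A minimal sketch of that kind of parsing follows. `HostPort` and `HostPortParser` are hypothetical names used for illustration; the PR's actual `RpcAddress.fromUriString` is not reproduced here and may differ.

```scala
import java.net.URI

// Hypothetical stand-in for an RPC address (assumption: just a host and a port).
final case class HostPort(host: String, port: Int)

object HostPortParser {
  // Parses the host and port out of a URI string and rejects URIs that lack either.
  def fromUriString(uri: String): HostPort = {
    val parsed = new URI(uri)
    require(parsed.getHost != null && parsed.getPort != -1, s"Invalid URI: $uri")
    HostPort(parsed.getHost, parsed.getPort)
  }
}
```

Comparing case-class values such as `HostPort("example.com", 1234)` is by value, which avoids comparing formatted `host:port` strings.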
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87533809 [Test build #29379 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29379/consoleFull) for PR 4588 at commit [`f6f3287`](https://github.com/apache/spark/commit/f6f3287092097f40d8f159321a55b5164cb10968). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. * This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/4588
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87532900 [Test build #29378 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29378/consoleFull) for PR 4588 at commit [`8bd1097`](https://github.com/apache/spark/commit/8bd10973798f80a30d7bf37ff3fdf40a953e5da7). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. * This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87532945 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29378/ Test PASSed.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363528 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27363505

    --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
    @@ -0,0 +1,321 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rpc.akka
    +
    +import java.net.URI
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.concurrent.{Await, Future}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +import scala.reflect.ClassTag
    +import scala.util.control.NonFatal
    +
    +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address}
    +import akka.pattern.{ask => akkaAsk}
    +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent}
    +import org.apache.spark.{SparkException, Logging, SparkConf}
    +import org.apache.spark.rpc._
    +import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
    +
    +/**
    + * A RpcEnv implementation based on Akka.
    + *
    + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and
    + * remove Akka from the dependencies.
    + *
    + * @param actorSystem
    + * @param conf
    + * @param boundPort
    + */
    +private[spark] class AkkaRpcEnv private[akka] (
    +    val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
    +  extends RpcEnv(conf) with Logging {
    +
    +  private val defaultAddress: RpcAddress = {
    +    val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
    +    // In some test case, ActorSystem doesn't bind to any address.
    +    // So just use some default value since they are only some unit tests
    +    RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
    +  }
    +
    +  override val address: RpcAddress = defaultAddress
    +
    +  /**
    +   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make
    +   * [[RpcEndpoint.self]] work.
    +   */
    +  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]()
    +
    +  /**
    +   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef`
    +   */
    +  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]()
    +
    +  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = {
    +    endpointToRef.put(endpoint, endpointRef)
    +    refToEndpoint.put(endpointRef, endpoint)
    +  }
    +
    +  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
    +    val endpoint = refToEndpoint.remove(endpointRef)
    +    if (endpoint != null) {
    +      endpointToRef.remove(endpoint)
    +    }
    +  }
    +
    +  /**
    +   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
    +   */
    +  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
    +    val endpointRef = endpointToRef.get(endpoint)
    +    require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
    +    endpointRef
    +  }
    +
    +  override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    +    setupThreadSafeEndpoint(name, endpoint)
    +  }
    +
    +  override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    +    @volatile var endpointRef: AkkaRpcEndpointRef = null
    +    // Use lazy because the Actor needs to use `endpointRef`.
    +    // So `actorRef` should be created after assigning `endpointRef`.
    +    lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
    +
    +      require(endpointRef != null)
    --- End diff --

I used it to verify that the lazy initialization is correct, but it is no longer necessary, so I changed it to `assert`.
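The ordering issue discussed above (a `lazy val` whose initializer reads a `var` that is assigned later) can be shown in isolation. This is a minimal sketch with hypothetical names (`LazyInit`, `Ref`, `setup`); it does not use Akka and is not the PR's code. In Scala 2, `require` throws `IllegalArgumentException` and signals a bad argument from the caller, while `assert` throws `AssertionError`, signals a broken internal invariant, and can be elided at compile time, which is consistent with switching to `assert` for a check of this kind.

```scala
object LazyInit {
  // Stand-in for an endpoint reference.
  final class Ref(val name: String)

  def setup(name: String): (Ref, String) = {
    var ref: Ref = null
    // The initializer runs on first access only, so `ref` must be assigned before then.
    lazy val description: String = {
      assert(ref != null, "ref must be assigned before the lazy value is forced")
      s"endpoint:${ref.name}"
    }
    ref = new Ref(name)
    // Forcing `description` here is safe because `ref` has already been assigned.
    (ref, description)
  }
}
```

If the last two statements were swapped, the assertion would fail at the first access, which is the kind of mistake the original check was guarding against.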
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363513 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363474 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
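The thread-safety contract quoted in the `RpcEnv.scala` diff above (one message is fully processed before the next, so endpoint fields need not be volatile) can be illustrated with a minimal sketch. It does not use the PR's classes: `Endpoint`, `SerialRef` and `Counter` are hypothetical names, and a single-threaded JDK `ExecutorService` stands in for whatever dispatcher a real `RpcEnv` would use. A single worker thread is only one way to get this ordering; the contract itself does not promise the same thread for every message.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object SerialEndpointSketch {
  // Hypothetical stand-in for an RPC endpoint: it only receives messages.
  trait Endpoint {
    def receive(msg: Any): Unit
  }

  // Hypothetical reference that delivers messages strictly one at a time.
  final class SerialRef(endpoint: Endpoint) {
    private val mailbox = Executors.newSingleThreadExecutor()

    def send(msg: Any): Unit =
      mailbox.execute(new Runnable {
        override def run(): Unit = endpoint.receive(msg)
      })

    // Block until every message submitted so far has been processed.
    def drain(): Unit = {
      mailbox.submit(new Runnable { override def run(): Unit = () }).get()
      ()
    }

    def stop(): Unit = {
      mailbox.shutdown()
      mailbox.awaitTermination(10, TimeUnit.SECONDS)
      ()
    }
  }

  // A plain, non-volatile field is enough because messages never overlap.
  final class Counter extends Endpoint {
    var total = 0
    override def receive(msg: Any): Unit = msg match {
      case n: Int => total += n
      case _ => ()
    }
  }

  // Four sender threads each send 1..100; returns the final total.
  def run(): Int = {
    val counter = new Counter
    val ref = new SerialRef(counter)
    val senders = (1 to 4).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = (1 to 100).foreach(n => ref.send(n))
      })
    }
    senders.foreach(_.start())
    senders.foreach(_.join())
    ref.drain()
    ref.stop()
    counter.total
  }
}
```

Senders may run concurrently, but `Counter.receive` never does, which is why the unsynchronized `total += n` is safe in this sketch.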
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363536 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363480 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. + */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") --- End diff -- Added docs for `endpointRef` about a non-existent endpoint
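The two-map bookkeeping in the diff above (`endpointToRef` plus `refToEndpoint`, with `require` failing for an endpoint that was never registered) can be sketched in isolation. This is a simplified illustration, not the PR's code: `Endpoint`, `EndpointRef`, `Registry` and `refOf` are hypothetical names, and only `java.util.concurrent.ConcurrentHashMap` and Scala's `require` are real library calls.

```scala
import java.util.concurrent.ConcurrentHashMap

object EndpointRegistrySketch {
  // Hypothetical placeholder types; the real classes carry far more state.
  final class Endpoint(val name: String)
  final class EndpointRef(val name: String)

  final class Registry {
    // Forward map: answers "which ref belongs to this endpoint?"
    private val endpointToRef = new ConcurrentHashMap[Endpoint, EndpointRef]()
    // Reverse map: lets us clean up the forward map when only the ref is known.
    private val refToEndpoint = new ConcurrentHashMap[EndpointRef, Endpoint]()

    def register(endpoint: Endpoint, ref: EndpointRef): Unit = {
      endpointToRef.put(endpoint, ref)
      refToEndpoint.put(ref, endpoint)
    }

    def unregister(ref: EndpointRef): Unit = {
      val endpoint = refToEndpoint.remove(ref)
      if (endpoint != null) {
        endpointToRef.remove(endpoint)
      }
    }

    // Throws IllegalArgumentException if `endpoint` is not registered.
    def refOf(endpoint: Endpoint): EndpointRef = {
      val ref = endpointToRef.get(endpoint)
      require(ref != null, s"Cannot find a ref for endpoint ${endpoint.name}")
      ref
    }
  }
}
```

The reverse map exists only so that `unregister` can be driven by a ref; callers that look up a non-existent endpoint get an exception rather than a `null`.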
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363917 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + assert(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87535876 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29380/ Test PASSed.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87535863 [Test build #29380 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29380/consoleFull) for PR 4588 at commit [`fe3df4c`](https://github.com/apache/spark/commit/fe3df4cbd9efa052803f0c3d12544874b649728b). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. * This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87537771 Thanks. I've merged this in master. Let's start replacing the direct use of actors with this API. Great work!
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363384 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
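The diff above builds each blocking lookup (`setupEndpointRefByUrl`, `setupEndpointRef`) on top of its asynchronous counterpart by waiting on the `Future` with a timeout. A minimal sketch of that pattern follows. `Lookup`, `Immediate` and `NeverCompletes` are hypothetical names and the result is a plain `String` rather than an `RpcEndpointRef`; only `Await.result`, `Future.successful` and `Promise` are real Scala standard-library calls.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object BlockingLookupSketch {
  // Hypothetical interface: implementations supply only the async variant.
  trait Lookup {
    def timeout: FiniteDuration

    def asyncLookup(url: String): Future[String]

    // Blocking variant derived from the async one; throws a
    // java.util.concurrent.TimeoutException if the future is not ready in time.
    def lookup(url: String): String = Await.result(asyncLookup(url), timeout)
  }

  // Resolves at once, so the blocking call returns immediately.
  object Immediate extends Lookup {
    val timeout: FiniteDuration = 1.second
    def asyncLookup(url: String): Future[String] = Future.successful(s"ref:$url")
  }

  // Never resolves, so the blocking call times out.
  object NeverCompletes extends Lookup {
    val timeout: FiniteDuration = 50.millis
    def asyncLookup(url: String): Future[String] = Promise[String]().future
  }
}
```

Deriving the blocking call from the async one keeps a single lookup implementation per backend; the cost is that callers of the blocking variant must be prepared for a timeout exception.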
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363366 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27364030 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + assert(endpointRef != null) + registerEndpoint(endpoint, endpointRef) --- End diff -- Ah, sorry. Yes, it can be moved. Done.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27364040 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + assert(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87520468 [Test build #29380 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29380/consoleFull) for PR 4588 at commit [`fe3df4c`](https://github.com/apache/spark/commit/fe3df4cbd9efa052803f0c3d12544874b649728b).
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363808 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27363828 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + assert(endpointRef != null) + registerEndpoint(endpoint, endpointRef) --- End diff -- Ah - I think there was a miscommunication on these lines. I was not worried about `require` vs `assert`; I was wondering if we could just invoke `registerEndpoint(endpoint, endpointRef)` right before `endpointRef.init()`, on
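A rough sketch of the ordering being suggested here, with Akka left out entirely: the endpoint is registered in the setup method right before `init()` forces the lazily created backend, rather than inside the actor's constructor. `Endpoint`, `Ref`, `Registry`, and `setup` are hypothetical stand-ins chosen for illustration, not Spark's classes, and the string "backend" only plays the role of the `lazy val actorRef` from the diff.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for RpcEndpoint.
final class Endpoint(val name: String)

// Hypothetical stand-in for AkkaRpcEndpointRef. The backend is built lazily,
// mirroring the `lazy val actorRef` in the diff.
final class Ref(mkBackend: => String) {
  lazy val backend: String = mkBackend
  // Force creation of the backend.
  def init(): Unit = { val _ = backend }
}

object Registry {
  // Two maps, as in the diff: one per lookup direction.
  private val endpointToRef = new ConcurrentHashMap[Endpoint, Ref]()
  private val refToEndpoint = new ConcurrentHashMap[Ref, Endpoint]()

  def register(endpoint: Endpoint, ref: Ref): Unit = {
    endpointToRef.put(endpoint, ref)
    refToEndpoint.put(ref, endpoint)
  }

  def unregister(ref: Ref): Unit = {
    val endpoint = refToEndpoint.remove(ref)
    if (endpoint != null) endpointToRef.remove(endpoint)
  }

  def refOf(endpoint: Endpoint): Ref = {
    val ref = endpointToRef.get(endpoint)
    require(ref != null, s"Cannot find Ref of ${endpoint.name}")
    ref
  }

  def setup(endpoint: Endpoint): Ref = {
    var ref: Ref = null
    // The by-name argument runs only when init() forces `backend`.
    ref = new Ref({
      assert(ref != null, "ref must be assigned before the backend is created")
      // The lookup already works here because registration happened first.
      assert(refOf(endpoint) eq ref)
      s"backend-for-${endpoint.name}"
    })
    register(endpoint, ref) // done in the setup method, not in the backend's constructor
    ref.init()              // the backend can now be created safely
    ref
  }
}
```

With this ordering the backend's constructor no longer has side effects on the lookup tables, which is the property the comment above appears to be asking for.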
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87533813 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29379/
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348578 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348715 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) --- End diff -- I believe I am missing something here, but what prevents us from invoking these methods after setting `endpointRef` (inside `setupThreadSafeEndpoint`, rather than the Actor's constructor)? This would mean that the actor is not valid immediately after construction, but I think the laziness thing is making a
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348835 --- Diff: core/src/main/scala/org/apache/spark/util/RpcUtils.scala --- @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} + +object RpcUtils { + + /** + * Retrieve a [[RpcEndpointRef]] which is located in the driver via its name. + */ + def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = { +val driverActorSystemName = SparkEnv.driverActorSystemName +val driverHost: String = conf.get("spark.driver.host", "localhost") +val driverPort: Int = conf.getInt("spark.driver.port", 7077) +Utils.checkHost(driverHost, "Expected hostname") --- End diff -- Why is this a requirement? For Akka's sake?
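For context on the question above, here is a small sketch of what the check does in practice, assuming `checkHost` has the one-line body quoted elsewhere in this thread (an `assert` that the host contains no `:`). `HostCheck` and `passes` are hypothetical names added only for this illustration, and the result depends on assertions not being elided at compile time.

```scala
// Hypothetical wrapper; only the body of `checkHost` follows the snippet quoted in the thread.
object HostCheck {
  def checkHost(host: String, message: String = ""): Unit = {
    assert(host.indexOf(':') == -1, message)
  }

  // Returns true when the check accepts `host`, false when the assertion fires.
  def passes(host: String): Boolean =
    try {
      checkHost(host, "Expected hostname")
      true
    } catch {
      case _: AssertionError => false
    }
}
```

Under these assumptions `HostCheck.passes("localhost:7077")` is false, while any colon-free string, including one that is not a valid hostname, is accepted. In other words the call guards against a `host:port` string being passed where a bare host is expected, and little else.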
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348819 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348651 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
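The thread-safety contract in the `setupThreadSafeEndpoint` scaladoc quoted above (one message is fully processed before the next one starts, with no promise about which thread does the work) can be sketched without Akka. This is only an illustration under stated assumptions: `SerialMailbox`, `post` and `handle` are hypothetical names and are not part of this PR or of Spark.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper, for illustration only: serializes message handling
// for one endpoint on top of a shared thread pool.
final class SerialMailbox[A <: AnyRef](pool: Executor)(handle: A => Unit) {
  private val queue = new ConcurrentLinkedQueue[A]()
  // True while a drain task is scheduled or running.
  private val running = new AtomicBoolean(false)

  /** Enqueue a message; it is handled after all earlier messages. */
  def post(msg: A): Unit = {
    queue.add(msg)
    schedule()
  }

  private def schedule(): Unit = {
    // At most one drain task at a time, so `handle` never runs concurrently.
    if (running.compareAndSet(false, true)) {
      pool.execute(new Runnable {
        override def run(): Unit = {
          try {
            var m = queue.poll()
            while (m != null) {
              handle(m)
              m = queue.poll()
            }
          } finally {
            running.set(false)
            // A message may have arrived after the last poll; drain again.
            if (!queue.isEmpty) schedule()
          }
        }
      })
    }
  }
}
```

Different drain tasks may run on different pool threads, but the atomic flag orders them, so plain (non-volatile) fields touched inside `handle` stay consistent from one message to the next.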
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348855 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
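The `lazy val actorRef` trick in the diff above resolves a circular dependency: the actor needs `endpointRef`, and the ref needs the actor. A minimal sketch of that initialization order, without Akka, follows. `Ref`, `Worker` and `LazyInit` are hypothetical stand-ins, and because the quoted diff is cut off before the assignment, the by-name constructor parameter is an assumption about how the two could be wired together, not a description of the PR's code.

```scala
// Hypothetical stand-ins for illustration; not Spark's or Akka's classes.
final class Ref(target: => Worker) {
  // `target` is by-name, so constructing a Ref does not create the Worker.
  def worker: Worker = target
}

final class Worker(val ref: Ref) {
  // Mirrors the `require(endpointRef != null)` check in the quoted diff.
  require(ref != null, "ref must be assigned before the worker is created")
}

object LazyInit {
  def setup(): Ref = {
    var ref: Ref = null
    // Deferred: the body runs on first access, after `ref` is assigned below.
    lazy val worker: Worker = new Worker(ref)
    ref = new Ref(worker)
    // First access forces creation; `ref` is non-null by now.
    val created = ref.worker
    assert(created.ref eq ref)
    ref
  }
}
```

If `worker` were a plain `val`, `new Worker(ref)` would run while `ref` is still null and the `require` would fail.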
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348684 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. + */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") --- End diff -- This behavior should be documented: calling this method on a non-existent endpoint throws an exception, and the method never returns null.
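The documentation requested in the comment above could look roughly like the sketch below. It is not the PR's code: `Endpoint`, `EndpointRef` and `EndpointRegistry` are hypothetical stand-ins for `RpcEndpoint`, `RpcEndpointRef` and the relevant part of `AkkaRpcEnv`. The only library behavior relied on is that Scala's `require` throws `IllegalArgumentException` when its condition is false.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for RpcEndpoint / RpcEndpointRef, for illustration only.
final class Endpoint(val name: String)
final class EndpointRef(val name: String)

class EndpointRegistry {
  private val endpointToRef = new ConcurrentHashMap[Endpoint, EndpointRef]()

  def register(endpoint: Endpoint, ref: EndpointRef): Unit = {
    endpointToRef.put(endpoint, ref)
  }

  /**
   * Return the ref registered for `endpoint`. This method never returns null.
   *
   * @throws IllegalArgumentException if `endpoint` has not been registered
   */
  def endpointRef(endpoint: Endpoint): EndpointRef = {
    val ref = endpointToRef.get(endpoint)
    require(ref != null, s"Cannot find EndpointRef of $endpoint in $this")
    ref
  }
}
```

With the contract written down, callers can drop defensive null checks and treat a failed lookup as a programming error.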
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348730 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-87317475 Just a few mainly minor comments, otherwise LGTM. We can iterate on some of the API moving forward as we get more endpoints using it.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348783 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348800 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348625 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27348604 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27348886

    --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rpc
    +
    +import java.net.URI
    +
    +import scala.concurrent.{Await, Future}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf}
    +import org.apache.spark.util.{AkkaUtils, Utils}
    +
    +/**
    + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
    + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
    + * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
    + *
    + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
    + */
    +private[spark] abstract class RpcEnv(conf: SparkConf) {
    +
    +  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
    +
    +  /**
    +   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
    +   * [[RpcEndpoint.self]].
    +   */
    +  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
    +
    +  /**
    +   * Return the address that [[RpcEnv]] is listening to.
    +   */
    +  def address: RpcAddress
    +
    +  /**
    +   * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not
    +   * guarantee thread-safety.
    +   */
    +  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
    +
    +  /**
    +   * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should
    +   * make sure thread-safely sending messages to [[RpcEndpoint]].
    +   *
    +   * Thread-safety means processing of one message happens before processing of the next message by
    +   * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]]
    +   * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be
    +   * volatile or equivalent.
    +   *
    +   * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]]
    +   * for different messages.
    +   */
    +  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
    +
    +  /**
    +   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
    +   */
    +  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
    --- End diff --

    Should these be URIs instead? URIs are, I believe, a strict superset of URLs and do not encode any notions about hostname resolution, and we refer to URIs elsewhere, so probably should stick with it.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
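The URI-versus-URL question raised in this comment can be illustrated with a small sketch. This is not code from the PR: the endpoint string below is a made-up example in the Akka address style, and the object name `UriVsUrl` is hypothetical. It only shows that `java.net.URI` parses such a string without needing a protocol handler, while `java.net.URL` is expected to reject it on a stock JVM because no handler is registered for the `akka.tcp` scheme.

```scala
import java.net.{URI, URL}
import scala.util.Try

object UriVsUrl {
  // Hypothetical endpoint address, used only for illustration.
  val endpoint = "akka.tcp://sparkWorker@192.168.1.10:7078/user/Worker"

  // java.net.URI only parses the string; it needs no protocol handler
  // and performs no hostname resolution.
  val uri = new URI(endpoint)

  // java.net.URL needs a registered handler for the scheme. Without one
  // for "akka.tcp", construction throws MalformedURLException, which Try captures.
  val asUrl: Try[URL] = Try(new URL(endpoint))

  def main(args: Array[String]): Unit = {
    println(s"scheme=${uri.getScheme} host=${uri.getHost} port=${uri.getPort}")
    println(s"URL construction succeeded: ${asUrl.isSuccess}")
  }
}
```

Under these assumptions, naming the parameters and methods after URIs would match what the strings actually are, which seems to be the point of the suggestion.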
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27348955

    --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---
    @@ -17,58 +17,65 @@
     package org.apache.spark.deploy.worker
    -import akka.actor.{Actor, Address, AddressFromURIString}
    -import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
    -
     import org.apache.spark.Logging
     import org.apache.spark.deploy.DeployMessages.SendHeartbeat
    -import org.apache.spark.util.ActorLogReceive
    +import org.apache.spark.rpc._
     /**
      * Actor which connects to a worker process and terminates the JVM if the connection is severed.
      * Provides fate sharing between a worker and its associated child processes.
      */
    -private[spark] class WorkerWatcher(workerUrl: String)
    -  extends Actor with ActorLogReceive with Logging {
    -
    -  override def preStart() {
    -    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    +private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: String)
    +  extends RpcEndpoint with Logging {
    +  override def onStart() {
         logInfo(s"Connecting to worker $workerUrl")
    -    val worker = context.actorSelection(workerUrl)
    -    worker ! SendHeartbeat // need to send a message here to initiate connection
    +    if (!isTesting) {
    +      rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
    +    }
       }
       // Used to avoid shutting down JVM during tests
    +  // In the normal case, exitNonZero will call `System.exit(-1)` to shutdown the JVM. In the unit
    +  // test, the user should call `setTesting(true)` so that `exitNonZero` will set `isShutDown` to
    +  // true rather than calling `System.exit`. The user can check `isShutDown` to know if
    +  // `exitNonZero` is called.
       private[deploy] var isShutDown = false
       private[deploy] def setTesting(testing: Boolean) = isTesting = testing
       private var isTesting = false
       // Lets us filter events only from the worker's actor system
    -  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
    -  private def isWorker(address: Address) = address.hostPort == expectedHostPort
    +  private val expectedHostPort = new java.net.URI(workerUrl)
    --- End diff --

    Perhaps RpcAddress.fromUriString(workerUrl) and use of the == method?
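A minimal sketch of what this suggestion might look like follows. Only the name `RpcAddress.fromUriString` and the idea of comparing with `==` come from the comment above; the case class definition, the parsing through `java.net.URI`, the `IsWorkerSketch` object, and the example worker URL are all assumptions made for illustration and are not the PR's actual implementation.

```scala
import java.net.URI

// Hypothetical stand-in for the PR's RpcAddress. A case class gives
// structural equality, so == compares host and port together.
final case class RpcAddress(host: String, port: Int)

object RpcAddress {
  // Assumed behaviour: take host and port from the URI's authority part.
  def fromUriString(uri: String): RpcAddress = {
    val parsed = new URI(uri)
    RpcAddress(parsed.getHost, parsed.getPort)
  }
}

object IsWorkerSketch {
  // Made-up worker URL, used only for illustration.
  val workerUrl = "akka.tcp://sparkWorker@192.168.1.10:7078/user/Worker"

  // Parse once, then compare whole addresses instead of separate fields.
  private val expectedAddress = RpcAddress.fromUriString(workerUrl)

  def isWorker(address: RpcAddress): Boolean = address == expectedAddress
}
```

Compared with holding a raw `java.net.URI` and checking `getHost` and `getPort` one by one, this keeps the comparison in a single expression and the parsing in one place.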
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85417800 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29073/ Test FAILed.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85417764 [Test build #29073 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29073/consoleFull) for PR 4588 at commit [`9ffa997`](https://github.com/apache/spark/commit/9ffa9979cc08b0338411465f736f030a67122304). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85422331 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85423410 [Test build #29078 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29078/consoleFull) for PR 4588 at commit [`9ffa997`](https://github.com/apache/spark/commit/9ffa9979cc08b0338411465f736f030a67122304). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85451891 [Test build #29078 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29078/consoleFull) for PR 4588 at commit [`9ffa997`](https://github.com/apache/spark/commit/9ffa9979cc08b0338411465f736f030a67122304). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85451900 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29078/ Test PASSed.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85358650 [Test build #29057 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29057/consoleFull) for PR 4588 at commit [`385b9c3`](https://github.com/apache/spark/commit/385b9c3e34d39adcddccfbeaeea29cbb0419270a). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27005801

    --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---
    @@ -17,58 +17,65 @@
     package org.apache.spark.deploy.worker
    -import akka.actor.{Actor, Address, AddressFromURIString}
    -import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
    -
     import org.apache.spark.Logging
     import org.apache.spark.deploy.DeployMessages.SendHeartbeat
    -import org.apache.spark.util.ActorLogReceive
    +import org.apache.spark.rpc._
     /**
      * Actor which connects to a worker process and terminates the JVM if the connection is severed.
      * Provides fate sharing between a worker and its associated child processes.
      */
    -private[spark] class WorkerWatcher(workerUrl: String)
    -  extends Actor with ActorLogReceive with Logging {
    -
    -  override def preStart() {
    -    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    +private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: String)
    +  extends RpcEndpoint with Logging {
    +  override def onStart() {
         logInfo(s"Connecting to worker $workerUrl")
    -    val worker = context.actorSelection(workerUrl)
    -    worker ! SendHeartbeat // need to send a message here to initiate connection
    +    if (!isTesting) {
    +      rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
    +    }
       }
       // Used to avoid shutting down JVM during tests
    +  // In the normal case, exitNonZero will call `System.exit(-1)` to shutdown the JVM. In the unit
    +  // test, the user should call `setTesting(true)` so that `exitNonZero` will set `isShutDown` to
    +  // true rather than calling `System.exit`. The user can check `isShutDown` to know if
    +  // `exitNonZero` is called.
       private[deploy] var isShutDown = false
       private[deploy] def setTesting(testing: Boolean) = isTesting = testing
       private var isTesting = false
       // Lets us filter events only from the worker's actor system
    -  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
    -  private def isWorker(address: Address) = address.hostPort == expectedHostPort
    +  private val expectedHostPort = new java.net.URI(workerUrl)
    +  private def isWorker(address: RpcAddress) = {
    +    expectedHostPort.getHost == address.host && expectedHostPort.getPort == address.port
    +  }
       def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
    -  override def receiveWithLogging = {
    -    case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
    -      logInfo(s"Successfully connected to $workerUrl")
    +  override def receive = {
    +    case e => logWarning(s"Received unexpected actor system event: $e")
    --- End diff --

    Fixed
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27005797 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27005781 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27005771 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask = akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse(localhost), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, sCannot find RpcEndpointRef of ${endpoint} in ${this}) +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27005780

    --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
    @@ -0,0 +1,318 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rpc.akka
    +
    +import java.net.URI
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.concurrent.{Await, Future}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +import scala.reflect.ClassTag
    +import scala.util.control.NonFatal
    +
    +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address}
    +import akka.pattern.{ask => akkaAsk}
    +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent}
    +import org.apache.spark.{SparkException, Logging, SparkConf}
    +import org.apache.spark.rpc._
    +import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
    +
    +/**
    + * A RpcEnv implementation based on Akka.
    + *
    + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and
    + * remove Akka from the dependencies.
    + *
    + * @param actorSystem
    + * @param conf
    + * @param boundPort
    + */
    +private[spark] class AkkaRpcEnv private[akka] (
    +    val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
    +  extends RpcEnv(conf) with Logging {
    +
    +  private val defaultAddress: RpcAddress = {
    +    val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
    +    // In some test case, ActorSystem doesn't bind to any address.
    +    // So just use some default value since they are only some unit tests
    +    RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
    +  }
    +
    +  override val address: RpcAddress = defaultAddress
    +
    +  /**
    +   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make
    +   * [[RpcEndpoint.self]] work.
    +   */
    +  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]()
    +
    +  /**
    +   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef`
    +   */
    +  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]()
    +
    +  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = {
    +    endpointToRef.put(endpoint, endpointRef)
    +    refToEndpoint.put(endpointRef, endpoint)
    +  }
    +
    +  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
    +    val endpoint = refToEndpoint.remove(endpointRef)
    +    if (endpoint != null) {
    +      endpointToRef.remove(endpoint)
    +    }
    +  }
    +
    +  /**
    +   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
    +   */
    +  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
    +    val endpointRef = endpointToRef.get(endpoint)
    +    require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
    +    endpointRef
    +  }
    +
    +  override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    +    setupThreadSafeEndpoint(name, endpoint)
    +  }
    +
    +  override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    +    @volatile var endpointRef: AkkaRpcEndpointRef = null
    +    // Use lazy because the Actor needs to use `endpointRef`.
    +    // So `actorRef` should be created after assigning `endpointRef`.
    +    lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
    +
    +      require(endpointRef != null)
    +      registerEndpoint(endpoint, endpointRef)
    +
    +      override def preStart(): Unit = {
    +        // Listen for remote client network events
    +        context.system.eventStream.subscribe(self, classOf[AssociationEvent])
    +        safelyCall(endpoint) {
    +          endpoint.onStart()
    +        }
    +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27005722

    --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
    @@ -0,0 +1,412 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rpc
    +
    +import java.net.URI
    +
    +import scala.concurrent.{Await, Future}
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf}
    +import org.apache.spark.util.{AkkaUtils, Utils}
    +
    +/**
    + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
    + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
    + * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
    + *
    + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
    + */
    +private[spark] abstract class RpcEnv(conf: SparkConf) {
    +
    +  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
    +
    +  /**
    +   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
    +   * [[RpcEndpoint.self]].
    +   */
    +  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
    +
    +  /**
    +   * Return the address that [[RpcEnv]] is listening to.
    +   */
    +  def address: RpcAddress
    +
    +  /**
    +   * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not
    +   * guarantee thread-safety.
    +   */
    +  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
    +
    +  /**
    +   * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should
    +   * make sure thread-safely sending messages to [[RpcEndpoint]].
    +   *
    +   * Thread-safety means processing of one message happens before processing of the next message by
    +   * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]]
    +   * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be
    +   * volatile or equivalent.
    +   *
    +   * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]]
    +   * for different messages.
    +   */
    +  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
    +
    +  /**
    +   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
    +   */
    +  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
    +
    +  /**
    +   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action.
    +   */
    +  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
    +    Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
    +  }
    +
    +  /**
    +   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`
    +   * asynchronously.
    +   */
    +  def asyncSetupEndpointRef(
    +      systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = {
    +    asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
    +  }
    +
    +  /**
    +   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`.
    +   * This is a blocking action.
    +   */
    +  def setupEndpointRef(
    +      systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = {
    +    setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
    +  }
    +
    +  /**
    +   * Stop [[RpcEndpoint]] specified by `endpoint`.
    +   */
    +  def stop(endpoint: RpcEndpointRef): Unit
    +
    +  /**
    +   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully,
    +   * call [[awaitTermination()]] straight after [[shutdown()]].
    +   */
    +  def shutdown(): Unit
    +
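The Scaladoc for `setupThreadSafeEndpoint` quoted above promises that one message is fully processed before the next, so endpoint fields need not be volatile. A minimal sketch of one way to get that ordering, assuming a single-threaded executor as the mailbox: `SerialMailbox` and `SerialMailboxDemo` are hypothetical names for illustration and are not part of this PR or of Spark. Pinning everything to one thread is stronger than the doc requires (it explicitly does not promise the same thread), but it is the simplest way to show the guarantee.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical mailbox, not part of Spark: it hands every message to a
// single-threaded executor, so the handler never runs concurrently with itself.
final class SerialMailbox(handler: String => Unit) {
  private val executor = Executors.newSingleThreadExecutor()

  // May be called from any thread; messages are processed one at a time.
  def send(msg: String): Unit =
    executor.execute(new Runnable { def run(): Unit = handler(msg) })

  // Stop accepting messages and wait for the queued ones to be processed.
  def shutdownAndWait(): Boolean = {
    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
  }
}

object SerialMailboxDemo {
  // Several sender threads post to one mailbox; the handler updates a plain var.
  def run(senders: Int, perSender: Int): Int = {
    var count = 0 // neither volatile nor atomic
    val mailbox = new SerialMailbox(_ => count += 1)
    val threads = (1 to senders).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to perSender).foreach(_ => mailbox.send("tick"))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    mailbox.shutdownAndWait()
    count
  }
}
```

Because the handler is serialized, no increments are lost even though `count` is an ordinary field; an endpoint registered through plain `setupEndpoint` gets no such promise from the interface.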
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27005775

    --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
    @@ -0,0 +1,318 @@
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27005759

    --- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala ---
    @@ -0,0 +1,526 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.rpc
    +
    +import java.util.concurrent.{TimeUnit, CountDownLatch, TimeoutException}
    +
    +import scala.collection.mutable
    +import scala.concurrent.Await
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +
    +import org.scalatest.{BeforeAndAfterAll, FunSuite}
    +import org.scalatest.concurrent.Eventually._
    +
    +import org.apache.spark.{SparkException, SparkConf}
    +
    +/**
    + * Common tests for an RpcEnv implementation.
    + */
    +abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
    +
    +  var env: RpcEnv = _
    +
    +  override def beforeAll(): Unit = {
    +    val conf = new SparkConf()
    +    env = createRpcEnv(conf, "local", 12345)
    +  }
    +
    +  override def afterAll(): Unit = {
    +    if(env != null) {
    +      env.shutdown()
    +    }
    +  }
    +
    +  def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv
    +
    +  test("send a message locally") {
    +    @volatile var message: String = null
    +    val rpcEndpointRef = env.setupEndpoint("send-locally", new RpcEndpoint {
    +      override val rpcEnv = env
    +
    +      override def receive = {
    +        case msg: String => message = msg
    +      }
    +    })
    +    rpcEndpointRef.send("hello")
    +    eventually(timeout(5 seconds), interval(10 millis)) {
    +      assert("hello" === message)
    +    }
    +  }
    +
    +  test("send a message remotely") {
    +    @volatile var message: String = null
    +    // Set up a RpcEndpoint using env
    +    env.setupEndpoint("send-remotely", new RpcEndpoint {
    +      override val rpcEnv = env
    +
    +      override def receive = {
    +        case msg: String => message = msg
    +      }
    +    })
    +
    +    val anotherEnv = createRpcEnv(new SparkConf(), "remote" ,13345)
    +    // Use anotherEnv to find out the RpcEndpointRef
    +    val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely")
    +    try {
    +      rpcEndpointRef.send("hello")
    +      eventually(timeout(5 seconds), interval(10 millis)) {
    +        assert("hello" === message)
    +      }
    +    } finally {
    +      anotherEnv.shutdown()
    +      anotherEnv.awaitTermination()
    +    }
    +  }
    +
    +  test("send a RpcEndpointRef") {
    +    val endpoint = new RpcEndpoint {
    +      override val rpcEnv = env
    +
    +      override def receiveAndReply(context: RpcCallContext) = {
    +        case "Hello" => context.reply(self)
    +        case "Echo" => context.reply("Echo")
    +      }
    +    }
    +    val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
    +
    +    val newRpcEndpointRef = rpcEndpointRef.askWithReply[RpcEndpointRef]("Hello")
    +    val reply = newRpcEndpointRef.askWithReply[String]("Echo")
    +    assert("Echo" === reply)
    +  }
    +
    +  test("ask a message locally") {
    +    val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
    +      override val rpcEnv = env
    +
    +      override def receiveAndReply(context: RpcCallContext) = {
    +        case msg: String => {
    +          context.reply(msg)
    +        }
    +      }
    +    })
    +    val reply = rpcEndpointRef.askWithReply[String]("hello")
    +    assert("hello" === reply)
    +  }
    +
    +  test("ask a message remotely") {
    +    env.setupEndpoint("ask-remotely", new RpcEndpoint {
    +      override val rpcEnv = env
    +
    +      override def receiveAndReply(context: RpcCallContext) = {
    +        case msg: String => {
    +          context.reply(msg)
    +        }
    +      }
    +    })
    +
    +    val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
    +    // Use anotherEnv to find out the RpcEndpointRef
    +    val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely")
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/4588#issuecomment-85364386

    > Left mostly minor comments, otherwise looks good. We can iron out any kinks later.

    Fixed them as per your comments. Thanks @vanzin

    > There's just some odd code in the test suite, where you're calling stop in a shared RPC env variable. That looks a little suspicious.

    It stops the RpcEndpoint rather than RpcEnv.

    For deprecating configurations, I created https://issues.apache.org/jira/browse/SPARK-6490 to trace it.
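The reply above turns on the difference between stopping one endpoint and shutting down the whole env. A toy model of that distinction, assuming nothing about the real implementation: `ToyEnv` is a hypothetical class that only tracks registered endpoint names and is not Spark's `RpcEnv`.

```scala
import scala.collection.mutable

// Toy model, not Spark's RpcEnv: it only tracks which endpoint names are registered.
final class ToyEnv {
  private val endpoints = mutable.Set.empty[String]
  private var running = true

  // Registers an endpoint name; fails once the env has been shut down.
  def setupEndpoint(name: String): String = {
    require(running, "env already shut down")
    endpoints += name
    name
  }

  // Stops a single endpoint; the env and the other endpoints keep running.
  def stop(name: String): Unit = endpoints -= name

  // Stops the whole env, including every endpoint still registered.
  def shutdown(): Unit = {
    endpoints.clear()
    running = false
  }

  def isRunning: Boolean = running
  def registered: Set[String] = endpoints.toSet
}
```

In this model a test can call `stop` on its own endpoint and leave a shared env usable for the next test, which is the behaviour the reply describes for the suite.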
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r27005671

    --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
    @@ -0,0 +1,412 @@
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4588#issuecomment-85378974

    [Test build #29057 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29057/consoleFull) for PR 4588 at commit [`385b9c3`](https://github.com/apache/spark/commit/385b9c3e34d39adcddccfbeaeea29cbb0419270a).
    * This patch **fails MiMa tests**.
    * This patch merges cleanly.
    * This patch adds the following public classes _(experimental)_:
      * ` class OutputCommitCoordinatorEndpoint(`
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85390217 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29068/ Test FAILed.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4588#issuecomment-85390216

    [Test build #29068 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29068/consoleFull) for PR 4588 at commit [`1299550`](https://github.com/apache/spark/commit/1299550669c1cf3fa8b6adfd5e8f2eea9a82d1aa).
    * This patch **fails Scala style tests**.
    * This patch merges cleanly.
    * This patch adds the following public classes _(experimental)_:
      * `case class ExecutorCacheTaskLocation(override val host: String, executorId: String)`
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85378997 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29057/ Test FAILed.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4588#issuecomment-85390115

    [Test build #29068 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29068/consoleFull) for PR 4588 at commit [`1299550`](https://github.com/apache/spark/commit/1299550669c1cf3fa8b6adfd5e8f2eea9a82d1aa).
    * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4588#issuecomment-85392299

    [Test build #29073 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29073/consoleFull) for PR 4588 at commit [`9ffa997`](https://github.com/apache/spark/commit/9ffa9979cc08b0338411465f736f030a67122304).
    * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4588#issuecomment-85348733

    [Test build #29050 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29050/consoleFull) for PR 4588 at commit [`8da4c44`](https://github.com/apache/spark/commit/8da4c444c39348a2c6ce9890031c498a7027a15e).
    * This patch **passes all tests**.
    * This patch **does not merge cleanly**.
    * This patch adds the following public classes _(experimental)_:
      * ` class OutputCommitCoordinatorEndpoint(`
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85348738 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29050/ Test PASSed.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r26957187

    --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---
    @@ -17,58 +17,65 @@

     package org.apache.spark.deploy.worker

    -import akka.actor.{Actor, Address, AddressFromURIString}
    -import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
    -
     import org.apache.spark.Logging
     import org.apache.spark.deploy.DeployMessages.SendHeartbeat
    -import org.apache.spark.util.ActorLogReceive
    +import org.apache.spark.rpc._

     /**
      * Actor which connects to a worker process and terminates the JVM if the connection is severed.
      * Provides fate sharing between a worker and its associated child processes.
      */
    -private[spark] class WorkerWatcher(workerUrl: String)
    -  extends Actor with ActorLogReceive with Logging {
    -
    -  override def preStart() {
    -    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    +private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: String)
    +  extends RpcEndpoint with Logging {

    +  override def onStart() {
         logInfo(s"Connecting to worker $workerUrl")
    -    val worker = context.actorSelection(workerUrl)
    -    worker ! SendHeartbeat // need to send a message here to initiate connection
    +    if (!isTesting) {
    +      rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
    +    }
       }

       // Used to avoid shutting down JVM during tests
    +  // In the normal case, exitNonZero will call `System.exit(-1)` to shutdown the JVM. In the unit
    +  // test, the user should call `setTesting(true)` so that `exitNonZero` will set `isShutDown` to
    +  // true rather than calling `System.exit`. The user can check `isShutDown` to know if
    +  // `exitNonZero` is called.
       private[deploy] var isShutDown = false
       private[deploy] def setTesting(testing: Boolean) = isTesting = testing
       private var isTesting = false

       // Lets us filter events only from the worker's actor system
    -  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
    -  private def isWorker(address: Address) = address.hostPort == expectedHostPort
    +  private val expectedHostPort = new java.net.URI(workerUrl)
    +  private def isWorker(address: RpcAddress) = {
    +    expectedHostPort.getHost == address.host && expectedHostPort.getPort == address.port
    +  }

       def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)

    -  override def receiveWithLogging = {
    -    case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
    -      logInfo(s"Successfully connected to $workerUrl")
    +  override def receive = {
    +    case e => logWarning(s"Received unexpected actor system event: $e")
    --- End diff --

    nit: "actor system" is very akka-specific.
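The new `isWorker` in the diff above compares host and port parsed by `java.net.URI` instead of Akka's `hostPort` string. A standalone sketch of that comparison, assuming a stand-in `RpcAddress` case class defined here only so the snippet compiles on its own (the real one lives in `org.apache.spark.rpc`); `WorkerAddressCheck` is a hypothetical wrapper name.

```scala
import java.net.URI

// Hypothetical stand-in for Spark's RpcAddress, defined here so the sketch is self-contained.
final case class RpcAddress(host: String, port: Int)

object WorkerAddressCheck {
  // Returns true when `address` has the same host and port as `workerUrl`.
  def isWorker(workerUrl: String, address: RpcAddress): Boolean = {
    val expected = new URI(workerUrl)
    expected.getHost == address.host && expected.getPort == address.port
  }
}
```

Both conditions must hold, so an event from the right host on a different port is not attributed to the watched worker. Note that `URI.getPort` returns -1 when the URL carries no port, so a URL without one would not match any real address under this check.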
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4588#discussion_r26958182

    --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
    @@ -0,0 +1,412 @@
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26957574 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26958295 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26958330 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26959685 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask => akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26958734 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask => akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26959384 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask => akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26958715 --- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.akka + +import java.net.URI +import java.util.concurrent.ConcurrentHashMap + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Address} +import akka.pattern.{ask => akkaAsk} +import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import org.apache.spark.{SparkException, Logging, SparkConf} +import org.apache.spark.rpc._ +import org.apache.spark.util.{ActorLogReceive, AkkaUtils} + +/** + * A RpcEnv implementation based on Akka. + * + * TODO Once we remove all usages of Akka in other place, we can move this file to a new project and + * remove Akka from the dependencies. 
+ * + * @param actorSystem + * @param conf + * @param boundPort + */ +private[spark] class AkkaRpcEnv private[akka] ( +val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) + extends RpcEnv(conf) with Logging { + + private val defaultAddress: RpcAddress = { +val address = actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress +// In some test case, ActorSystem doesn't bind to any address. +// So just use some default value since they are only some unit tests +RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort)) + } + + override val address: RpcAddress = defaultAddress + + /** + * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. We need it to make + * [[RpcEndpoint.self]] work. + */ + private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]() + + /** + * Need this map to remove `RpcEndpoint` from `endpointToRef` via a `RpcEndpointRef` + */ + private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]() + + private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = { +endpointToRef.put(endpoint, endpointRef) +refToEndpoint.put(endpointRef, endpoint) + } + + private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = { +val endpoint = refToEndpoint.remove(endpointRef) +if (endpoint != null) { + endpointToRef.remove(endpoint) +} + } + + /** + * Retrieve the [[RpcEndpointRef]] of `endpoint`. 
+ */ + override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = { +val endpointRef = endpointToRef.get(endpoint) +require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}") +endpointRef + } + + override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +setupThreadSafeEndpoint(name, endpoint) + } + + override def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { +@volatile var endpointRef: AkkaRpcEndpointRef = null +// Use lazy because the Actor needs to use `endpointRef`. +// So `actorRef` should be created after assigning `endpointRef`. +lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging { + + require(endpointRef != null) + registerEndpoint(endpoint, endpointRef) + + override def preStart(): Unit = { +// Listen for remote client network events +context.system.eventStream.subscribe(self, classOf[AssociationEvent]) +safelyCall(endpoint) { + endpoint.onStart() +} +
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26960440
--- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala ---
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.util.concurrent.{TimeUnit, CountDownLatch, TimeoutException}
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkException, SparkConf}
+
+/**
+ * Common tests for an RpcEnv implementation.
+ */
+abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
+
+  var env: RpcEnv = _
+
+  override def beforeAll(): Unit = {
+    val conf = new SparkConf()
+    env = createRpcEnv(conf, "local", 12345)
+  }
+
+  override def afterAll(): Unit = {
+    if(env != null) {
+      env.shutdown()
+    }
+  }
+
+  def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv
+
+  test("send a message locally") {
+    @volatile var message: String = null
+    val rpcEndpointRef = env.setupEndpoint("send-locally", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receive = {
+        case msg: String => message = msg
+      }
+    })
+    rpcEndpointRef.send("hello")
+    eventually(timeout(5 seconds), interval(10 millis)) {
+      assert("hello" === message)
+    }
+  }
+
+  test("send a message remotely") {
+    @volatile var message: String = null
+    // Set up a RpcEndpoint using env
+    env.setupEndpoint("send-remotely", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receive = {
+        case msg: String => message = msg
+      }
+    })
+
+    val anotherEnv = createRpcEnv(new SparkConf(), "remote" ,13345)
+    // Use anotherEnv to find out the RpcEndpointRef
+    val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely")
+    try {
+      rpcEndpointRef.send("hello")
+      eventually(timeout(5 seconds), interval(10 millis)) {
+        assert("hello" === message)
+      }
+    } finally {
+      anotherEnv.shutdown()
+      anotherEnv.awaitTermination()
+    }
+  }
+
+  test("send a RpcEndpointRef") {
+    val endpoint = new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext) = {
+        case "Hello" => context.reply(self)
+        case "Echo" => context.reply("Echo")
+      }
+    }
+    val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
+
+    val newRpcEndpointRef = rpcEndpointRef.askWithReply[RpcEndpointRef]("Hello")
+    val reply = newRpcEndpointRef.askWithReply[String]("Echo")
+    assert("Echo" === reply)
+  }
+
+  test("ask a message locally") {
+    val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext) = {
+        case msg: String => {
+          context.reply(msg)
+        }
+      }
+    })
+    val reply = rpcEndpointRef.askWithReply[String]("hello")
+    assert("hello" === reply)
+  }
+
+  test("ask a message remotely") {
+    env.setupEndpoint("ask-remotely", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext) = {
+        case msg: String => {
+          context.reply(msg)
+        }
+      }
+    })
+
+    val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+    // Use anotherEnv to find out the RpcEndpointRef
+    val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely")
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26960403 --- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala --- @@ -0,0 +1,526 @@
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26960291 --- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala --- @@ -0,0 +1,526 @@
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-8519 Left mostly minor comments; otherwise looks good. We can iron out any kinks later. There's just some odd code in the test suite, where you're calling `stop` on a shared RPC env variable. That looks a little suspicious.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r27000701 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] abstract class RpcEnv(conf: SparkConf) { + + private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf) + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. 
+ */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. + */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously. + */ + def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef] + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a blocking action. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef = { +Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + * asynchronously. + */ + def asyncSetupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): Future[RpcEndpointRef] = { +asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`. + * This is a blocking action. 
+ */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef = { +setupEndpointRefByUrl(uriOf(systemName, address, endpointName)) + } + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit +
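The Scaladoc for `setupThreadSafeEndpoint` quoted above defines thread-safety as a happens-before relation between consecutive messages handled by the same endpoint, so that endpoint fields need not be volatile. The following is a minimal sketch of one way to obtain that guarantee, not the implementation in this PR (which delegates to Akka actors). `CountingEndpoint` and `SerialDispatcher` are hypothetical names invented for this illustration. Note that a single-threaded executor is stronger than the documented contract, which does not promise that the same thread handles every message.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Hypothetical endpoint with plain, non-volatile mutable state.
class CountingEndpoint {
  private var count = 0

  def receive(msg: Any): Unit = msg match {
    case "inc" => count += 1
    case _ => // ignore unknown messages
  }

  def current: Int = count
}

// Hypothetical dispatcher: all messages for one endpoint go through a
// single-threaded executor, so they are processed one at a time and each
// message sees the writes made while processing the previous one.
class SerialDispatcher(endpoint: CountingEndpoint) {
  private val executor = Executors.newSingleThreadExecutor()

  // Fire-and-forget delivery; safe to call from any thread.
  def send(msg: Any): Unit = executor.execute(new Runnable {
    override def run(): Unit = endpoint.receive(msg)
  })

  // Reads the state on the dispatcher thread, after all messages queued so far.
  def currentCount(): Int = executor.submit(new Callable[Int] {
    override def call(): Int = endpoint.current
  }).get()

  def shutdown(): Unit = {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

With this arrangement, several sender threads can call `send` concurrently without the endpoint needing locks or `@volatile` fields, which is the property the Scaladoc describes.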
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26999735 --- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala --- @@ -0,0 +1,526 @@
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-85329972 [Test build #29050 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29050/consoleFull) for PR 4588 at commit [`8da4c44`](https://github.com/apache/spark/commit/8da4c444c39348a2c6ce9890031c498a7027a15e). * This patch **does not merge cleanly**.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-84779926 @vanzin could you take a look at this one again this week? Thanks!
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-82060820 I added a non-blocking method `def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]` so that people can retrieve `RpcEndpointRef` in the message loop.
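To illustrate the relationship between the non-blocking lookup and the blocking one, here is a minimal sketch modeled on the `RpcEnv` diff quoted elsewhere in this thread, where `setupEndpointRefByUrl` wraps the asynchronous call in `Await.result`. `EndpointRef`, `LookupEnv`, `InMemoryEnv`, and the `rpc://` URL are simplified, hypothetical stand-ins and not Spark classes or formats.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for RpcEndpointRef: it only records the URL it was resolved from.
final case class EndpointRef(url: String)

// Simplified stand-in for the lookup half of RpcEnv.
abstract class LookupEnv(lookupTimeout: FiniteDuration) {

  /** Non-blocking lookup: returns immediately, so it can be called from a message loop. */
  def asyncSetupEndpointRefByUrl(url: String): Future[EndpointRef]

  /** Blocking lookup, defined in terms of the asynchronous one. */
  def setupEndpointRefByUrl(url: String): EndpointRef =
    Await.result(asyncSetupEndpointRefByUrl(url), lookupTimeout)
}

// Toy implementation backed by a fixed set of URLs (illustrative values only).
object InMemoryEnv extends LookupEnv(5.seconds) {
  private val registered = Set("rpc://local@localhost:12345/ask-remotely")

  override def asyncSetupEndpointRefByUrl(url: String): Future[EndpointRef] =
    if (registered.contains(url)) Future.successful(EndpointRef(url))
    else Future.failed(new NoSuchElementException(s"No endpoint registered at $url"))
}
```

Code running inside a message handler would use the `Future` returned by `asyncSetupEndpointRefByUrl` (for example via `onComplete`) rather than calling the blocking variant, which would stall the loop until the lookup finishes or times out.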
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-82058962 [Test build #28692 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28692/consoleFull) for PR 4588 at commit [`2cc3f78`](https://github.com/apache/spark/commit/2cc3f788b23d2ce3ba468bd603c70613481b1219). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-82091399 [Test build #28692 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28692/consoleFull) for PR 4588 at commit [`2cc3f78`](https://github.com/apache/spark/commit/2cc3f788b23d2ce3ba468bd603c70613481b1219).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class OutputCommitCoordinatorEndpoint(`
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-82091422 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28692/ Test PASSed.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-79118468 I'm busy with some blockers internally but I'll try to get to this later today.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26147989 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag + +import org.apache.spark.{SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.Utils + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] trait RpcEnv { + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. + */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. 
+ */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit + + /** + * Wait until [[RpcEnv]] exits. + * + * TODO do we need a timeout parameter? + */ + def awaitTermination(): Unit + + /** + * Create a URI used to create a [[RpcEndpointRef]]. Use this one to create the URI instead of + * creating it manually because different [[RpcEnv]] may have different formats. 
+ */ + def uriOf(systemName: String, address: RpcAddress, endpointName: String): String --- End diff -- So, another argument for just using URLs instead of having all these parameters is the fact that you even need this method...
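To illustrate the point about URLs, here is a minimal sketch of how a single URI string can carry the same three pieces of information that `uriOf` and `setupEndpointRef` take as separate parameters. The Akka-style layout `akka.tcp://<systemName>@<host>:<port>/user/<endpointName>` is an assumption made for illustration, `RpcAddress` is a hypothetical stand-in for the class in the diff, and `EndpointUri` and `parse` are invented names that are not part of the PR.

```scala
import java.net.URI

// Hypothetical stand-in for the RpcAddress type referenced in the diff.
final case class RpcAddress(host: String, port: Int)

// Illustrative helper only; not part of the pull request.
object EndpointUri {
  // Assumed layout: akka.tcp://<systemName>@<host>:<port>/user/<endpointName>
  def uriOf(systemName: String, address: RpcAddress, endpointName: String): String =
    s"akka.tcp://$systemName@${address.host}:${address.port}/user/$endpointName"

  // Recover the three components from a URI string in the assumed layout.
  def parse(uri: String): (String, RpcAddress, String) = {
    val u = new URI(uri)
    // The system name travels in the user-info part of the authority.
    val systemName = u.getUserInfo
    // The endpoint name is whatever follows the "/user/" path prefix.
    val endpointName = u.getPath.stripPrefix("/user/")
    (systemName, RpcAddress(u.getHost, u.getPort), endpointName)
  }
}
```

Under these assumptions a caller could pass one string around and let each `RpcEnv` implementation interpret it, which is roughly the trade-off being discussed: fewer parameters in the interface, at the cost of every implementation agreeing on (or hiding) a URI format.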
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26148867 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag + +import org.apache.spark.{SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.Utils + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] trait RpcEnv { + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. + */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. 
+ */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit + + /** + * Wait until [[RpcEnv]] exits. + * + * TODO do we need a timeout parameter? + */ + def awaitTermination(): Unit + + /** + * Create a URI used to create a [[RpcEndpointRef]]. Use this one to create the URI instead of + * creating it manually because different [[RpcEnv]] may have different formats. 
+ */ + def uriOf(systemName: String, address: RpcAddress, endpointName: String): String +} + +private[spark] case class RpcEnvConfig( +conf: SparkConf, +name: String, +host: String, +port: Int, +securityManager: SecurityManager) + +/** + * A RpcEnv implementation must have a companion object with an + * `apply(config: RpcEnvConfig): RpcEnv` method so that it can be created via Reflection. + * + * {{{ + * object MyCustomRpcEnv { + * def apply(config: RpcEnvConfig): RpcEnv = { + * ... + * }
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26225879 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag + +import org.apache.spark.{SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.Utils + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] trait RpcEnv { + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. + */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. 
+ */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + * + * Thread-safety means processing of one message happens before processing of the next message by + * the same [[RpcEndpoint]]. In the other words, changes to internal fields of a [[RpcEndpoint]] + * are visible when processing the next message, and fields in the [[RpcEndpoint]] need not be + * volatile or equivalent. + * + * However, there is no guarantee that the same thread will be executing the same [[RpcEndpoint]] + * for different messages. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit + + /** + * Wait until [[RpcEnv]] exits. + * + * TODO do we need a timeout parameter? + */ + def awaitTermination(): Unit + + /** + * Create a URI used to create a [[RpcEndpointRef]]. Use this one to create the URI instead of + * creating it manually because different [[RpcEnv]] may have different formats. 
+ */ + def uriOf(systemName: String, address: RpcAddress, endpointName: String): String +} + +private[spark] case class RpcEnvConfig( +conf: SparkConf, +name: String, +host: String, +port: Int, +securityManager: SecurityManager) + +/** + * A RpcEnv implementation must have a companion object with an + * `apply(config: RpcEnvConfig): RpcEnv` method so that it can be created via Reflection. + * + * {{{ + * object MyCustomRpcEnv { + * def apply(config: RpcEnvConfig): RpcEnv = { + * ... + * }
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-78765109 @vanzin other comments, or any comments I missed?
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26228790 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag + +import org.apache.spark.{SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.Utils + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] trait RpcEnv { + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. + */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. 
+ */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve a [[RpcEndpointRef]] which is located in the driver via its name. + */ + def setupDriverEndpointRef(name: String): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` + */ + def setupEndpointRef( + systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef + + /** + * Stop [[RpcEndpoint]] specified by `endpoint`. + */ + def stop(endpoint: RpcEndpointRef): Unit + + /** + * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully, + * call [[awaitTermination()]] straight after [[shutdown()]]. + */ + def shutdown(): Unit + + /** + * Wait until [[RpcEnv]] exits. + * + * TODO do we need a timeout parameter? + */ + def awaitTermination(): Unit + + /** + * Create a URI used to create a [[RpcEndpointRef]]. Use this one to create the URI instead of + * creating it manually because different [[RpcEnv]] may have different formats. + */ + def uriOf(systemName: String, address: RpcAddress, endpointName: String): String +} + +private[spark] case class RpcEnvConfig( +conf: SparkConf, +name: String, +host: String, +port: Int, +securityManager: SecurityManager) + +/** + * A RpcEnv implementation must have a companion object with an + * `apply(config: RpcEnvConfig): RpcEnv` method so that it can be created via Reflection. 
+ * + * {{{ + * object MyCustomRpcEnv { + * def apply(config: RpcEnvConfig): RpcEnv = { + * ... + * } + * } + * }}} + */ +private[spark] object RpcEnv { + + private def getRpcEnvCompanion(conf: SparkConf): AnyRef = { +// Add more RpcEnv implementations here +val rpcEnvNames = Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnv") +val rpcEnvName = conf.get("spark.rpc", "akka") +val
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-78220712 [Test build #28467 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28467/consoleFull) for PR 4588 at commit [`08564ae`](https://github.com/apache/spark/commit/08564ae75fbdf02897dd5bd452fc4dbcd44532e4). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/4588#discussion_r26194514 --- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala --- @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc + +import java.net.URI + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag + +import org.apache.spark.{SparkException, SecurityManager, SparkConf} +import org.apache.spark.util.Utils + +/** + * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to + * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote + * nodes, and deliver them to corresponding [[RpcEndpoint]]s. + * + * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. + */ +private[spark] trait RpcEnv { + + /** + * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement + * [[RpcEndpoint.self]]. + */ + private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Return the address that [[RpcEnv]] is listening to. 
+ */ + def address: RpcAddress + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not + * guarantee thread-safety. + */ + def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] should + * make sure thread-safely sending messages to [[RpcEndpoint]]. + */ + def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef + + /** + * Retrieve a [[RpcEndpointRef]] which is located in the driver via its name. + */ + def setupDriverEndpointRef(name: String): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `url`. + */ + def setupEndpointRefByUrl(url: String): RpcEndpointRef + + /** + * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName` --- End diff -- That's fine. Let's create a jira ticket to track it. Put it as part of the umbrella ticket. Created: https://issues.apache.org/jira/browse/SPARK-6280
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-78230371 [Test build #28467 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28467/consoleFull) for PR 4588 at commit [`08564ae`](https://github.com/apache/spark/commit/08564ae75fbdf02897dd5bd452fc4dbcd44532e4). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` class OutputCommitCoordinatorEndpoint(`
[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4588#issuecomment-78230380 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28467/ Test PASSed.