[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87515130
  
  [Test build #29379 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29379/consoleFull)
 for   PR 4588 at commit 
[`f6f3287`](https://github.com/apache/spark/commit/f6f3287092097f40d8f159321a55b5164cb10968).





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363583
  
--- Diff: core/src/main/scala/org/apache/spark/util/RpcUtils.scala ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.{SparkEnv, SparkConf}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+
+object RpcUtils {
+
+  /**
+   * Retrieve a [[RpcEndpointRef]] which is located in the driver via its 
name.
+   */
+  def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
+    val driverActorSystemName = SparkEnv.driverActorSystemName
+    val driverHost: String = conf.get("spark.driver.host", "localhost")
+    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
+    Utils.checkHost(driverHost, "Expected hostname")
--- End diff --

I copied it from AkkaUtils.makeDriverRef. It looks like legacy code, because 
the current `checkHost` does not really verify the host:
```Scala
  def checkHost(host: String, message: String = "") {
    assert(host.indexOf(':') == -1, message)
  }
```
Checking for `:` is not enough if we really want to validate the host.
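
For illustration, a stricter check could also try to resolve the name; this is only a sketch of what "really checking" might mean (not part of this PR, and `checkHostResolvable` is a made-up name):
```Scala
import java.net.{InetAddress, UnknownHostException}

// Hypothetical stricter variant: reject "host:port" strings and also verify
// that the name resolves. Resolution can be slow, which is presumably why the
// existing helper only rejects strings containing ':'.
def checkHostResolvable(host: String, message: String = ""): Unit = {
  assert(host != null && host.nonEmpty && host.indexOf(':') == -1, message)
  try {
    InetAddress.getByName(host)
  } catch {
    case e: UnknownHostException =>
      throw new IllegalArgumentException(s"$message: unresolvable host $host", e)
  }
}
```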





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87549328
  
@rxin @CodingCat @vanzin @aarondav Thanks for reviewing this PR.





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87513980
  
  [Test build #29378 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29378/consoleFull)
 for   PR 4588 at commit 
[`8bd1097`](https://github.com/apache/spark/commit/8bd10973798f80a30d7bf37ff3fdf40a953e5da7).





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363465
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
   

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363678
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
--- End diff --

Changed urls to URIs.





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363681
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---
@@ -17,58 +17,65 @@
 
 package org.apache.spark.deploy.worker
 
-import akka.actor.{Actor, Address, AddressFromURIString}
-import akka.remote.{AssociatedEvent, AssociationErrorEvent, 
AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
-
 import org.apache.spark.Logging
 import org.apache.spark.deploy.DeployMessages.SendHeartbeat
-import org.apache.spark.util.ActorLogReceive
+import org.apache.spark.rpc._
 
 /**
  * Actor which connects to a worker process and terminates the JVM if the 
connection is severed.
  * Provides fate sharing between a worker and its associated child 
processes.
  */
-private[spark] class WorkerWatcher(workerUrl: String)
-  extends Actor with ActorLogReceive with Logging {
-
-  override def preStart() {
-context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
+private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: 
String)
+  extends RpcEndpoint with Logging {
 
+  override def onStart() {
     logInfo(s"Connecting to worker $workerUrl")
-val worker = context.actorSelection(workerUrl)
-worker ! SendHeartbeat // need to send a message here to initiate 
connection
+if (!isTesting) {
+  rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
+}
   }
 
   // Used to avoid shutting down JVM during tests
+  // In the normal case, exitNonZero will call `System.exit(-1)` to 
shutdown the JVM. In the unit
+  // test, the user should call `setTesting(true)` so that `exitNonZero` 
will set `isShutDown` to
+  // true rather than calling `System.exit`. The user can check 
`isShutDown` to know if
+  // `exitNonZero` is called.
   private[deploy] var isShutDown = false
   private[deploy] def setTesting(testing: Boolean) = isTesting = testing
   private var isTesting = false
 
   // Lets us filter events only from the worker's actor system
-  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
-  private def isWorker(address: Address) = address.hostPort == 
expectedHostPort
+  private val expectedHostPort = new java.net.URI(workerUrl)
--- End diff --

Updated to use `RpcAddress.fromUriString`.
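
For context, the goal is to turn the worker URL into a host/port pair once and compare remote addresses against it. A rough sketch of that parsing (illustrative only; the PR's actual `RpcAddress.fromUriString` may differ) looks like:
```Scala
import java.net.URI

// Illustrative only: parse an actor-system URI such as
// "akka.tcp://sparkWorker@192.168.0.1:7078/user/Worker" into host and port.
case class HostPort(host: String, port: Int)

def fromUriString(uri: String): HostPort = {
  val parsed = new URI(uri)
  HostPort(parsed.getHost, parsed.getPort)
}
```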





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87533809
  
  [Test build #29379 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29379/consoleFull)
 for   PR 4588 at commit 
[`f6f3287`](https://github.com/apache/spark/commit/f6f3287092097f40d8f159321a55b5164cb10968).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
 * This patch does not change any dependencies.





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/4588





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87532900
  
  [Test build #29378 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29378/consoleFull)
 for   PR 4588 at commit 
[`8bd1097`](https://github.com/apache/spark/commit/8bd10973798f80a30d7bf37ff3fdf40a953e5da7).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
 * This patch does not change any dependencies.





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87532945
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29378/
Test PASSed.





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363528
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+    RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+    require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+ 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363505
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+    RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+    require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
--- End diff --

I used it to make sure the laziness thing is correct. But now it's not 
necessary. I changed it to `assert`.
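
For readers unfamiliar with the pattern under discussion, a minimal self-contained sketch of the initialization ordering (illustrative names only, not the PR's code) is:
```Scala
object LazyInitSketch extends App {
  @volatile var endpointRef: String = null

  // The body reads `endpointRef`, so the val must stay lazy and can only be
  // forced after `endpointRef` has been assigned; the assert documents that.
  lazy val actorRef: String = {
    assert(endpointRef != null)
    s"actor for $endpointRef"
  }

  endpointRef = "endpoint-1"
  println(actorRef) // forced here, after the assignment above
}
```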



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363513
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+    RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+    require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+ 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363474
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
   

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363536
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+    RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+    require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+ 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363480
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+    RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+    require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
--- End diff --

Added docs for `endpointRef` about a non-existent endpoint
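
As a quick illustration of the documented behavior (a sketch, not the PR's code): `require` throws `IllegalArgumentException`, so asking for the ref of an endpoint that was never registered fails fast.
```Scala
import java.util.concurrent.ConcurrentHashMap

object EndpointRefLookupSketch extends App {
  private val endpointToRef = new ConcurrentHashMap[String, String]()

  // Same shape as the lookup above: a missing entry trips the require.
  def endpointRef(endpoint: String): String = {
    val ref = endpointToRef.get(endpoint)
    require(ref != null, s"Cannot find RpcEndpointRef of $endpoint")
    ref
  }

  endpointToRef.put("worker", "ref-1")
  println(endpointRef("worker"))   // prints "ref-1"
  // endpointRef("missing")        // would throw IllegalArgumentException
}
```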





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363917
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+    RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+    require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  assert(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+ 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87535876
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29380/
Test PASSed.





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87535863
  
  [Test build #29380 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29380/consoleFull)
 for   PR 4588 at commit 
[`fe3df4c`](https://github.com/apache/spark/commit/fe3df4cbd9efa052803f0c3d12544874b649728b).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
 * This patch does not change any dependencies.





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87537771
  
Thanks. I've merged this in master. Let's start replacing the direct use of 
actors with this API. Great work!





[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363384
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
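
The blocking `setupEndpointRefByUrl` above simply wraps the asynchronous variant with `Await.result`; a caller can also use the returned `Future` directly. A small sketch, not part of the patch, with the callback shape being an assumption:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}

// Non-blocking lookup: resolve the endpoint ref on the returned Future and
// react in a callback instead of waiting with Await.result.
def resolveAsync(rpcEnv: RpcEnv, url: String)(onReady: RpcEndpointRef => Unit): Unit = {
  rpcEnv.asyncSetupEndpointRefByUrl(url).onComplete {
    case Success(ref) => onReady(ref)
    case Failure(e)   => println(s"lookup of $url failed: $e")
  }
}
```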
   

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363366
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
   

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27364030
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  assert(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
--- End diff --

Ah, sorry. Yes, it can be moved. Done.
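
The initialization-order trick being discussed here, forcing the `lazy val` only after the `@volatile` var it reads has been assigned, can be shown in isolation with a self-contained sketch (stand-in names only, no Spark or Akka types):

```scala
object LazyInitOrderDemo extends App {
  @volatile var endpointRefStandIn: String = null

  // The initializer reads endpointRefStandIn, so it must not run until the
  // var has been assigned; declaring the value lazy defers the initializer.
  lazy val actorRefStandIn: String = {
    require(endpointRefStandIn != null, "forced before endpointRef was set")
    s"actor wrapping $endpointRefStandIn"
  }

  endpointRefStandIn = "ref-0"   // assign first ...
  println(actorRefStandIn)       // ... then force the lazy val safely
}
```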


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27364040
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  assert(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87520468
  
  [Test build #29380 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29380/consoleFull)
 for   PR 4588 at commit 
[`fe3df4c`](https://github.com/apache/spark/commit/fe3df4cbd9efa052803f0c3d12544874b649728b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363808
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+ 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27363828
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  assert(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
--- End diff --

Ah - I think there was a miscommunication on these lines. I was not 
worried about require vs assert, I was wondering if we could just invoke 
`registerEndpoint(endpoint, endpointRef)` right before `endpointRef.init()`, on 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87533813
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29379/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348578
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
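
The practical effect of the `setupThreadSafeEndpoint` contract described above can be sketched with two plain handler classes; these are stand-ins for illustration, not `RpcEndpoint` subclasses from the patch:

```scala
import java.util.concurrent.atomic.AtomicInteger

// If the RpcEnv serializes message processing (setupThreadSafeEndpoint),
// a plain var is enough: the happens-before edge between consecutive
// messages makes the update visible without @volatile or locking.
class OrderedCounter {
  private var processed = 0
  def onMessage(msg: Any): Unit = { processed += 1 }
  def count: Int = processed
}

// Without that guarantee (plain setupEndpoint), the handler may be invoked
// concurrently and has to protect its own state, e.g. with an atomic.
class UnorderedCounter {
  private val processed = new AtomicInteger(0)
  def onMessage(msg: Any): Unit = { processed.incrementAndGet() }
  def count: Int = processed.get()
}
```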
  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348715
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
--- End diff --

I believe I am missing something here, but what prevents us from invoking 
these methods after setting endpointRef (inside setupThreadSafeEndpoint, rather 
than the Actor's constructor)? This would mean that the actor is not valid 
immediately after construction, but I think the laziness thing is making a 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348835
  
--- Diff: core/src/main/scala/org/apache/spark/util/RpcUtils.scala ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.{SparkEnv, SparkConf}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+
+object RpcUtils {
+
+  /**
+   * Retrieve a [[RpcEndpointRef]] which is located in the driver via its 
name.
+   */
+  def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): 
RpcEndpointRef = {
+val driverActorSystemName = SparkEnv.driverActorSystemName
+val driverHost: String = conf.get("spark.driver.host", "localhost")
+val driverPort: Int = conf.getInt("spark.driver.port", 7077)
+Utils.checkHost(driverHost, "Expected hostname")
--- End diff --

Why is this a requirement? For Akka's sake?
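
For reference, a hypothetical call site for the helper quoted above, using only the signature shown in the diff; the endpoint name "MapOutputTracker" is illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
import org.apache.spark.util.RpcUtils

// Resolve a driver-side endpoint by name from a non-driver process.
def lookupDriverEndpoint(conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef =
  RpcUtils.makeDriverRef("MapOutputTracker", conf, rpcEnv)
```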


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348819
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+
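
The excerpt above keeps two `ConcurrentHashMap`s so that a registration can be undone given only the ref; the bookkeeping in isolation, with stand-in types instead of `RpcEndpoint`/`RpcEndpointRef`, looks roughly like this:

```scala
import java.util.concurrent.ConcurrentHashMap

final class Endpoint      // stand-in for RpcEndpoint
final class EndpointRef   // stand-in for RpcEndpointRef

object EndpointRegistry {
  private val endpointToRef = new ConcurrentHashMap[Endpoint, EndpointRef]()
  // The reverse map exists only so unregister can be driven by the ref alone.
  private val refToEndpoint = new ConcurrentHashMap[EndpointRef, Endpoint]()

  def register(endpoint: Endpoint, ref: EndpointRef): Unit = {
    endpointToRef.put(endpoint, ref)
    refToEndpoint.put(ref, endpoint)
  }

  def unregister(ref: EndpointRef): Unit = {
    val endpoint = refToEndpoint.remove(ref)
    if (endpoint != null) endpointToRef.remove(endpoint)
  }
}
```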

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348651
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348855
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348684
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
--- End diff --

This behavior should be documented: calling this on a non-existent endpoint 
throws an exception, and this method will never return null.
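
One possible wording for that documentation, as a sketch of the quoted method (not necessarily the wording later adopted):

```scala
/**
 * Retrieve the [[RpcEndpointRef]] of `endpoint`. Never returns null: if
 * `endpoint` has not been registered with this [[RpcEnv]], an
 * `IllegalArgumentException` is thrown instead.
 */
override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
  val ref = endpointToRef.get(endpoint)
  require(ref != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
  ref
}
```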


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348730
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-87317475
  
Just a few mainly minor comments, otherwise LGTM. We can iterate on some of 
the API moving forward as we get more endpoints using it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348783
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348800
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348625
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348604
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348886
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
--- End diff --

Should these be URIs instead? URIs are, I believe, a strict superset of 
URLs and do not encode any notion of hostname resolution, and we refer to 
URIs elsewhere, so we should probably stick with URIs here.
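
As a rough illustration (the helper name below is only for the example, not part of the PR), a plain `java.net.URI` already carries everything the lookup needs, with no hostname resolution involved:
```Scala
import java.net.URI

// e.g. "akka.tcp://sparkWorker@192.168.0.1:7078/user/Worker"
def parseEndpointUri(uriStr: String): (String, Int, String) = {
  val uri = new URI(uriStr)                              // purely syntactic, no DNS lookup
  val host = uri.getHost                                 // "192.168.0.1"
  val port = uri.getPort                                 // 7078
  val endpointName = uri.getPath.stripPrefix("/user/")   // "Worker"
  (host, port, endpointName)
}
```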


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27348955
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---
@@ -17,58 +17,65 @@
 
 package org.apache.spark.deploy.worker
 
-import akka.actor.{Actor, Address, AddressFromURIString}
-import akka.remote.{AssociatedEvent, AssociationErrorEvent, 
AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
-
 import org.apache.spark.Logging
 import org.apache.spark.deploy.DeployMessages.SendHeartbeat
-import org.apache.spark.util.ActorLogReceive
+import org.apache.spark.rpc._
 
 /**
  * Actor which connects to a worker process and terminates the JVM if the 
connection is severed.
  * Provides fate sharing between a worker and its associated child 
processes.
  */
-private[spark] class WorkerWatcher(workerUrl: String)
-  extends Actor with ActorLogReceive with Logging {
-
-  override def preStart() {
-context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
+private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: 
String)
+  extends RpcEndpoint with Logging {
 
+  override def onStart() {
 logInfo(s"Connecting to worker $workerUrl")
-val worker = context.actorSelection(workerUrl)
-worker ! SendHeartbeat // need to send a message here to initiate 
connection
+if (!isTesting) {
+  rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
+}
   }
 
   // Used to avoid shutting down JVM during tests
+  // In the normal case, exitNonZero will call `System.exit(-1)` to 
shutdown the JVM. In the unit
+  // test, the user should call `setTesting(true)` so that `exitNonZero` 
will set `isShutDown` to
+  // true rather than calling `System.exit`. The user can check 
`isShutDown` to know if
+  // `exitNonZero` is called.
   private[deploy] var isShutDown = false
   private[deploy] def setTesting(testing: Boolean) = isTesting = testing
   private var isTesting = false
 
   // Lets us filter events only from the worker's actor system
-  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
-  private def isWorker(address: Address) = address.hostPort == 
expectedHostPort
+  private val expectedHostPort = new java.net.URI(workerUrl)
--- End diff --

Perhaps RpcAddress.fromUriString(workerUrl) and use of the == method?
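
A sketch of what that could look like (the helper name and the simplified `RpcAddress` here are only illustrative, mirroring what Akka's `AddressFromURIString` does today):
```Scala
import java.net.URI

case class RpcAddress(host: String, port: Int) {
  def hostPort: String = s"$host:$port"
}

object RpcAddress {
  /** Build an RpcAddress from a URL such as "akka.tcp://sparkWorker@host:port/user/Worker". */
  def fromUriString(uri: String): RpcAddress = {
    val parsed = new URI(uri)
    RpcAddress(parsed.getHost, parsed.getPort)
  }
}

// WorkerWatcher could then rely on case-class equality:
//   private val expectedAddress = RpcAddress.fromUriString(workerUrl)
//   private def isWorker(address: RpcAddress) = address == expectedAddress
```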


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85417800
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29073/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85417764
  
  [Test build #29073 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29073/consoleFull)
 for   PR 4588 at commit 
[`9ffa997`](https://github.com/apache/spark/commit/9ffa9979cc08b0338411465f736f030a67122304).
 * This patch **fails Spark unit tests**.

 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85422331
  
Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85423410
  
  [Test build #29078 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29078/consoleFull)
 for   PR 4588 at commit 
[`9ffa997`](https://github.com/apache/spark/commit/9ffa9979cc08b0338411465f736f030a67122304).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85451891
  
  [Test build #29078 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29078/consoleFull)
 for   PR 4588 at commit 
[`9ffa997`](https://github.com/apache/spark/commit/9ffa9979cc08b0338411465f736f030a67122304).
 * This patch **passes all tests**.

 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85451900
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29078/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85358650
  
  [Test build #29057 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29057/consoleFull)
 for   PR 4588 at commit 
[`385b9c3`](https://github.com/apache/spark/commit/385b9c3e34d39adcddccfbeaeea29cbb0419270a).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27005801
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---
@@ -17,58 +17,65 @@
 
 package org.apache.spark.deploy.worker
 
-import akka.actor.{Actor, Address, AddressFromURIString}
-import akka.remote.{AssociatedEvent, AssociationErrorEvent, 
AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
-
 import org.apache.spark.Logging
 import org.apache.spark.deploy.DeployMessages.SendHeartbeat
-import org.apache.spark.util.ActorLogReceive
+import org.apache.spark.rpc._
 
 /**
  * Actor which connects to a worker process and terminates the JVM if the 
connection is severed.
  * Provides fate sharing between a worker and its associated child 
processes.
  */
-private[spark] class WorkerWatcher(workerUrl: String)
-  extends Actor with ActorLogReceive with Logging {
-
-  override def preStart() {
-context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
+private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: 
String)
+  extends RpcEndpoint with Logging {
 
+  override def onStart() {
 logInfo(s"Connecting to worker $workerUrl")
-val worker = context.actorSelection(workerUrl)
-worker ! SendHeartbeat // need to send a message here to initiate 
connection
+if (!isTesting) {
+  rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
+}
   }
 
   // Used to avoid shutting down JVM during tests
+  // In the normal case, exitNonZero will call `System.exit(-1)` to 
shutdown the JVM. In the unit
+  // test, the user should call `setTesting(true)` so that `exitNonZero` 
will set `isShutDown` to
+  // true rather than calling `System.exit`. The user can check 
`isShutDown` to know if
+  // `exitNonZero` is called.
   private[deploy] var isShutDown = false
   private[deploy] def setTesting(testing: Boolean) = isTesting = testing
   private var isTesting = false
 
   // Lets us filter events only from the worker's actor system
-  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
-  private def isWorker(address: Address) = address.hostPort == 
expectedHostPort
+  private val expectedHostPort = new java.net.URI(workerUrl)
+  private def isWorker(address: RpcAddress) = {
+expectedHostPort.getHost == address.host && expectedHostPort.getPort 
== address.port
+  }
 
   def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
 
-  override def receiveWithLogging = {
-case AssociatedEvent(localAddress, remoteAddress, inbound) if 
isWorker(remoteAddress) =>
-  logInfo(s"Successfully connected to $workerUrl")
+  override def receive = {
+case e => logWarning(s"Received unexpected actor system event: $e")
--- End diff --

Fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27005797
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
   

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27005781
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+ 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27005771
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+ 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27005780
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
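+
The `lazy val actorRef` comment in the diff above is doing real work: the actor body dereferences `endpointRef`, so the actor must not be constructed until `endpointRef` has been assigned. A minimal, self-contained sketch of that initialization ordering (hypothetical names, not the patch's code):

```Scala
// Sketch only: a lazy val whose body reads a var that is assigned later.
// The patch does the same thing with actorRef/endpointRef (and also marks the
// var @volatile because the actor runs on another thread).
object LazyInitOrderSketch {
  final class Ref(val name: String)

  def setup(): Ref = {
    var endpointRef: Ref = null
    // `body` captures `endpointRef`; because it is lazy, it is not evaluated here.
    lazy val body: String = {
      require(endpointRef != null, "endpointRef must be assigned before the body runs")
      s"endpoint bound to ${endpointRef.name}"
    }
    endpointRef = new Ref("demo")
    // Forcing `body` now is safe: endpointRef is already set, mirroring how the
    // AkkaRpcEnv code creates the actor only after building the endpoint ref.
    println(body)
    endpointRef
  }

  def main(args: Array[String]): Unit = setup()
}
```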
+ 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27005722
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
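+
The `setupThreadSafeEndpoint` contract above is the key difference from `setupEndpoint`: message processing for one endpoint is serialized with a happens-before edge between messages, so plain fields are safe. A self-contained sketch of how a single-threaded dispatcher provides that guarantee (hypothetical names, not the patch's implementation):

```Scala
import java.util.concurrent.Executors

object ThreadSafeEndpointSketch {
  trait Endpoint { def receive(msg: Any): Unit }

  final class SerializedRef(endpoint: Endpoint) {
    // A single-threaded executor serializes message processing and gives the
    // happens-before ordering described in the RpcEnv scaladoc above.
    private val executor = Executors.newSingleThreadExecutor()
    def send(msg: Any): Unit = executor.execute(new Runnable {
      override def run(): Unit = endpoint.receive(msg)
    })
    def shutdown(): Unit = executor.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val counter = new Endpoint {
      var count = 0 // plain field: no @volatile needed under this contract
      override def receive(msg: Any): Unit = msg match {
        case "inc" => count += 1
        case "print" => println(s"count = $count")
      }
    }
    val ref = new SerializedRef(counter)
    (1 to 100).foreach(_ => ref.send("inc"))
    ref.send("print") // prints "count = 100" once the queued messages are processed
    ref.shutdown()
  }
}
```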
   

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27005775
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
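+
The pair of `ConcurrentHashMap`s in the diff above exists so that `RpcEndpoint.self` can be answered (endpoint to ref) and so an endpoint can be unregistered given only its ref (ref to endpoint). A self-contained sketch of that bookkeeping (hypothetical names, not the patch's code):

```Scala
import java.util.concurrent.ConcurrentHashMap

object RegistryDemo {
  final class Endpoint(val name: String)
  final class Ref(val name: String)

  private val endpointToRef = new ConcurrentHashMap[Endpoint, Ref]()
  private val refToEndpoint = new ConcurrentHashMap[Ref, Endpoint]()

  def register(endpoint: Endpoint, ref: Ref): Unit = {
    endpointToRef.put(endpoint, ref)
    refToEndpoint.put(ref, endpoint)
  }

  def unregister(ref: Ref): Unit = {
    val endpoint = refToEndpoint.remove(ref)
    if (endpoint != null) endpointToRef.remove(endpoint)
  }

  def main(args: Array[String]): Unit = {
    val e = new Endpoint("worker")
    val r = new Ref("worker-ref")
    register(e, r)
    assert(endpointToRef.get(e) eq r) // the lookup used to implement RpcEndpoint.self
    unregister(r)
    assert(endpointToRef.get(e) == null)
  }
}
```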
+ 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27005759
  
--- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala ---
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.util.concurrent.{TimeUnit, CountDownLatch, TimeoutException}
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkException, SparkConf}
+
+/**
+ * Common tests for an RpcEnv implementation.
+ */
+abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
+
+  var env: RpcEnv = _
+
+  override def beforeAll(): Unit = {
+    val conf = new SparkConf()
+    env = createRpcEnv(conf, "local", 12345)
+  }
+
+  override def afterAll(): Unit = {
+    if (env != null) {
+      env.shutdown()
+    }
+  }
+
+  def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv
+
+  test("send a message locally") {
+    @volatile var message: String = null
+    val rpcEndpointRef = env.setupEndpoint("send-locally", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receive = {
+        case msg: String => message = msg
+      }
+    })
+    rpcEndpointRef.send("hello")
+    eventually(timeout(5 seconds), interval(10 millis)) {
+      assert("hello" === message)
+    }
+  }
+
+  test("send a message remotely") {
+    @volatile var message: String = null
+    // Set up a RpcEndpoint using env
+    env.setupEndpoint("send-remotely", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receive = {
+        case msg: String => message = msg
+      }
+    })
+
+    val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+    // Use anotherEnv to find out the RpcEndpointRef
+    val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely")
+    try {
+      rpcEndpointRef.send("hello")
+      eventually(timeout(5 seconds), interval(10 millis)) {
+        assert("hello" === message)
+      }
+    } finally {
+      anotherEnv.shutdown()
+      anotherEnv.awaitTermination()
+    }
+  }
+
+  test("send a RpcEndpointRef") {
+    val endpoint = new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext) = {
+        case "Hello" => context.reply(self)
+        case "Echo" => context.reply("Echo")
+      }
+    }
+    val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
+
+    val newRpcEndpointRef = rpcEndpointRef.askWithReply[RpcEndpointRef]("Hello")
+    val reply = newRpcEndpointRef.askWithReply[String]("Echo")
+    assert("Echo" === reply)
+  }
+
+  test("ask a message locally") {
+    val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext) = {
+        case msg: String => {
+          context.reply(msg)
+        }
+      }
+    })
+    val reply = rpcEndpointRef.askWithReply[String]("hello")
+    assert("hello" === reply)
+  }
+
+  test("ask a message remotely") {
+    env.setupEndpoint("ask-remotely", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext) = {
+        case msg: String => {
+          context.reply(msg)
+        }
+      }
+    })
+
+    val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+    // Use anotherEnv to find out the RpcEndpointRef
+    val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely")
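
The ask tests above all follow the same shape: the endpoint receives a call context, replies through it, and `askWithReply` blocks until the reply arrives. A self-contained sketch of that request/reply pattern (hypothetical names, not this PR's classes):

```Scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object AskPatternSketch {
  trait CallContext { def reply(response: Any): Unit }
  trait Endpoint { def receiveAndReply(context: CallContext): PartialFunction[Any, Unit] }

  // The caller gets a Future that the endpoint completes via reply(); a blocking
  // ask is just Await.result on that future.
  def ask(endpoint: Endpoint, msg: Any): Future[Any] = {
    val promise = Promise[Any]()
    val context = new CallContext {
      override def reply(response: Any): Unit = promise.success(response)
    }
    endpoint.receiveAndReply(context)(msg)
    promise.future
  }

  def main(args: Array[String]): Unit = {
    val echo = new Endpoint {
      override def receiveAndReply(context: CallContext) = {
        case msg: String => context.reply(msg) // mirrors the "ask a message locally" test
      }
    }
    val reply = Await.result(ask(echo, "hello"), 5.seconds)
    assert(reply == "hello")
    println(reply)
  }
}
```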
 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85364386
  
> Left mostly minor comments, otherwise looks good. We can iron out any kinks later.

Fixed them as per your comments. Thanks @vanzin 

> There's just some odd code in the test suite, where you're calling stop in a shared RPC env variable. That looks a little suspicious.

It stops the RpcEndpoint rather than RpcEnv.
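
A usage sketch of the distinction, assuming this PR's API (names taken from the diff; `env` and `endpoint` are stand-ins like the ones in RpcEnvSuite):

```Scala
val ref = env.setupEndpoint("demo", endpoint)
env.stop(ref)            // stops only this RpcEndpoint; the shared env keeps running
env.shutdown()           // later: tear down the whole RpcEnv
env.awaitTermination()   // block until the shutdown has completed
```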

For deprecating configurations, I created 
https://issues.apache.org/jira/browse/SPARK-6490 to track it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27005671
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
   

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85378974
  
  [Test build #29057 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29057/consoleFull)
 for   PR 4588 at commit 
[`385b9c3`](https://github.com/apache/spark/commit/385b9c3e34d39adcddccfbeaeea29cbb0419270a).
 * This patch **fails MiMa tests**.

 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  class OutputCommitCoordinatorEndpoint(`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85390217
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29068/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85390216
  
  [Test build #29068 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29068/consoleFull)
 for   PR 4588 at commit 
[`1299550`](https://github.com/apache/spark/commit/1299550669c1cf3fa8b6adfd5e8f2eea9a82d1aa).
 * This patch **fails Scala style tests**.

 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class ExecutorCacheTaskLocation(override val host: String, 
executorId: String)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85378997
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29057/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85390115
  
  [Test build #29068 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29068/consoleFull)
 for   PR 4588 at commit 
[`1299550`](https://github.com/apache/spark/commit/1299550669c1cf3fa8b6adfd5e8f2eea9a82d1aa).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85392299
  
  [Test build #29073 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29073/consoleFull)
 for   PR 4588 at commit 
[`9ffa997`](https://github.com/apache/spark/commit/9ffa9979cc08b0338411465f736f030a67122304).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85348733
  
  [Test build #29050 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29050/consoleFull)
 for   PR 4588 at commit 
[`8da4c44`](https://github.com/apache/spark/commit/8da4c444c39348a2c6ce9890031c498a7027a15e).
 * This patch **passes all tests**.
 * This patch **does not merge cleanly**.
 * This patch adds the following public classes _(experimental)_:
  * `  class OutputCommitCoordinatorEndpoint(`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85348738
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29050/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26957187
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---
@@ -17,58 +17,65 @@
 
 package org.apache.spark.deploy.worker
 
-import akka.actor.{Actor, Address, AddressFromURIString}
-import akka.remote.{AssociatedEvent, AssociationErrorEvent, 
AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
-
 import org.apache.spark.Logging
 import org.apache.spark.deploy.DeployMessages.SendHeartbeat
-import org.apache.spark.util.ActorLogReceive
+import org.apache.spark.rpc._
 
 /**
  * Actor which connects to a worker process and terminates the JVM if the 
connection is severed.
  * Provides fate sharing between a worker and its associated child 
processes.
  */
-private[spark] class WorkerWatcher(workerUrl: String)
-  extends Actor with ActorLogReceive with Logging {
-
-  override def preStart() {
-context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
+private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: 
String)
+  extends RpcEndpoint with Logging {
 
+  override def onStart() {
    logInfo(s"Connecting to worker $workerUrl")
-val worker = context.actorSelection(workerUrl)
-worker ! SendHeartbeat // need to send a message here to initiate 
connection
+if (!isTesting) {
+  rpcEnv.asyncSetupEndpointRefByUrl(workerUrl)
+}
   }
 
   // Used to avoid shutting down JVM during tests
+  // In the normal case, exitNonZero will call `System.exit(-1)` to 
shutdown the JVM. In the unit
+  // test, the user should call `setTesting(true)` so that `exitNonZero` 
will set `isShutDown` to
+  // true rather than calling `System.exit`. The user can check 
`isShutDown` to know if
+  // `exitNonZero` is called.
   private[deploy] var isShutDown = false
   private[deploy] def setTesting(testing: Boolean) = isTesting = testing
   private var isTesting = false
 
   // Lets us filter events only from the worker's actor system
-  private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
-  private def isWorker(address: Address) = address.hostPort == 
expectedHostPort
+  private val expectedHostPort = new java.net.URI(workerUrl)
+  private def isWorker(address: RpcAddress) = {
+    expectedHostPort.getHost == address.host && expectedHostPort.getPort == address.port
+  }
 
   def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
 
-  override def receiveWithLogging = {
-    case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
-      logInfo(s"Successfully connected to $workerUrl")
+  override def receive = {
+    case e => logWarning(s"Received unexpected actor system event: $e")
--- End diff --

nit: actor system is very akka-specific.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26958182
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26957574
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26958295
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26958330
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26959685
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26958734
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26959384
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26958715
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.akka
+
+import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, 
Props, Address}
+import akka.pattern.{ask => akkaAsk}
+import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import org.apache.spark.{SparkException, Logging, SparkConf}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+
+/**
+ * A RpcEnv implementation based on Akka.
+ *
+ * TODO Once we remove all usages of Akka in other place, we can move this 
file to a new project and
+ * remove Akka from the dependencies.
+ *
+ * @param actorSystem
+ * @param conf
+ * @param boundPort
+ */
+private[spark] class AkkaRpcEnv private[akka] (
+val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+  extends RpcEnv(conf) with Logging {
+
+  private val defaultAddress: RpcAddress = {
+val address = 
actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+// In some test case, ActorSystem doesn't bind to any address.
+// So just use some default value since they are only some unit tests
+RpcAddress(address.host.getOrElse("localhost"), 
address.port.getOrElse(boundPort))
+  }
+
+  override val address: RpcAddress = defaultAddress
+
+  /**
+   * A lookup table to search a [[RpcEndpointRef]] for a [[RpcEndpoint]]. 
We need it to make
+   * [[RpcEndpoint.self]] work.
+   */
+  private val endpointToRef = new ConcurrentHashMap[RpcEndpoint, 
RpcEndpointRef]()
+
+  /**
+   * Need this map to remove `RpcEndpoint` from `endpointToRef` via a 
`RpcEndpointRef`
+   */
+  private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, 
RpcEndpoint]()
+
+  private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: 
RpcEndpointRef): Unit = {
+endpointToRef.put(endpoint, endpointRef)
+refToEndpoint.put(endpointRef, endpoint)
+  }
+
+  private def unregisterEndpoint(endpointRef: RpcEndpointRef): Unit = {
+val endpoint = refToEndpoint.remove(endpointRef)
+if (endpoint != null) {
+  endpointToRef.remove(endpoint)
+}
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] of `endpoint`.
+   */
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
+val endpointRef = endpointToRef.get(endpoint)
+require(endpointRef != null, s"Cannot find RpcEndpointRef of 
${endpoint} in ${this}")
+endpointRef
+  }
+
+  override def setupEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef = {
+setupThreadSafeEndpoint(name, endpoint)
+  }
+
+  override def setupThreadSafeEndpoint(name: String, endpoint: 
RpcEndpoint): RpcEndpointRef = {
+@volatile var endpointRef: AkkaRpcEndpointRef = null
+// Use lazy because the Actor needs to use `endpointRef`.
+// So `actorRef` should be created after assigning `endpointRef`.
+lazy val actorRef = actorSystem.actorOf(Props(new Actor with 
ActorLogReceive with Logging {
+
+  require(endpointRef != null)
+  registerEndpoint(endpoint, endpointRef)
+
+  override def preStart(): Unit = {
+// Listen for remote client network events
+context.system.eventStream.subscribe(self, 
classOf[AssociationEvent])
+safelyCall(endpoint) {
+  endpoint.onStart()
+}
+  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26960440
  
--- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala ---
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.util.concurrent.{TimeUnit, CountDownLatch, TimeoutException}
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkException, SparkConf}
+
+/**
+ * Common tests for an RpcEnv implementation.
+ */
+abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
+
+  var env: RpcEnv = _
+
+  override def beforeAll(): Unit = {
+val conf = new SparkConf()
+env = createRpcEnv(conf, "local", 12345)
+  }
+
+  override def afterAll(): Unit = {
+if(env != null) {
+  env.shutdown()
+}
+  }
+
+  def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv
+
+  test("send a message locally") {
+@volatile var message: String = null
+val rpcEndpointRef = env.setupEndpoint("send-locally", new RpcEndpoint 
{
+  override val rpcEnv = env
+
+  override def receive = {
+case msg: String => message = msg
+  }
+})
+rpcEndpointRef.send("hello")
+eventually(timeout(5 seconds), interval(10 millis)) {
+  assert("hello" === message)
+}
+  }
+
+  test("send a message remotely") {
+@volatile var message: String = null
+// Set up a RpcEndpoint using env
+env.setupEndpoint("send-remotely", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receive = {
+case msg: String => message = msg
+  }
+})
+
+val anotherEnv = createRpcEnv(new SparkConf(), "remote" ,13345)
+// Use anotherEnv to find out the RpcEndpointRef
+val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, 
"send-remotely")
+try {
+  rpcEndpointRef.send("hello")
+  eventually(timeout(5 seconds), interval(10 millis)) {
+assert("hello" === message)
+  }
+} finally {
+  anotherEnv.shutdown()
+  anotherEnv.awaitTermination()
+}
+  }
+
+  test("send a RpcEndpointRef") {
+val endpoint = new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case "Hello" => context.reply(self)
+case "Echo" => context.reply("Echo")
+  }
+}
+val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
+
+val newRpcEndpointRef = 
rpcEndpointRef.askWithReply[RpcEndpointRef]("Hello")
+val reply = newRpcEndpointRef.askWithReply[String]("Echo")
+assert("Echo" === reply)
+  }
+
+  test("ask a message locally") {
+val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case msg: String => {
+  context.reply(msg)
+}
+  }
+})
+val reply = rpcEndpointRef.askWithReply[String]("hello")
+assert("hello" === reply)
+  }
+
+  test("ask a message remotely") {
+env.setupEndpoint("ask-remotely", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case msg: String => {
+  context.reply(msg)
+}
+  }
+})
+
+val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+// Use anotherEnv to find out the RpcEndpointRef
+val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, 
"ask-remotely")
  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26960403
  
--- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala ---
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.util.concurrent.{TimeUnit, CountDownLatch, TimeoutException}
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkException, SparkConf}
+
+/**
+ * Common tests for an RpcEnv implementation.
+ */
+abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
+
+  var env: RpcEnv = _
+
+  override def beforeAll(): Unit = {
+val conf = new SparkConf()
+env = createRpcEnv(conf, "local", 12345)
+  }
+
+  override def afterAll(): Unit = {
+if(env != null) {
+  env.shutdown()
+}
+  }
+
+  def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv
+
+  test("send a message locally") {
+@volatile var message: String = null
+val rpcEndpointRef = env.setupEndpoint("send-locally", new RpcEndpoint 
{
+  override val rpcEnv = env
+
+  override def receive = {
+case msg: String => message = msg
+  }
+})
+rpcEndpointRef.send("hello")
+eventually(timeout(5 seconds), interval(10 millis)) {
+  assert("hello" === message)
+}
+  }
+
+  test("send a message remotely") {
+@volatile var message: String = null
+// Set up a RpcEndpoint using env
+env.setupEndpoint("send-remotely", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receive = {
+case msg: String => message = msg
+  }
+})
+
+val anotherEnv = createRpcEnv(new SparkConf(), "remote" ,13345)
+// Use anotherEnv to find out the RpcEndpointRef
+val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, 
"send-remotely")
+try {
+  rpcEndpointRef.send("hello")
+  eventually(timeout(5 seconds), interval(10 millis)) {
+assert("hello" === message)
+  }
+} finally {
+  anotherEnv.shutdown()
+  anotherEnv.awaitTermination()
+}
+  }
+
+  test("send a RpcEndpointRef") {
+val endpoint = new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case "Hello" => context.reply(self)
+case "Echo" => context.reply("Echo")
+  }
+}
+val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
+
+val newRpcEndpointRef = 
rpcEndpointRef.askWithReply[RpcEndpointRef]("Hello")
+val reply = newRpcEndpointRef.askWithReply[String]("Echo")
+assert("Echo" === reply)
+  }
+
+  test("ask a message locally") {
+val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case msg: String => {
+  context.reply(msg)
+}
+  }
+})
+val reply = rpcEndpointRef.askWithReply[String]("hello")
+assert("hello" === reply)
+  }
+
+  test("ask a message remotely") {
+env.setupEndpoint("ask-remotely", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case msg: String => {
+  context.reply(msg)
+}
+  }
+})
+
+val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+// Use anotherEnv to find out the RpcEndpointRef
+val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, 
"ask-remotely")
  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26960291
  
--- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala ---
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.util.concurrent.{TimeUnit, CountDownLatch, TimeoutException}
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkException, SparkConf}
+
+/**
+ * Common tests for an RpcEnv implementation.
+ */
+abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
+
+  var env: RpcEnv = _
+
+  override def beforeAll(): Unit = {
+val conf = new SparkConf()
+env = createRpcEnv(conf, "local", 12345)
+  }
+
+  override def afterAll(): Unit = {
+if(env != null) {
+  env.shutdown()
+}
+  }
+
+  def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv
+
+  test("send a message locally") {
+@volatile var message: String = null
+val rpcEndpointRef = env.setupEndpoint("send-locally", new RpcEndpoint 
{
+  override val rpcEnv = env
+
+  override def receive = {
+case msg: String => message = msg
+  }
+})
+rpcEndpointRef.send("hello")
+eventually(timeout(5 seconds), interval(10 millis)) {
+  assert("hello" === message)
+}
+  }
+
+  test("send a message remotely") {
+@volatile var message: String = null
+// Set up a RpcEndpoint using env
+env.setupEndpoint("send-remotely", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receive = {
+case msg: String => message = msg
+  }
+})
+
+val anotherEnv = createRpcEnv(new SparkConf(), "remote" ,13345)
+// Use anotherEnv to find out the RpcEndpointRef
+val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, 
"send-remotely")
+try {
+  rpcEndpointRef.send("hello")
+  eventually(timeout(5 seconds), interval(10 millis)) {
+assert("hello" === message)
+  }
+} finally {
+  anotherEnv.shutdown()
+  anotherEnv.awaitTermination()
+}
+  }
+
+  test("send a RpcEndpointRef") {
+val endpoint = new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case "Hello" => context.reply(self)
+case "Echo" => context.reply("Echo")
+  }
+}
+val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
+
+val newRpcEndpointRef = 
rpcEndpointRef.askWithReply[RpcEndpointRef]("Hello")
+val reply = newRpcEndpointRef.askWithReply[String]("Echo")
+assert("Echo" === reply)
+  }
+
+  test("ask a message locally") {
+val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case msg: String => {
+  context.reply(msg)
+}
+  }
+})
+val reply = rpcEndpointRef.askWithReply[String]("hello")
+assert("hello" === reply)
+  }
+
+  test("ask a message remotely") {
+env.setupEndpoint("ask-remotely", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case msg: String => {
+  context.reply(msg)
+}
+  }
+})
+
+val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+// Use anotherEnv to find out the RpcEndpointRef
+val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, 
"ask-remotely")
  

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread vanzin
Github user vanzin commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-8519
  
Left mostly minor comments, otherwise looks good. We can iron out any kinks 
later.

There's just some odd code in the test suite, where you're calling `stop` 
on a shared RPC env variable. That looks a little suspicious.
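
The remote tests quoted above already show the safer shape; a sketch of the same pattern for any test that needs its own env (the test name and port here are made up), instead of stopping the shared `env`:

```Scala
test("use a dedicated env and tear it down") {
  // create a throwaway env for this test only
  val anotherEnv = createRpcEnv(new SparkConf(), "scratch", 13346)
  try {
    // exercise anotherEnv here
  } finally {
    // clean up only what this test created; the shared `env` is untouched
    anotherEnv.shutdown()
    anotherEnv.awaitTermination()
  }
}
```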


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r27000701
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, SparkException, SecurityManager, 
SparkConf}
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] abstract class RpcEnv(conf: SparkConf) {
+
+  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url` asynchronously.
+   */
+  def asyncSetupEndpointRefByUrl(url: String): Future[RpcEndpointRef]
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`. This is a 
blocking action.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef = {
+Await.result(asyncSetupEndpointRefByUrl(url), defaultLookupTimeout)
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   * asynchronously.
+   */
+  def asyncSetupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
Future[RpcEndpointRef] = {
+asyncSetupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef = {
+setupEndpointRefByUrl(uriOf(systemName, address, endpointName))
+  }
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
   

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26999735
  
--- Diff: core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala ---
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.util.concurrent.{TimeUnit, CountDownLatch, TimeoutException}
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{SparkException, SparkConf}
+
+/**
+ * Common tests for an RpcEnv implementation.
+ */
+abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
+
+  var env: RpcEnv = _
+
+  override def beforeAll(): Unit = {
+val conf = new SparkConf()
+env = createRpcEnv(conf, "local", 12345)
+  }
+
+  override def afterAll(): Unit = {
+if(env != null) {
+  env.shutdown()
+}
+  }
+
+  def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv
+
+  test("send a message locally") {
+@volatile var message: String = null
+val rpcEndpointRef = env.setupEndpoint("send-locally", new RpcEndpoint 
{
+  override val rpcEnv = env
+
+  override def receive = {
+case msg: String => message = msg
+  }
+})
+rpcEndpointRef.send("hello")
+eventually(timeout(5 seconds), interval(10 millis)) {
+  assert("hello" === message)
+}
+  }
+
+  test("send a message remotely") {
+@volatile var message: String = null
+// Set up a RpcEndpoint using env
+env.setupEndpoint("send-remotely", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receive = {
+case msg: String => message = msg
+  }
+})
+
+val anotherEnv = createRpcEnv(new SparkConf(), "remote" ,13345)
+// Use anotherEnv to find out the RpcEndpointRef
+val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, 
"send-remotely")
+try {
+  rpcEndpointRef.send("hello")
+  eventually(timeout(5 seconds), interval(10 millis)) {
+assert("hello" === message)
+  }
+} finally {
+  anotherEnv.shutdown()
+  anotherEnv.awaitTermination()
+}
+  }
+
+  test("send a RpcEndpointRef") {
+val endpoint = new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case "Hello" => context.reply(self)
+case "Echo" => context.reply("Echo")
+  }
+}
+val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
+
+val newRpcEndpointRef = 
rpcEndpointRef.askWithReply[RpcEndpointRef]("Hello")
+val reply = newRpcEndpointRef.askWithReply[String]("Echo")
+assert("Echo" === reply)
+  }
+
+  test("ask a message locally") {
+val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case msg: String => {
+  context.reply(msg)
+}
+  }
+})
+val reply = rpcEndpointRef.askWithReply[String]("hello")
+assert("hello" === reply)
+  }
+
+  test("ask a message remotely") {
+env.setupEndpoint("ask-remotely", new RpcEndpoint {
+  override val rpcEnv = env
+
+  override def receiveAndReply(context: RpcCallContext) = {
+case msg: String => {
+  context.reply(msg)
+}
+  }
+})
+
+val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345)
+// Use anotherEnv to find out the RpcEndpointRef
+val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, 
"ask-remotely")
 

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-85329972
  
  [Test build #29050 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29050/consoleFull)
 for   PR 4588 at commit 
[`8da4c44`](https://github.com/apache/spark/commit/8da4c444c39348a2c6ce9890031c498a7027a15e).
 * This patch **does not merge cleanly**.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-22 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-84779926
  
@vanzin could you take a look at this one again this week? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-16 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-82060820
  
I added a non-blocking method `def asyncSetupEndpointRefByUrl(url: String): 
Future[RpcEndpointRef]` so that people can retrieve `RpcEndpointRef` in the 
message loop.
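
A sketch of the intended usage (the endpoint and message shape are hypothetical; only `asyncSetupEndpointRefByUrl` itself is from the PR), resolving a remote ref inside the message loop without blocking it:

```Scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

class LookupEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receive = {
    case url: String =>
      // non-blocking: the message loop keeps running while the lookup completes
      rpcEnv.asyncSetupEndpointRefByUrl(url).onComplete {
        case Success(ref) => ref.send("hello")
        case Failure(e)   => // e.g. log and drop the request
      }
  }
}
```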


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-82058962
  
  [Test build #28692 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28692/consoleFull)
 for   PR 4588 at commit 
[`2cc3f78`](https://github.com/apache/spark/commit/2cc3f788b23d2ce3ba468bd603c70613481b1219).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-82091399
  
  [Test build #28692 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28692/consoleFull)
 for   PR 4588 at commit 
[`2cc3f78`](https://github.com/apache/spark/commit/2cc3f788b23d2ce3ba468bd603c70613481b1219).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  class OutputCommitCoordinatorEndpoint(`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-82091422
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28692/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-13 Thread vanzin
Github user vanzin commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-79118468
  
I'm busy with some blockers internally but I'll try to get to this later 
today.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-12 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26147989
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.reflect.ClassTag
+
+import org.apache.spark.{SparkException, SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] trait RpcEnv {
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
+  /**
+   * Wait until [[RpcEnv]] exits.
+   *
+   * TODO do we need a timeout parameter?
+   */
+  def awaitTermination(): Unit
+
+  /**
+   * Create a URI used to create a [[RpcEndpointRef]]. Use this one to 
create the URI instead of
+   * creating it manually because different [[RpcEnv]] may have different 
formats.
+   */
+  def uriOf(systemName: String, address: RpcAddress, endpointName: 
String): String
--- End diff --

So, another argument for just using URLs instead of having all these 
parameters is the fact that you even need this method...
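
For the Akka-backed implementation the composed string is presumably just an actor path, which is the point of hiding it behind `uriOf`; an illustrative call with made-up values:

```Scala
// Illustrative only; the system name, host and endpoint name are made up.
// For the Akka impl this presumably resolves to an actor path such as
// "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler".
val uri: String = rpcEnv.uriOf("sparkDriver", RpcAddress("driver-host", 7077), "CoarseGrainedScheduler")
```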


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-12 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26148867
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.reflect.ClassTag
+
+import org.apache.spark.{SparkException, SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] trait RpcEnv {
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
+  /**
+   * Wait until [[RpcEnv]] exits.
+   *
+   * TODO do we need a timeout parameter?
+   */
+  def awaitTermination(): Unit
+
+  /**
+   * Create a URI used to create a [[RpcEndpointRef]]. Use this one to 
create the URI instead of
+   * creating it manually because different [[RpcEnv]] may have different 
formats.
+   */
+  def uriOf(systemName: String, address: RpcAddress, endpointName: 
String): String
+}
+
+private[spark] case class RpcEnvConfig(
+conf: SparkConf,
+name: String,
+host: String,
+port: Int,
+securityManager: SecurityManager)
+
+/**
+ * A RpcEnv implementation must have a companion object with an
+ * `apply(config: RpcEnvConfig): RpcEnv` method so that it can be created 
via Reflection.
+ *
+ * {{{
+ * object MyCustomRpcEnv {
+ *   def apply(config: RpcEnvConfig): RpcEnv = {
+ * ...
+ *   }

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-12 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26225879
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.reflect.ClassTag
+
+import org.apache.spark.{SparkException, SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] trait RpcEnv {
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   *
+   * Thread-safety means processing of one message happens before 
processing of the next message by
+   * the same [[RpcEndpoint]]. In the other words, changes to internal 
fields of a [[RpcEndpoint]]
+   * are visible when processing the next message, and fields in the 
[[RpcEndpoint]] need not be
+   * volatile or equivalent.
+   *
+   * However, there is no guarantee that the same thread will be executing 
the same [[RpcEndpoint]]
+   * for different messages.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure 
[[RpcEnv]] exits successfully,
+   * call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
+  /**
+   * Wait until [[RpcEnv]] exits.
+   *
+   * TODO do we need a timeout parameter?
+   */
+  def awaitTermination(): Unit
+
+  /**
+   * Create a URI used to create a [[RpcEndpointRef]]. Use this one to 
create the URI instead of
+   * creating it manually because different [[RpcEnv]] may have different 
formats.
+   */
+  def uriOf(systemName: String, address: RpcAddress, endpointName: 
String): String
+}
+
+private[spark] case class RpcEnvConfig(
+conf: SparkConf,
+name: String,
+host: String,
+port: Int,
+securityManager: SecurityManager)
+
+/**
+ * A RpcEnv implementation must have a companion object with an
+ * `apply(config: RpcEnvConfig): RpcEnv` method so that it can be created 
via Reflection.
+ *
+ * {{{
+ * object MyCustomRpcEnv {
+ *   def apply(config: RpcEnvConfig): RpcEnv = {
+ * ...
+ *   }

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-12 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-78765109
  
@vanzin any other comments, or any that I missed? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-11 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26228790
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.reflect.ClassTag
+
+import org.apache.spark.{SparkException, SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
+
+/**
+ * An RPC environment. [[RpcEndpoint]]s need to register itself with a 
name to [[RpcEnv]] to
+ * receives messages. Then [[RpcEnv]] will process messages sent from 
[[RpcEndpointRef]] or remote
+ * nodes, and deliver them to corresponding [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s 
given name or uri.
+ */
+private[spark] trait RpcEnv {
+
+  /**
+   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used 
to implement
+   * [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] should
+   * make sure thread-safely sending messages to [[RpcEndpoint]].
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve a [[RpcEndpointRef]] which is located in the driver via its 
name.
+   */
+  def setupDriverEndpointRef(name: String): RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
+   */
+  def setupEndpointRef(
+  systemName: String, address: RpcAddress, endpointName: String): 
RpcEndpointRef
+
+  /**
+   * Stop [[RpcEndpoint]] specified by `endpoint`.
+   */
+  def stop(endpoint: RpcEndpointRef): Unit
+
+  /**
+   * Shut down this [[RpcEnv]] asynchronously. If you need to make sure the [[RpcEnv]] exits
+   * successfully, call [[awaitTermination()]] straight after [[shutdown()]].
+   */
+  def shutdown(): Unit
+
+  /**
+   * Wait until [[RpcEnv]] exits.
+   *
+   * TODO do we need a timeout parameter?
+   */
+  def awaitTermination(): Unit
+
+  /**
+   * Create a URI used to create a [[RpcEndpointRef]]. Use this method instead of constructing
+   * the URI manually, because different [[RpcEnv]] implementations may use different formats.
+   */
+  def uriOf(systemName: String, address: RpcAddress, endpointName: 
String): String
+}
+
+private[spark] case class RpcEnvConfig(
+conf: SparkConf,
+name: String,
+host: String,
+port: Int,
+securityManager: SecurityManager)
+
+/**
+ * A RpcEnv implementation must have a companion object with an
+ * `apply(config: RpcEnvConfig): RpcEnv` method so that it can be created 
via Reflection.
+ *
+ * {{{
+ * object MyCustomRpcEnv {
+ *   def apply(config: RpcEnvConfig): RpcEnv = {
+ * ...
+ *   }
+ * }
+ * }}}
+ */
+private[spark] object RpcEnv {
+
+  private def getRpcEnvCompanion(conf: SparkConf): AnyRef = {
+// Add more RpcEnv implementations here
+val rpcEnvNames = Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnv")
+val rpcEnvName = conf.get("spark.rpc", "akka")
+val 
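The companion-object convention described above implies that `RpcEnv` locates the configured
implementation's companion object reflectively and invokes its `apply(RpcEnvConfig)` method.
The quoted diff is cut off at this point, so what follows is only a minimal sketch of what that
lookup could look like; the object name `RpcEnvReflectionSketch` and the exact reflection calls
are assumptions for illustration, not code from this PR:

```Scala
// Minimal sketch only, assuming it lives in org.apache.spark.rpc so the
// private[spark] types quoted in the diff are visible.
package org.apache.spark.rpc

object RpcEnvReflectionSketch {
  def create(config: RpcEnvConfig): RpcEnv = {
    // Map of short names to fully qualified RpcEnv implementations, as in the quoted diff.
    val rpcEnvNames = Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnv")
    val rpcEnvName = config.conf.get("spark.rpc", "akka")
    val className = rpcEnvNames.getOrElse(rpcEnvName, rpcEnvName)
    // A Scala `object Foo` compiles to a class `Foo$` exposing a static MODULE$ field,
    // so the companion instance can be fetched and its apply method invoked reflectively.
    val companion = Class.forName(className + "$").getField("MODULE$").get(null)
    val apply = companion.getClass.getMethod("apply", classOf[RpcEnvConfig])
    apply.invoke(companion, config).asInstanceOf[RpcEnv]
  }
}
```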

[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-78220712
  
  [Test build #28467 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28467/consoleFull)
 for   PR 4588 at commit 
[`08564ae`](https://github.com/apache/spark/commit/08564ae75fbdf02897dd5bd452fc4dbcd44532e4).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/4588#discussion_r26194514
  
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.net.URI
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.reflect.ClassTag
+
+import org.apache.spark.{SparkException, SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
+
+/**
+ * An RPC environment. An [[RpcEndpoint]] needs to register itself with a name to the
+ * [[RpcEnv]] in order to receive messages. The [[RpcEnv]] will then process messages sent
+ * from an [[RpcEndpointRef]] or a remote node, and deliver them to the corresponding
+ * [[RpcEndpoint]]s.
+ *
+ * [[RpcEnv]] also provides methods to retrieve [[RpcEndpointRef]]s given a name or URI.
+ */
+private[spark] trait RpcEnv {
+
+  /**
+   * Return the [[RpcEndpointRef]] of the registered [[RpcEndpoint]]. Will be used to
+   * implement [[RpcEndpoint.self]].
+   */
+  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Return the address that [[RpcEnv]] is listening to.
+   */
+  def address: RpcAddress
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its 
[[RpcEndpointRef]]. [[RpcEnv]] does not
+   * guarantee thread-safety.
+   */
+  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
+
+  /**
+   * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]]
+   * should ensure that messages are delivered to the [[RpcEndpoint]] in a thread-safe
+   * manner.
+   */
+  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): 
RpcEndpointRef
+
+  /**
+   * Retrieve a [[RpcEndpointRef]] which is located in the driver via its 
name.
+   */
+  def setupDriverEndpointRef(name: String): RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `url`.
+   */
+  def setupEndpointRefByUrl(url: String): RpcEndpointRef
+
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `systemName`, 
`address` and `endpointName`
--- End diff --

 That's fine. Let's create a JIRA ticket to track it and put it under the umbrella ticket.

Created: https://issues.apache.org/jira/browse/SPARK-6280
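
For readers skimming the `RpcEnv` trait quoted above in this message, a small usage sketch may
help tie the registration and lookup methods together. Everything below is illustrative: the
endpoint name, system name, host/port, and the assumption that `RpcAddress(host, port)` takes a
host and a port are not taken from this diff, and the `RpcEndpoint` instance itself is passed in
because its interface is not part of the quoted code:

```Scala
// Usage sketch against the RpcEnv trait quoted above; rpcEnv and endpoint are assumed
// to be constructed elsewhere, and the names and port used here are only examples.
package org.apache.spark.rpc

object RpcEnvUsageSketch {
  def registerAndResolve(rpcEnv: RpcEnv, endpoint: RpcEndpoint): RpcEndpointRef = {
    // setupThreadSafeEndpoint (unlike setupEndpoint) asks the RpcEnv to deliver
    // messages to the endpoint in a thread-safe manner.
    rpcEnv.setupThreadSafeEndpoint("heartbeat", endpoint)

    // Build a lookup URI with uriOf rather than by hand, since each RpcEnv
    // implementation may use a different URI format.
    val uri = rpcEnv.uriOf("sparkDriver", RpcAddress("localhost", 7077), "heartbeat")
    rpcEnv.setupEndpointRefByUrl(uri)
  }

  def stop(rpcEnv: RpcEnv): Unit = {
    // Per the scaladoc, shutdown() is asynchronous, so follow it with
    // awaitTermination() when the caller needs to be sure the RpcEnv has exited.
    rpcEnv.shutdown()
    rpcEnv.awaitTermination()
  }
}
```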


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-78230371
  
  [Test build #28467 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28467/consoleFull)
 for   PR 4588 at commit 
[`08564ae`](https://github.com/apache/spark/commit/08564ae75fbdf02897dd5bd452fc4dbcd44532e4).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  class OutputCommitCoordinatorEndpoint(`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-5124][Core] A standard RPC interface an...

2015-03-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4588#issuecomment-78230380
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28467/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


